#include "TestRecvThread.h"
// NOTE(review): the names of the two system headers were missing from the
// reviewed text; they are restored here from what this file uses
// (cout/endl and ntohl/htonl). <string.h> is added for ::memcpy.
#include <iostream>
#include <arpa/inet.h>
#include <string.h>

/*
 * Response decoder: splits the bytes received from the server into
 * ResponsePacket objects.
 *
 * Wire format of one packet (both integers in network byte order):
 *   [unsigned int total length, header included][unsigned int request id][payload]
 *
 * recvBuffer  start of the received bytes
 * length      number of valid bytes in recvBuffer
 * done        list that receives every completely decoded packet
 *
 * Returns the number of bytes consumed; a trailing incomplete packet is left
 * unconsumed. Throws TarsDecodeException when the announced packet length is
 * larger than 100000 bytes or smaller than the fixed header.
 */
static size_t pushResponse(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
{
    // Fixed header: length field followed by request id field.
    const size_t kHeaderSize = 2 * sizeof(unsigned int);

    size_t pos = 0;
    while (pos < length)
    {
        const size_t remaining = length - pos;
        if (remaining < sizeof(unsigned int))
        {
            // Not even the length field has arrived yet.
            break;
        }

        // Copy instead of dereferencing a cast pointer: recvBuffer + pos is
        // not guaranteed to be aligned for unsigned int.
        unsigned int netLength = 0;
        ::memcpy(&netLength, recvBuffer + pos, sizeof(netLength));
        const unsigned int iHeaderLen = ntohl(netLength);

        // Sanity guard. The lower bound must be the full header size:
        // a smaller value would make the payload length below wrap around.
        if (iHeaderLen > 100000 || iHeaderLen < kHeaderSize)
        {
            throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
        }

        if (remaining < iHeaderLen)
        {
            // The packet has not been received completely.
            break;
        }

        unsigned int netRequestId = 0;
        ::memcpy(&netRequestId, recvBuffer + pos + sizeof(unsigned int), sizeof(netRequestId));

        const size_t payloadLen = iHeaderLen - kHeaderSize;

        ResponsePacket rsp;
        rsp.iRequestId = ntohl(netRequestId);
        rsp.sBuffer.resize(payloadLen);
        if (payloadLen > 0)
        {
            // Indexing element 0 of an empty buffer is undefined, hence the guard.
            ::memcpy(&rsp.sBuffer[0], recvBuffer + pos + kHeaderSize, payloadLen);
        }

        done.push_back(rsp);
        pos += iHeaderLen;
    }

    return pos;
}

/*
 * Request encoder. Packs the request as
 *   total packet length + iRequestId + packet content
 * where both integers are written in network byte order and the length
 * counts the two header fields as well.
 *
 * request  packet to encode (iRequestId and sBuffer are read)
 * buff     receives the encoded bytes; any previous content is discarded
 */
static void pushRequest(const RequestPacket& request, string& buff)
{
    const size_t payloadLen = request.sBuffer.size();

    const unsigned int netLength = htonl(static_cast<unsigned int>(payloadLen + 8));
    const unsigned int netRequestId = htonl(request.iRequestId);

    buff.clear();
    buff.reserve(payloadLen + 8);
    buff.append(reinterpret_cast<const char*>(&netLength), 4);
    buff.append(reinterpret_cast<const char*>(&netRequestId), 4);
    if (payloadLen > 0)
    {
        // Indexing element 0 of an empty buffer is undefined, hence the guard.
        buff.append(reinterpret_cast<const char*>(&request.sBuffer[0]), payloadLen);
    }
}

/* Prints the request id and the response text of an answered request. */
static void printResult(int iRequestId, const string &sResponseStr)
{
    cout << "request id: " << iRequestId << endl;
    cout << "response str: " << sResponseStr << endl;
}

/* Prints the text of a message pushed by the server. */
static void printPushInfo(const string &sResponseStr)
{
    cout << "push message: " << sResponseStr << endl;
}

/*
 * Dispatches one message:
 *  - request function name "printResult": prints request id and response text;
 *  - response request id 0: treated as a server push and printed as such.
 * Both of these branches return 0.
 */
int TestPushCallBack::onDispatch(ReqMessagePtr msg)
{
    if (msg->request.sFuncName == "printResult")
    {
        cout << "sBuffer: " << msg->response.sBuffer.size() << endl;
        // Built from the iterator range so that an empty buffer is handled safely.
        const string sRet(msg->response.sBuffer.begin(), msg->response.sBuffer.end());
        printResult(msg->request.iRequestId, sRet);
        return 0;
    }
    else if (msg->response.iRequestId == 0)
    {
        const string sRet(msg->response.sBuffer.begin(), msg->response.sBuffer.end());
        printPushInfo(sRet);
        return 0;
    }
    else
    {
        // NOTE(review): the reviewed text is truncated at this point. Everything
        // between the '<' that follows "no match func!" and the '(' in front of
        // sObjName is missing, which presumably covers the end of onDispatch and
        // the beginning of the RecvThread constructor. The statement below is
        // kept verbatim; restore the lost code from version control before
        // building.
        cout << "no match func!" <(sObjName+"@"+sObjHost);

    // NOTE(review): the statements up to the closing brace appear to be the tail
    // of the RecvThread constructor (its header is in the missing span). They
    // install the custom encoder/decoder defined above on the proxy.
    ProxyProtocol prot;
    prot.requestFunc = pushRequest;
    prot.responseFunc = pushResponse;
    _prx->tars_set_protocol(prot);
}

/*
 * Asks run() to stop and wakes it if it is waiting between heartbeats.
 * The flag is set while holding the monitor so that run(), which re-checks
 * it under the same monitor before waiting, cannot miss the notification.
 */
void RecvThread::terminate()
{
    tars::TC_ThreadLock::Lock sync(*this);
    _bTerminate = true;
    notifyAll();
}

/*
 * Thread body: registers a push callback on the proxy, then sends a
 * "heartbeat" request roughly every 5000 ms (timedWait argument) until
 * terminate() is called. Exceptions raised while sending are reported on
 * stdout and do not end the loop.
 */
void RecvThread::run(void)
{
    TestPushCallBackPtr cbPush = new TestPushCallBack();
    _prx->tars_set_push_callback(cbPush);

    const string buf("heartbeat");

    while (!_bTerminate)
    {
        try
        {
            TestPushCallBackPtr cb = new TestPushCallBack();
            _prx->rpc_call_async(_prx->tars_gen_requestid(), "printResult", buf.c_str(), buf.length(), cb);
        }
        catch (const TarsException& e)
        {
            cout << "TarsException: " << e.what() << endl;
        }
        catch (...)
        {
            cout << "unknown exception" << endl;
        }

        {
            TC_ThreadLock::Lock sync(*this);
            // Skip the wait when terminate() already ran; otherwise its
            // notifyAll() would have been issued before we started waiting.
            if (!_bTerminate)
            {
                timedWait(5000);
            }
        }
    }
}