|
|
#include "TestRecvThread.h"
|
|||
|
|
#include <iostream>
|
|||
|
|
#include <arpa/inet.h>
|
|||
|
|
|
|||
|
|
/*
|
|||
|
|
响应包解码函数,根据特定格式解码从服务端收到的数据,解析为ResponsePacket
|
|||
|
|
*/
|
|||
|
|
static size_t pushResponse(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
|
|||
|
|
{
|
|||
|
|
size_t pos = 0;
|
|||
|
|
while (pos < length)
|
|||
|
|
{
|
|||
|
|
unsigned int len = length - pos;
|
|||
|
|
if(len < sizeof(unsigned int))
|
|||
|
|
{
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
unsigned int iHeaderLen = ntohl(*(unsigned int*)(recvBuffer + pos));
|
|||
|
|
|
|||
|
|
//做一下保护,长度大于M
|
|||
|
|
if (iHeaderLen > 100000 || iHeaderLen < sizeof(unsigned int))
|
|||
|
|
{
|
|||
|
|
throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
//包没有接收全
|
|||
|
|
if (len < iHeaderLen)
|
|||
|
|
{
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
else
|
|||
|
|
{
|
|||
|
|
ResponsePacket rsp;
|
|||
|
|
rsp.iRequestId = ntohl(*((unsigned int *)(recvBuffer + pos + sizeof(unsigned int))));
|
|||
|
|
rsp.sBuffer.resize(iHeaderLen - 2*sizeof(unsigned int));
|
|||
|
|
::memcpy(&rsp.sBuffer[0], recvBuffer + pos + 2*sizeof(unsigned int), iHeaderLen - 2*sizeof(unsigned int));
|
|||
|
|
|
|||
|
|
pos += iHeaderLen;
|
|||
|
|
|
|||
|
|
done.push_back(rsp);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return pos;
|
|||
|
|
}
|
|||
|
|
/*
|
|||
|
|
请求包编码函数,本函数的打包格式为
|
|||
|
|
整个包长度(字节)+iRequestId(字节)+包内容
|
|||
|
|
*/
|
|||
|
|
static void pushRequest(const RequestPacket& request, string& buff)
|
|||
|
|
{
|
|||
|
|
unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
|
|||
|
|
unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
|
|||
|
|
|
|||
|
|
buff = "";
|
|||
|
|
for (int i = 0; i<4; ++i)
|
|||
|
|
{
|
|||
|
|
buff += *bufflengthptr++;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
unsigned int netrequestId = htonl(request.iRequestId);
|
|||
|
|
unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
|
|||
|
|
|
|||
|
|
for (int i = 0; i<4; ++i)
|
|||
|
|
{
|
|||
|
|
buff += *netrequestIdptr++;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
string tmp;
|
|||
|
|
tmp.assign((const char*)(&request.sBuffer[0]), request.sBuffer.size());
|
|||
|
|
buff+=tmp;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/*
 * Writes the request id and the response text of a finished call to standard
 * output, each on its own labelled line.
 */
static void printResult(int iRequestId, const string &sResponseStr)
{
    cout << "request id: " << iRequestId << endl
         << "response str: " << sResponseStr << endl;
}
|
|||
|
|
/*
 * Writes the text of a pushed message to standard output on one labelled line.
 */
static void printPushInfo(const string &sResponseStr)
{
    cout << "push message: ";
    cout << sResponseStr << endl;
}
|
|||
|
|
|
|||
|
|
/*
 * Handles one message handed to this callback.
 *
 * - If the request's function name is "printResult", the response buffer size
 *   is logged and the response is printed together with the request id.
 * - Otherwise, if the response carries request id 0, the response is printed
 *   as a push message.
 * - Anything else is reported as unmatched.
 *
 * Returns 0 when the message was handled, -3 when nothing matched.
 */
int TestPushCallBack::onDispatch(ReqMessagePtr msg)
{
    if (msg->request.sFuncName == "printResult")
    {
        cout << "sBuffer: " << msg->response.sBuffer.size() << endl;

        // Build the text from the iterator range: this is well defined for an
        // empty response buffer, whereas taking the address of element 0 of
        // an empty buffer is not.
        const string sRet(msg->response.sBuffer.begin(), msg->response.sBuffer.end());
        printResult(msg->request.iRequestId, sRet);
        return 0;
    }

    if (msg->response.iRequestId == 0)
    {
        const string sRet(msg->response.sBuffer.begin(), msg->response.sBuffer.end());
        printPushInfo(sRet);
        return 0;
    }

    cout << "no match func!" << endl;
    return -3;
}
|
|||
|
|
|
|||
|
|
/*
 * Builds the servant proxy for the push test server and installs this file's
 * request encoder and response decoder on it. The termination flag starts
 * out cleared.
 */
RecvThread::RecvThread()
    : _bTerminate(false)
{
    // Proxy address has the form "<object name>@<endpoint>".
    const string sServant = "Test.TestPushServer.TestPushServantObj";
    const string sEndpoint = "tcp -h 10.120.129.226 -t 60000 -p 10099";
    const string sAddress = sServant + "@" + sEndpoint;

    _prx = _comm.stringToProxy<ServantPrx>(sAddress);

    // Replace the proxy's protocol hooks with the custom framing codecs.
    ProxyProtocol codec;
    codec.responseFunc = pushResponse;
    codec.requestFunc = pushRequest;

    _prx->tars_set_protocol(codec);
}
|
|||
|
|
|
|||
|
|
void RecvThread::terminate()
|
|||
|
|
{
|
|||
|
|
_bTerminate = true;
|
|||
|
|
{
|
|||
|
|
tars::TC_ThreadLock::Lock sync(*this);
|
|||
|
|
notifyAll();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
void RecvThread::run(void)
|
|||
|
|
{
|
|||
|
|
TestPushCallBackPtr cbPush = new TestPushCallBack();
|
|||
|
|
_prx->tars_set_push_callback(cbPush);
|
|||
|
|
|
|||
|
|
string buf("heartbeat");
|
|||
|
|
|
|||
|
|
while(!_bTerminate)
|
|||
|
|
{
|
|||
|
|
{
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
TestPushCallBackPtr cb = new TestPushCallBack();
|
|||
|
|
_prx->rpc_call_async(_prx->tars_gen_requestid(), "printResult", buf.c_str(), buf.length(), cb);
|
|||
|
|
}
|
|||
|
|
catch(TarsException& e)
|
|||
|
|
{
|
|||
|
|
cout << "TarsException: " << e.what() << endl;
|
|||
|
|
}
|
|||
|
|
catch(...)
|
|||
|
|
{
|
|||
|
|
cout << "unknown exception" << endl;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
{
|
|||
|
|
TC_ThreadLock::Lock sync(*this);
|
|||
|
|
timedWait(5000);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|