SIGN IN SIGN UP
TarsCloud / Tars UNCLAIMED

Tars is a high-performance RPC framework based on a name service and the Tars protocol. It also provides an integrated administration platform and implements service hosting through flexible scheduling.

0 0 12 C++
2017-01-18 16:19:06 +08:00
/**
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
#include "KeepAliveThread.h"
#include "RegistryProxy.h"
#include "NodeRollLogger.h"
#include "util/tc_timeprovider.h"
#include "util.h"
/**
 * Build the keep-alive monitor.
 *
 * Records the start time and this node's identity, then reads the tuning
 * knobs from the "/tars/node/keepalive" section of g_pconf. Every key has a
 * literal fallback used when it is missing from the configuration.
 */
KeepAliveThread::KeepAliveThread()
: _terminate(false)
, _registryPrx(NULL)
{
    // Reference point for the start-up grace period checked in checkAlive().
    _runTime  = time(0);
    _nodeInfo = _platformInfo.getNodeInfo();

    // Heartbeat threshold: compared with TNOW deltas in reportAlive() and
    // passed to ServerObject::checkServer() in checkAlive().
    _heartTimeout      = TC_Common::strto<int>(g_pconf->get("/tars/node/keepalive<heartTimeout>", "10"));
    _monitorInterval   = TC_Common::strto<int>(g_pconf->get("/tars/node/keepalive<monitorInterval>", "2"));
    _monitorIntervalMs = TC_Common::strto<int>(g_pconf->get("/tars/node/keepalive<monitorIntervalMs>", "10"));
    // Minimum TNOW delta between two server-state syncs in checkAlive().
    _synInterval       = TC_Common::strto<int>(g_pconf->get("/tars/node/keepalive<synStatInterval>", "60"));
    // "Y"/"y" => batch the state sync through synStat(); otherwise per-server asyncSynState().
    _synStatBatch      = g_pconf->get("/tars/node/keepalive<synStatBatch>", "Y");

    // Keep the monitor interval inside [1, 10].
    if (_monitorInterval > 10)
    {
        _monitorInterval = 10;
    }
    else if (_monitorInterval < 1)
    {
        _monitorInterval = 1;
    }
}
/**
 * Stop the monitoring loop before the object goes away.
 *
 * Delegates to terminate(), which requests shutdown and joins the thread if
 * it is still alive, so run() never outlives this object.
 */
KeepAliveThread::~KeepAliveThread()
{
    this->terminate();
}
void KeepAliveThread::terminate()
{
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << endl;
_terminate = true;
if (isAlive())
{
_lock.notifyAll();
getThreadControl().join();
}
}
/**
 * Sleep on _lock for at most `millsecond` milliseconds, unless a stop was
 * already requested.
 *
 * @param millsecond  upper bound of the wait, in milliseconds
 * @return true straight away when _terminate is set; otherwise whatever
 *         TC_ThreadLock::timedWait() returns (presumably true when woken by a
 *         notification and false on timeout — confirm against TC_Monitor).
 */
bool KeepAliveThread::timedWait(int millsecond)
{
    TC_ThreadLock::Lock guard(_lock);
    // Checked under the lock so a pending stop request skips the wait entirely.
    return _terminate || _lock.timedWait(millsecond);
}
void KeepAliveThread::run()
{
bool bLoaded = false;
bool bRegistered = false;
while (!_terminate)
{
int64_t startMs = TC_TimeProvider::getInstance()->getNowMs();
try
{
//获取主控代理
if (!_registryPrx)
{
_registryPrx = AdminProxy::getInstance()->getRegistryProxy();
if (!_registryPrx)
{
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "RegistryProxy init fail ! retry it after " + TC_Common::tostr(_monitorInterval) + " second" << endl;
}
}
//注册node信息
if (_registryPrx && bRegistered == false)
{
bRegistered = registerNode();
}
//加载服务
if (bLoaded == false)
{
bLoaded = loadAllServers();
}
//检查服务的limit配置是否需要更新
if (bLoaded)
{
ServerFactory::getInstance()->setAllServerResourceLimit();
}
//检查服务
checkAlive();
// 上报node状态
if (reportAlive() != 0)
{
bRegistered = false; //registry服务重启 需要重新注册
}
}
catch (exception& e)
{
NODE_LOG("KeepAliveThread")->error() << FILE_FUN << "catch exception|" << e.what() << endl;
}
catch (...)
{
NODE_LOG("KeepAliveThread")->error() << FILE_FUN << "catch unkown exception|" << endl;
}
{
int64_t useMs = (TC_TimeProvider::getInstance()->getNowMs() - startMs);
2017-04-14 17:07:54 +08:00
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "run use:" << useMs << " ms" << endl;
2017-01-18 16:19:06 +08:00
}
timedWait(ServerFactory::getInstance()->getMinMonitorIntervalMs());
}
}
bool KeepAliveThread::registerNode()
{
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "registerNode begin===============|node name|" << _nodeInfo.nodeName << endl;
try
{
int iRet = _registryPrx->registerNode(_nodeInfo.nodeName, _nodeInfo, _platformInfo.getLoadInfo());
if (iRet == 0)
{
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "register node succ" << endl;
return true;
}
}
catch (exception& e)
{
NODE_LOG("KeepAliveThread")->error() << FILE_FUN << "KeepAliveThread::registerNode catch exception|" << e.what() << endl;
}
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "register node fail" << endl;
return false;
}
bool KeepAliveThread::loadAllServers()
{
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "load server begin===============|node name|" << _nodeInfo.nodeName << endl;
/**
* 由于加载失败或者node下没有部署服务这里就会一直去访问主控
* 增加这个限制如果超过5次失败则不去加载10分钟重试10次
* 如果之后有新服务部署会自动添加到node缓存中不影响监控流程
*/
static int iFailNum = 0;
if (iFailNum >= 5)
{
static time_t tLastLoadTime = 0;
if ((TNOW - tLastLoadTime) < 600)
{
NODE_LOG("KeepAliveThread")->error() << FILE_FUN << "load server fail" << endl;
return false;
}
tLastLoadTime = TNOW;
iFailNum = 0;
}
try
{
if (ServerFactory::getInstance()->loadServer())
{
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "load server succ" << endl;
return true;
}
}
catch (exception& e)
{
NODE_LOG("KeepAliveThread")->error() << FILE_FUN << "catch exception|" << e.what() << endl;
}
iFailNum++;
NODE_LOG("KeepAliveThread")->error() << FILE_FUN << "load server fail" << endl;
return false;
}
int KeepAliveThread::reportAlive()
{
try
{
static time_t tReport;
time_t tNow = TNOW;
if (tNow - tReport > _heartTimeout)
{
tReport = tNow;
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "node keep alive ----------------------------------------------------|" << time(0) << "|" << pthread_self() << '\n' << endl;
int iRet = _registryPrx->keepAlive(_nodeInfo.nodeName, _platformInfo.getLoadInfo());
return iRet;
}
}
catch (exception& e)
{
NODE_LOG("KeepAliveThread")->error() << FILE_FUN << "KeepAliveThread::reportAlive catch exception|" << e.what() << endl;
}
return 0;
}
int KeepAliveThread::synStat()
{
try
{
int iRet = -1;
vector<ServerStateInfo> v;
_stat.swap(v);
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "node syn stat size|" << v.size() << "|" << _synStatBatch << "|" << pthread_self() << endl;
if (v.size() > 0)
{
iRet = _registryPrx->updateServerBatch(v);
}
return iRet;
}
catch (exception& e)
{
string s = e.what();
if (s.find("server function mismatch exception") != string::npos)
{
_synStatBatch = "N";
}
NODE_LOG("KeepAliveThread")->error() << FILE_FUN << "catch exception|" << s << endl;
}
NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "fail,set SynStatBatch = " << _synStatBatch << endl;
return -1;
}
/**
 * Run one monitoring pass over every server returned by
 * ServerFactory::getAllServers().
 *
 * For each server the pass:
 *  - runs its monitor script (doMonScript());
 *  - does nothing more until the node has been up for
 *    ServantHandle::HEART_BEAT_INTERVAL * 5 (TNOW - _runTime). This gives
 *    servers time to send heartbeats first;
 *  - then checks the coredump limit and calls checkServer(_heartTimeout).
 *
 * When more than _synInterval has passed since the previous sync (TNOW
 * based), it also:
 *  - refreshes the server's entry in g_serverInfoHashmap;
 *  - syncs the server state, either by collecting it into _stat for one
 *    batched synStat() call (when _synStatBatch is "y"/"Y") or by calling
 *    asyncSynState() per server.
 *
 * A std::exception raised for one server is logged and the scan continues
 * with the next one.
 */
void KeepAliveThread::checkAlive()
{
    int64_t startMs = TC_TimeProvider::getInstance()->getNowMs();

    // Decide whether this pass must also sync server states to the registry.
    static time_t tSyn = 0;
    bool bNeedSynServerState = false;
    time_t tNow = TNOW;
    if (tNow - tSyn > _synInterval)
    {
        tSyn = tNow;
        bNeedSynServerState = true;
    }

    // Declared outside the loops so the catch handler can name the failing server.
    string sServerId;
    _stat.clear();

    map<string, ServerGroup> mmServerList = ServerFactory::getInstance()->getAllServers();
    for (map<string, ServerGroup>::const_iterator it = mmServerList.begin(); it != mmServerList.end(); ++it)
    {
        for (map<string, ServerObjectPtr>::const_iterator p = it->second.begin(); p != it->second.end(); ++p)
        {
            try
            {
                sServerId = it->first + "." + p->first;
                ServerObjectPtr pServerObjectPtr = p->second;
                if (!pServerObjectPtr)
                {
                    NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << sServerId << "|=NULL|" << endl;
                    continue;
                }

                pServerObjectPtr->doMonScript();

                if (TNOW - _runTime < ServantHandle::HEART_BEAT_INTERVAL * 5)
                {
                    // Still in the start-up grace period: wait for heartbeats.
                    continue;
                }

                // Coredump limit check.
                pServerObjectPtr->checkCoredumpLimit();

                // Per the original note, checkServer also reports the server's
                // memory usage to the registry.
                pServerObjectPtr->checkServer(_heartTimeout);

                // Refresh the cache and sync the server state.
                if (bNeedSynServerState == true)
                {
                    ServerInfo tServerInfo;
                    tServerInfo.application = it->first;
                    tServerInfo.serverName = p->first;

                    ServerDescriptor tServerDescriptor = pServerObjectPtr->getServerDescriptor();
                    tServerDescriptor.settingState = pServerObjectPtr->isEnabled() == true ? "active" : "inactive";
                    g_serverInfoHashmap.set(tServerInfo, tServerDescriptor);

                    ServerStateInfo tServerStateInfo;
                    tServerStateInfo.serverState = (pServerObjectPtr->IsEnSynState() ? pServerObjectPtr->getState() : tars::Inactive);
                    tServerStateInfo.processId = pServerObjectPtr->getPid();
                    tServerStateInfo.nodeName = _nodeInfo.nodeName;
                    tServerStateInfo.application = it->first;
                    tServerStateInfo.serverName = p->first;

                    if (TC_Common::lower(_synStatBatch) == "y")
                    {
                        _stat.push_back(tServerStateInfo);
                    }
                    else
                    {
                        pServerObjectPtr->asyncSynState();
                    }
                }
            }
            catch (const exception& e)
            {
                NODE_LOG("KeepAliveThread")->error() << FILE_FUN << sServerId << " catch exception|" << e.what() << endl;
            }
        }
    }

    if (bNeedSynServerState)
    {
        synStat();
    }

    int64_t useMs = (TC_TimeProvider::getInstance()->getNowMs() - startMs);
    NODE_LOG("KeepAliveThread")->debug() << FILE_FUN << "checkAlive use:" << useMs << " ms" << endl;
}