2017-06-19 16:41:42 +08:00
/**
2017-01-18 16:19:06 +08:00
* Tencent is pleased to support the open source community by making Tars available.
*
* Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
# include "QueryDbThread.h"
# include "QueryServer.h"
# include "DbProxy.h"
/////////////////////////////////////////////////////////////////////////
/**
 * Builds an idle dispatcher: the terminate flag starts out cleared and no
 * worker runners exist until start() is called.
 */
QueryDbThread::QueryDbThread()
    : _terminate(false)
{
    TLOGDEBUG(" QueryDbThread init ok " << endl);
}
/**
 * Shuts the dispatcher down by invoking terminate() and then writes a
 * debug log line.
 */
QueryDbThread::~QueryDbThread()
{
    terminate();

    TLOGDEBUG(" QueryDbThread terminate. " << endl);
}
void QueryDbThread : : start ( int iThreadNum )
{
for ( int i = 0 ; i < iThreadNum ; + + i )
{
HandleThreadRunner * r = new HandleThreadRunner ( this ) ;
r - > start ( ) ;
_runners . push_back ( r ) ;
}
}
void QueryDbThread : : terminate ( )
{
_terminate = true ;
for ( uint32_t i = 0 ; i < _runners . size ( ) ; + + i )
{
if ( _runners [ i ] - > isAlive ( ) )
{
_runners [ i ] - > terminate ( ) ;
_queue . notifyT ( ) ;
}
}
for ( uint32_t i = 0 ; i < _runners . size ( ) ; + + i )
{
if ( _runners [ i ] - > isAlive ( ) )
{
_runners [ i ] - > getThreadControl ( ) . join ( ) ;
}
}
_queue . clear ( ) ;
for ( uint32_t i = 0 ; i < _runners . size ( ) ; + + i )
{
if ( _runners [ i ] )
{
delete _runners [ i ] ;
_runners [ i ] = NULL ;
}
}
}
bool QueryDbThread : : pop ( QueryItem * & pItem )
{
return _queue . pop_front ( pItem , 2000 ) ;
}
void QueryDbThread : : put ( QueryItem * pItem )
{
if ( ! _terminate & & pItem )
{
_queue . push_back ( pItem ) ;
}
}
/////////////////////////////////////////////////////////////////////////
/**
 * Creates a runner bound to a dispatcher.
 *
 * @param proc dispatcher whose pop() this runner polls in run(); the
 *             pointer is stored as-is, not copied or owned.
 */
HandleThreadRunner::HandleThreadRunner(QueryDbThread* proc)
    : _terminate(false),
      _proc(proc)
{
}
void HandleThreadRunner : : terminate ( )
{
_terminate = true ;
}
void HandleThreadRunner : : run ( )
{
string sRes ( " " ) ;
DbProxy _dbproxy ;
int64_t tStart = 0 ;
int64_t tEnd = 0 ;
while ( ! _terminate )
{
QueryItem * pQueryItem = NULL ;
if ( ! _terminate & & _proc - > pop ( pQueryItem ) )
{
try
{
tStart = TNOWMS ;
_dbproxy . queryData ( pQueryItem - > mQuery , sRes , pQueryItem - > bFlag ) ;
tEnd = TNOWMS ;
sRes + = " endline \n " ;
pQueryItem - > current - > sendResponse ( sRes . c_str ( ) , sRes . length ( ) ) ;
FDLOG ( " inout " ) < < " HandleThreadRunner::run sUid: " < < pQueryItem - > sUid < < " queryData timecost(ms): " < < ( tEnd - tStart ) < < endl ;
}
catch ( exception & ex )
{
TLOGERROR ( " HandleThreadRunner::run exception: " < < ex . what ( ) < < endl ) ;
string sResult = " Ret:-1 \n " + string ( ex . what ( ) ) + " \n endline \n " ;
pQueryItem - > current - > sendResponse ( sResult . c_str ( ) , sResult . length ( ) ) ;
FDLOG ( " inout " ) < < " HandleThreadRunner::run sUid: " < < pQueryItem - > sUid < < " exception: " < < ex . what ( ) < < endl ;
}
catch ( . . . )
{
TLOGERROR ( " HandleThreadRunner::run exception. " < < endl ) ;
string sResult = " Ret:-1 \n unknown exception \n endline \n " ;
pQueryItem - > current - > sendResponse ( sResult . c_str ( ) , sResult . length ( ) ) ;
FDLOG ( " inout " ) < < " HandleThreadRunner::run sUid: " < < pQueryItem - > sUid < < " unknown exception. " < < endl ;
}
sRes = " " ;
delete pQueryItem ;
pQueryItem = NULL ;
}
}
}