2022-01-18 09:13:11 +08:00
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from arctic . arctic import Arctic
import qlib
from qlib . data import D
import unittest
class TestClass ( unittest . TestCase ) :
"""
Useful commands
- run all tests: pytest examples/orderbook_data/example.py
- run a single test: pytest -s --pdb --disable-warnings examples/orderbook_data/example.py::TestClass::test_basic01
"""
def setUp ( self ) :
"""
Configure for arctic
"""
provider_uri = " ~/.qlib/qlib_data/yahoo_cn_1min "
qlib . init (
provider_uri = provider_uri ,
2022-02-06 22:33:16 +08:00
mem_cache_size_limit = 1024 * * 3 * 2 ,
2022-01-18 09:13:11 +08:00
mem_cache_type = " sizeof " ,
kernels = 1 ,
expression_provider = { " class " : " LocalExpressionProvider " , " kwargs " : { " time2idx " : False } } ,
2022-01-19 10:39:37 +08:00
feature_provider = {
" class " : " ArcticFeatureProvider " ,
" module_path " : " qlib.contrib.data.data " ,
" kwargs " : { " uri " : " 127.0.0.1 " } ,
} ,
2022-01-18 09:13:11 +08:00
dataset_provider = {
" class " : " LocalDatasetProvider " ,
" kwargs " : {
" align_time " : False , # Order book is not fixed, so it can't be align to a shared fixed frequency calendar
} ,
} ,
)
# self.stocks_list = ["SH600519"]
self . stocks_list = [ " SZ000725 " ]
def test_basic ( self ) :
# NOTE: this data contains a lot of zeros in $askX and $bidX
df = D . features (
self . stocks_list ,
fields = [ " $ask1 " , " $ask2 " , " $bid1 " , " $bid2 " ] ,
freq = " ticks " ,
start_time = " 20201230 " ,
end_time = " 20210101 " ,
)
print ( df )
def test_basic_without_time ( self ) :
df = D . features ( self . stocks_list , fields = [ " $ask1 " ] , freq = " ticks " )
print ( df )
def test_basic01 ( self ) :
df = D . features (
self . stocks_list ,
fields = [ " TResample($ask1, ' 1min ' , ' last ' ) " ] ,
freq = " ticks " ,
start_time = " 20201230 " ,
end_time = " 20210101 " ,
)
print ( df )
def test_basic02 ( self ) :
df = D . features (
self . stocks_list ,
fields = [ " $function_code " ] ,
freq = " transaction " ,
start_time = " 20201230 " ,
end_time = " 20210101 " ,
)
print ( df )
def test_basic03 ( self ) :
df = D . features (
self . stocks_list ,
fields = [ " $function_code " ] ,
freq = " order " ,
start_time = " 20201230 " ,
end_time = " 20210101 " ,
)
print ( df )
# Here are some popular expressions for high-frequency
# 1) some shared expression
expr_sum_buy_ask_1 = " (TResample($ask1, ' 1min ' , ' last ' ) + TResample($bid1, ' 1min ' , ' last ' )) "
total_volume = (
" TResample( "
+ " + " . join ( [ f " $ { name } { i } " for i in range ( 1 , 11 ) for name in [ " asize " , " bsize " ] ] )
+ " , ' 1min ' , ' sum ' ) "
)
@staticmethod
def total_func ( name , method ) :
return " TResample( " + " + " . join ( [ f " $ { name } { i } " for i in range ( 1 , 11 ) ] ) + " , ' 1min ' , ' {} ' ) " . format ( method )
def test_exp_01 ( self ) :
exprs = [ ]
names = [ ]
for name in [ " asize " , " bsize " ] :
for i in range ( 1 , 11 ) :
exprs . append ( f " TResample($ { name } { i } , ' 1min ' , ' mean ' ) / ( { self . total_volume } ) " )
names . append ( f " v_ { name } _ { i } " )
df = D . features ( self . stocks_list , fields = exprs , freq = " ticks " )
df . columns = names
print ( df )
# 2) some often used papers;
def test_exp_02 ( self ) :
spread_func = (
lambda index : f " 2 * TResample($ask { index } - $bid { index } , ' 1min ' , ' last ' ) / { self . expr_sum_buy_ask_1 } "
)
mid_func = (
lambda index : f " 2 * TResample(($ask { index } + $bid { index } )/2, ' 1min ' , ' last ' ) / { self . expr_sum_buy_ask_1 } "
)
exprs = [ ]
names = [ ]
for i in range ( 1 , 11 ) :
exprs . extend ( [ spread_func ( i ) , mid_func ( i ) ] )
names . extend ( [ f " p_spread_ { i } " , f " p_mid_ { i } " ] )
df = D . features ( self . stocks_list , fields = exprs , freq = " ticks " )
df . columns = names
print ( df )
def test_exp_03 ( self ) :
expr3_func1 = (
lambda name , index_left , index_right : f " 2 * TResample(Abs($ { name } { index_left } - $ { name } { index_right } ), ' 1min ' , ' last ' ) / { self . expr_sum_buy_ask_1 } "
)
for name in [ " ask " , " bid " ] :
for i in range ( 1 , 10 ) :
exprs = [ expr3_func1 ( name , i + 1 , i ) ]
names = [ f " p_diff_ { name } _ { i } _ { i + 1 } " ]
exprs . extend ( [ expr3_func1 ( " ask " , 10 , 1 ) , expr3_func1 ( " bid " , 1 , 10 ) ] )
names . extend ( [ " p_diff_ask_10_1 " , " p_diff_bid_1_10 " ] )
df = D . features ( self . stocks_list , fields = exprs , freq = " ticks " )
df . columns = names
print ( df )
def test_exp_04 ( self ) :
exprs = [ ]
names = [ ]
for name in [ " asize " , " bsize " ] :
exprs . append ( f " (( { self . total_func ( name , ' mean ' ) } ) / 10) / { self . total_volume } " )
names . append ( f " v_avg_ { name } " )
df = D . features ( self . stocks_list , fields = exprs , freq = " ticks " )
df . columns = names
print ( df )
def test_exp_05 ( self ) :
exprs = [
f " 2 * Sub( { self . total_func ( ' ask ' , ' last ' ) } , { self . total_func ( ' bid ' , ' last ' ) } )/ { self . expr_sum_buy_ask_1 } " ,
f " Sub( { self . total_func ( ' asize ' , ' mean ' ) } , { self . total_func ( ' bsize ' , ' mean ' ) } )/ { self . total_volume } " ,
]
names = [ " p_accspread " , " v_accspread " ]
df = D . features ( self . stocks_list , fields = exprs , freq = " ticks " )
df . columns = names
print ( df )
# (p|v)_diff_(ask|bid|asize|bsize)_(time_interval)
def test_exp_06 ( self ) :
t = 3
expr6_price_func = (
lambda name , index , method : f ' 2 * (TResample($ { name } { index } , " { t } s " , " { method } " ) - Ref(TResample($ { name } { index } , " { t } s " , " { method } " ), 1)) / { t } '
)
exprs = [ ]
names = [ ]
for i in range ( 1 , 11 ) :
for name in [ " bid " , " ask " ] :
exprs . append (
f " TResample( { expr6_price_func ( name , i , ' last ' ) } , ' 1min ' , ' mean ' ) / { self . expr_sum_buy_ask_1 } "
)
names . append ( f " p_diff_ { name } { i } _ { t } s " )
for i in range ( 1 , 11 ) :
for name in [ " asize " , " bsize " ] :
exprs . append ( f " TResample( { expr6_price_func ( name , i , ' mean ' ) } , ' 1min ' , ' mean ' ) / { self . total_volume } " )
names . append ( f " v_diff_ { name } { i } _ { t } s " )
df = D . features ( self . stocks_list , fields = exprs , freq = " ticks " )
df . columns = names
print ( df )
# TODOs:
# Following expressions may be implemented in the future
# expr7_2 = lambda funccode, bsflag, time_interval: \
# "TResample(TRolling(TEq(@transaction.function_code, {}) & TEq(@transaction.bs_flag ,{}), '{}s', 'sum') / \
# TRolling(@transaction.function_code, '{}s', 'count') , '1min', 'mean')".format(ord(funccode), bsflag,time_interval,time_interval)
# create_dataset(7, "SH600000", [expr7_2("C")] + [expr7(funccode, ordercode) for funccode in ['B','S'] for ordercode in ['0','1']])
# create_dataset(7, ["SH600000"], [expr7_2("C", 48)] )
@staticmethod
def expr7_init ( funccode , ordercode , time_interval ) :
# NOTE: based on on order frequency (i.e. freq="order")
return f " Rolling(Eq($function_code, { ord ( funccode ) } ) & Eq($order_kind , { ord ( ordercode ) } ), ' { time_interval } s ' , ' sum ' ) / Rolling($function_code, ' { time_interval } s ' , ' count ' ) "
# (la|lb|ma|mb|ca|cb)_intensity_(time_interval)
def test_exp_07_1 ( self ) :
# NOTE: based on transaction frequency (i.e. freq="transaction")
expr7_3 = (
lambda funccode , code , time_interval : f " TResample(Rolling(Eq($function_code, { ord ( funccode ) } ) & { code } ($ask_order, $bid_order) , ' { time_interval } s ' , ' sum ' ) / Rolling($function_code, ' { time_interval } s ' , ' count ' ) , ' 1min ' , ' mean ' ) "
)
exprs = [ expr7_3 ( " C " , " Gt " , " 3 " ) , expr7_3 ( " C " , " Lt " , " 3 " ) ]
names = [ " ca_intensity_3s " , " cb_intensity_3s " ]
df = D . features ( self . stocks_list , fields = exprs , freq = " transaction " )
df . columns = names
print ( df )
trans_dict = { " B " : " a " , " S " : " b " , " 0 " : " l " , " 1 " : " m " }
def test_exp_07_2 ( self ) :
# NOTE: based on on order frequency
expr7 = (
lambda funccode , ordercode , time_interval : f " TResample( { self . expr7_init ( funccode , ordercode , time_interval ) } , ' 1min ' , ' mean ' ) "
)
exprs = [ ]
names = [ ]
for funccode in [ " B " , " S " ] :
for ordercode in [ " 0 " , " 1 " ] :
exprs . append ( expr7 ( funccode , ordercode , " 3 " ) )
names . append ( self . trans_dict [ ordercode ] + self . trans_dict [ funccode ] + " _intensity_3s " )
df = D . features ( self . stocks_list , fields = exprs , freq = " transaction " )
df . columns = names
print ( df )
@staticmethod
def expr7_3_init ( funccode , code , time_interval ) :
# NOTE: It depends on transaction frequency
return f " Rolling(Eq($function_code, { ord ( funccode ) } ) & { code } ($ask_order, $bid_order) , ' { time_interval } s ' , ' sum ' ) / Rolling($function_code, ' { time_interval } s ' , ' count ' ) "
# (la|lb|ma|mb|ca|cb)_relative_intensity_(time_interval_small)_(time_interval_big)
def test_exp_08_1 ( self ) :
expr8_1 = (
lambda funccode , ordercode , time_interval_short , time_interval_long : f " TResample(Gt( { self . expr7_init ( funccode , ordercode , time_interval_short ) } , { self . expr7_init ( funccode , ordercode , time_interval_long ) } ), ' 1min ' , ' mean ' ) "
)
exprs = [ ]
names = [ ]
for funccode in [ " B " , " S " ] :
for ordercode in [ " 0 " , " 1 " ] :
exprs . append ( expr8_1 ( funccode , ordercode , " 10 " , " 900 " ) )
names . append ( self . trans_dict [ ordercode ] + self . trans_dict [ funccode ] + " _relative_intensity_10s_900s " )
df = D . features ( self . stocks_list , fields = exprs , freq = " order " )
df . columns = names
print ( df )
def test_exp_08_2 ( self ) :
# NOTE: It depends on transaction frequency
expr8_2 = (
lambda funccode , ordercode , time_interval_short , time_interval_long : f " TResample(Gt( { self . expr7_3_init ( funccode , ordercode , time_interval_short ) } , { self . expr7_3_init ( funccode , ordercode , time_interval_long ) } ), ' 1min ' , ' mean ' ) "
)
exprs = [ expr8_2 ( " C " , " Gt " , " 10 " , " 900 " ) , expr8_2 ( " C " , " Lt " , " 10 " , " 900 " ) ]
names = [ " ca_relative_intensity_10s_900s " , " cb_relative_intensity_10s_900s " ]
df = D . features ( self . stocks_list , fields = exprs , freq = " transaction " )
df . columns = names
print ( df )
## v9(la|lb|ma|mb|ca|cb)_diff_intensity_(time_interval1)_(time_interval2)
# 1) calculating the original data
# 2) Resample data to 3s and calculate the changing rate
# 3) Resample data to 1min
def test_exp_09_trans ( self ) :
exprs = [
f ' TResample(Div(Sub(TResample( { self . expr7_3_init ( " C " , " Gt " , " 3 " ) } , " 3s " , " last " ), Ref(TResample( { self . expr7_3_init ( " C " , " Gt " , " 3 " ) } , " 3s " , " last " ), 1)), 3), " 1min " , " mean " ) ' ,
f ' TResample(Div(Sub(TResample( { self . expr7_3_init ( " C " , " Lt " , " 3 " ) } , " 3s " , " last " ), Ref(TResample( { self . expr7_3_init ( " C " , " Lt " , " 3 " ) } , " 3s " , " last " ), 1)), 3), " 1min " , " mean " ) ' ,
]
names = [ " ca_diff_intensity_3s_3s " , " cb_diff_intensity_3s_3s " ]
df = D . features ( self . stocks_list , fields = exprs , freq = " transaction " )
df . columns = names
print ( df )
def test_exp_09_order ( self ) :
exprs = [ ]
names = [ ]
for funccode in [ " B " , " S " ] :
for ordercode in [ " 0 " , " 1 " ] :
exprs . append (
f ' TResample(Div(Sub(TResample( { self . expr7_init ( funccode , ordercode , " 3 " ) } , " 3s " , " last " ), Ref(TResample( { self . expr7_init ( funccode , ordercode , " 3 " ) } , " 3s " , " last " ), 1)), 3) , " 1min " , " mean " ) '
)
names . append ( self . trans_dict [ ordercode ] + self . trans_dict [ funccode ] + " _diff_intensity_3s_3s " )
df = D . features ( self . stocks_list , fields = exprs , freq = " order " )
df . columns = names
print ( df )
def test_exp_10 ( self ) :
exprs = [ ]
names = [ ]
for i in [ 5 , 10 , 30 , 60 ] :
exprs . append (
f ' TResample(Ref(TResample($ask1 + $bid1, " 1s " , " ffill " ), { - i } ) / TResample($ask1 + $bid1, " 1s " , " ffill " ) - 1, " 1min " , " mean " ) '
)
names . append ( f " lag_ { i } _change_rate " for i in [ 5 , 10 , 30 , 60 ] )
df = D . features ( self . stocks_list , fields = exprs , freq = " ticks " )
df . columns = names
print ( df )
if __name__ == " __main__ " :
unittest . main ( )