# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import ctypes import os import time import argparse import subprocess import scipy.sparse as sp import mxnet as mx import numpy as np import numpy.random as rnd from mxnet.test_utils import rand_ndarray, set_default_device, assert_almost_equal, get_bz2_data from mxnet.base import check_call, _LIB from util import estimate_density PARSER = argparse.ArgumentParser(description="Benchmark sparse operators", formatter_class=argparse.ArgumentDefaultsHelpFormatter) PARSER.add_argument('--num-omp-threads', type=int, default=1, help='number of omp threads to set in MXNet') PARSER.add_argument('--gpu', action='store_true', help="to be run on gpu") # TODO: Use logging later PARSER.add_argument('--verbose', action='store_true', help="Verbose output") ARGS = PARSER.parse_args() # some data information KDDA = { 'data_mini': 'kdda.t.mini', 'data_name': 'kdda.t', 'data_origin_name': 'kdda.t.bz2', 'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2", 'feature_dim': 20216830, 'm': [1, 8, 32], 'batch_size': [64], 'default_index': {'batch_size': 0, 'output_dim': 2}, 'num_batches': 10 } AVAZU = { 'data_mini': 'avazu-app.t.mini', 'data_name': 'avazu-app.t', 'data_origin_name': 'avazu-app.t.bz2', 'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2", 'feature_dim': 1000000, 'm': [1, 1000, 2000], 'batch_size': [128, 256], 'default_index': {'batch_size': 0, 'output_dim': 1}, 'num_batches': 10 } CRITEO = { 'data_mini': 'criteo.t.mini', 'data_name': 'criteo.t', 'data_origin_name': 'criteo.t.bz2', 'url': "https://s3-us-west-2.amazonaws.com/sparse-dataset/criteo.t.bz2", 'feature_dim': 8388621, 'm': [1, 8, 16, 32, 64], 'batch_size': [64, 128], 'default_index': {'batch_size': 1, 'output_dim': 3}, 'num_batches': 10 } SYNTHETIC1 = { 'feature_dim': [1000000], 'm': [256, 1000], 'density': [0.001, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 0.65], 'batch_size': [64, 128], 'default_index': {'batch_size': 1, 'density': 2, 'output_dim': 1, 'feature_dim': 0}, 'num_repeat': 10 } SYNTHETIC2 = { 'feature_dim': [8000000, 16000000], 'm': [1, 32], 'density': [0.001, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 0.65], 'batch_size': [64, 128], 'default_index': {'batch_size': 1, 'density': 2, 'output_dim': 1, 'feature_dim': 0}, 'num_repeat': 10 } def measure_cost(repeat, scipy_trans_lhs, scipy_dns_lhs, func_name, *args, **kwargs): """Measure time cost of running a function """ mx.nd.waitall() args_list = [] for arg in args: args_list.append(arg) start = time.time() if scipy_trans_lhs: args_list[0] = np.transpose(args_list[0]) if scipy_dns_lhs else sp.spmatrix.transpose(args_list[0]) for _ in range(repeat): func_name(*args_list, **kwargs) mx.nd.waitall() end = time.time() diff = end - start return diff / repeat def _get_iter(path, data_shape, batch_size): data_train = mx.io.LibSVMIter(data_libsvm=path, data_shape=data_shape, batch_size=batch_size) data_iter = iter(data_train) return data_iter def _line_count(path): return int(subprocess.check_output('wc -l {}'.format(path), shell=True).split()[0]) def _compare_sparse_dense(data_dir, file_name, mini_file_name, feature_dim, output_dim, density, batch_size, num_batches=3, num_repeat=5, transpose=False, rsp=False): def create_mini_path(mini_path, path, num_batches): """Samples batches of size: batch_size, total number: num_batches from the dataset files for running benchmarks""" if not os.path.exists(mini_path): last = _line_count(path) - num_batches * batch_size last = last if last >= 1 else 1 start = int(rnd.uniform(1, last)) os.system("sed -n '{},{}p' {} > {}".format( start, start + num_batches * batch_size, repr(path), repr(mini_path))) assert os.path.exists(mini_path) def run_benchmark(mini_path): """Run benchmarks """ data_shape = (feature_dim, ) train_iter = _get_iter(mini_path, data_shape, batch_size) weight_row_dim = batch_size if transpose else feature_dim weight_shape = (weight_row_dim, output_dim) if not rsp: weight = mx.nd.random.uniform(low=0, high=1, shape=weight_shape) else: weight = rand_ndarray(weight_shape, "row_sparse", density=0.05, distribution="uniform") total_cost = {} average_cost = {} count = 0 total_cost["sparse"] = 0. total_cost["dense"] = 0. for _ in train_iter: csr_data = train_iter.getdata() dns_data = csr_data.tostype('default') cost_sparse = measure_cost(num_repeat, False, False, mx.nd.sparse.dot, csr_data, weight, transpose_a=transpose) cost_dense = measure_cost(num_repeat, False, False, mx.nd.dot, dns_data, weight, transpose_a=transpose) total_cost["sparse"] += cost_sparse total_cost["dense"] += cost_dense count = count + 1 average_cost["sparse"] = total_cost["sparse"] / count average_cost["dense"] = total_cost["dense"] / count return (average_cost["sparse"], average_cost["dense"]) def print_result(average_cost_sparse, average_cost_dense): """Print result of comparison between sparse and dense """ ratio = average_cost_dense / average_cost_sparse fmt = '{:15.4f} {:10d} {:10d} {:10d} {:20.2f} {:15.2f} {:15.2f} {:10} {:10}' print(fmt.format(density * 100, batch_size, output_dim, feature_dim, ratio, average_cost_dense*1000, average_cost_sparse*1000, transpose, rsp)) mini_path = os.path.join(data_dir, mini_file_name) path = os.path.join(data_dir, file_name) create_mini_path(mini_path, path, num_batches) average_cost_sparse, average_cost_dense = run_benchmark(mini_path) print_result(average_cost_sparse, average_cost_dense) def test_dot_real(data_dict): """Dot operator testing with real datasets""" data_dir = os.path.join(os.getcwd(), 'data') path = os.path.join(data_dir, data_dict['data_name']) if not os.path.exists(path): get_bz2_data( data_dir, data_dict['data_name'], data_dict['url'], data_dict['data_origin_name'] ) assert os.path.exists(path) k = data_dict['feature_dim'] m = data_dict['m'] batch_size_list = data_dict['batch_size'] default_output_index = data_dict['default_index']['output_dim'] default_batch_size_index = data_dict['default_index']['batch_size'] density = estimate_density(path, data_dict['feature_dim']) num_batches = data_dict['num_batches'] assert default_batch_size_index < len(batch_size_list) assert default_output_index < len(m) if ARGS.verbose: print(f"Running Benchmarking on {repr(data_dict['data_mini'])} data") print('{:>15} {:>10} {:>10} {:>10} {:>20} {:>15} {:>15} {:>10} {:>10}'.format('density(%)', 'n', 'm', 'k', 't_dense/t_sparse', 't_dense(ms)', 't_sparse(ms)', 'is_transpose', 'rhs_rsp')) for output_dim in m: _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], k, output_dim, density, batch_size_list[default_batch_size_index], num_batches) _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], k, output_dim, density, batch_size_list[default_batch_size_index], num_batches, transpose=True) _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], k, output_dim, density, batch_size_list[default_batch_size_index], num_batches, rsp=True) for batch_size in batch_size_list: _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], k, m[default_output_index], density, batch_size, num_batches) _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], k, m[default_output_index], density, batch_size, num_batches, transpose=True) _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], k, output_dim, density, batch_size_list[default_batch_size_index], num_batches, rsp=True) def test_dot_synthetic(data_dict): """benchmark sparse mxnet dot and scipy dot operator with matrices of given density. `t_sparse` is the runtime of the invoked sparse dot operator in ms, while `t_dense` is the runtime of dot(dns, dns), with the same matrices except that they are in default storage type. """ # Benchmark MXNet and Scipys dot operator def bench_dot(lhs_shape, rhs_shape, lhs_stype, rhs_stype, lhs_den, rhs_den, trans_lhs, ctx, num_repeat=10, fw="mxnet", distribution="uniform"): set_default_device(ctx) assert fw == "mxnet" or fw == "scipy" # Set funcs dot_func_sparse = mx.nd.sparse.dot if fw == "mxnet" else sp.spmatrix.dot dot_func_dense = mx.nd.dot if fw == "mxnet" else np.dot # Create matrix instances lhs_nd = rand_ndarray(lhs_shape, lhs_stype, density=lhs_den, distribution=distribution) # only uniform distribution supported for rhs if rhs_stype == 'csr': rhs_nd = rand_ndarray(rhs_shape, rhs_stype, density=rhs_den, distribution=distribution) else: rhs_nd = rand_ndarray(rhs_shape, rhs_stype, density=rhs_den, distribution="uniform") lhs_dns = None rhs_dns = None dense_cost = None sparse_cost = None if fw == "mxnet": lhs_dns = lhs_nd if lhs_stype == 'default' else lhs_nd.tostype('default') rhs_dns = rhs_nd if rhs_stype == 'default' else rhs_nd.tostype('default') # One warm up run, verify correctness out = dot_func_sparse(lhs_nd, rhs_dns, trans_lhs) out_expected = dot_func_dense(lhs_dns, rhs_dns, trans_lhs) assert_almost_equal(out.asnumpy(), out_expected.asnumpy(), rtol=1e-1, atol=1e-1) sparse_cost = measure_cost(num_repeat, False, False, dot_func_sparse, lhs_nd, rhs_nd, trans_lhs) dense_cost = measure_cost(num_repeat, False, False, dot_func_dense, lhs_dns, rhs_dns, trans_lhs) else: lhs_dns = lhs_nd.asnumpy() rhs_dns = rhs_nd.asnumpy() lhs_nd = sp.csr_matrix(lhs_nd.asnumpy()) rhs_nd = rhs_nd.asnumpy() # One warm up run, verify correctness lhs_nd_copy = sp.spmatrix.transpose(lhs_nd) if trans_lhs else lhs_nd out = dot_func_sparse(lhs_nd_copy, rhs_dns) sparse_cost = measure_cost(num_repeat, trans_lhs, False, dot_func_sparse, lhs_nd, rhs_nd) dense_cost = measure_cost(num_repeat, trans_lhs, True, dot_func_dense, lhs_dns, rhs_dns) speedup = dense_cost / sparse_cost # Print results m = lhs_shape[0] k = lhs_shape[1] n = rhs_shape[1] result_pattern = '{:15.1f} {:15.1f} {:>10} {:8d} {:8d} {:8d} {:13.2f} {:13.2f} {:8.2f}' results = result_pattern.format(lhs_den*100, rhs_den*100, str(ctx), m, k, n, sparse_cost*1000, dense_cost*1000, speedup) print(results) def print_benchmark_info(lhs, rhs, lhs_trans, fw): trans_str = "^T" if lhs_trans else "" print("========================================================") print(f" {fw} sparse dot benchmark: dot({lhs}, {rhs}) = {rhs} ") print( f" (matrix multiplication: (m x k){trans_str} * (k x n) = m x n) ") print("========================================================") headline_pattern = '{:>15} {:>15} {:>10} {:>8} {:>8} {:>8} {:>13} {:>13} {:>8}' headline = headline_pattern.format('lhs_density(%)', 'rhs_density(%)', 'context', 'm', 'k', 'n', 't_sparse(ms)', 't_dense(ms)', 'speedup') print(headline) def run_benchmark(ctx=None, lhs="csr", lhs_trans=False, rhs="dns", fw="mxnet", rhs_density=1, distribution="uniform"): if rhs_density > 1 or rhs_density < 0: raise ValueError("rhs_density has to be between 0 and 1") print_benchmark_info(lhs, rhs, lhs_trans, fw) if rhs == "csr": lhs_stype = "default" rhs_stype = "csr" assert (lhs_stype == 'default'), "Only dot(default, csr) supported" # Arrange dimensions according to use case. For below csr will have num_rows << num_cols feature_dim_list = data_dict['batch_size'] batch_size_list = data_dict['m'] output_dim_list = data_dict['feature_dim'] density_list = data_dict['density'] default_output_index = data_dict['default_index']['feature_dim'] default_density_index = data_dict['default_index']['density'] default_feature_index = data_dict['default_index']['batch_size'] default_batch_size_index = data_dict['default_index']['output_dim'] num_repeat = data_dict['num_repeat'] else: lhs_stype = "csr" rhs_stype = "row_sparse" if rhs == "rsp" else "default" feature_dim_list = data_dict['feature_dim'] output_dim_list = data_dict['m'] batch_size_list = data_dict['batch_size'] density_list = data_dict['density'] default_output_index = data_dict['default_index']['output_dim'] default_batch_size_index = data_dict['default_index']['batch_size'] default_feature_index = data_dict['default_index']['feature_dim'] default_density_index = data_dict['default_index']['density'] num_repeat = data_dict['num_repeat'] for output_dim in output_dim_list: if lhs_trans: output_row_dim = batch_size_list[default_batch_size_index] else: output_row_dim = feature_dim_list[default_feature_index] bench_dot((batch_size_list[default_batch_size_index], feature_dim_list[default_feature_index]), (output_row_dim, output_dim), lhs_stype, rhs_stype, density_list[default_density_index], rhs_density, lhs_trans, ctx, num_repeat=num_repeat, fw=fw, distribution=distribution) for feature_dim in feature_dim_list: if lhs_trans: output_row_dim = batch_size_list[default_batch_size_index] else: output_row_dim = feature_dim bench_dot((batch_size_list[default_batch_size_index], feature_dim), (output_row_dim, output_dim_list[default_output_index]), lhs_stype, rhs_stype, density_list[default_density_index], rhs_density, lhs_trans, ctx, num_repeat=num_repeat, fw=fw, distribution=distribution) for batch_size in batch_size_list: if lhs_trans: output_row_dim = batch_size else: output_row_dim = feature_dim_list[default_feature_index] bench_dot((batch_size, feature_dim_list[default_feature_index]), (output_row_dim, output_dim_list[default_output_index]), lhs_stype, rhs_stype, density_list[default_density_index], rhs_density, lhs_trans, ctx, num_repeat=num_repeat, fw=fw, distribution=distribution) for density in density_list: if lhs_trans: output_row_dim = batch_size_list[default_batch_size_index] else: output_row_dim = feature_dim_list[default_feature_index] bench_dot((batch_size_list[default_batch_size_index], feature_dim_list[default_feature_index]), (output_row_dim, output_dim_list[default_output_index]), lhs_stype, rhs_stype, density, density, lhs_trans, ctx, num_repeat=num_repeat, fw=fw, distribution=distribution) check_call(_LIB.MXSetNumOMPThreads(ctypes.c_int(ARGS.num_omp_threads))) context = mx.gpu() if ARGS.gpu else mx.cpu() # TODO(anirudh): make the data dicts to config which can be passed at runtime distributions = ["uniform", "powerlaw"] for distribution in distributions: run_benchmark(context, lhs="csr", rhs="default", lhs_trans=False, fw="mxnet", rhs_density=1, distribution=distribution) run_benchmark(context, lhs="csr", rhs="default", lhs_trans=True, fw="mxnet", rhs_density=1, distribution=distribution) run_benchmark(context, lhs="csr", rhs="rsp", lhs_trans=False, fw="mxnet", rhs_density=0.05, distribution=distribution) run_benchmark(context, lhs="default", rhs="csr", lhs_trans=False, fw="mxnet", rhs_density=0.001, distribution=distribution) if not ARGS.gpu: run_benchmark(context, lhs="csr", rhs="default", lhs_trans=False, fw="scipy", rhs_density=1, distribution=distribution) run_benchmark(context, lhs="csr", rhs="default", lhs_trans=True, fw="scipy", rhs_density=1, distribution=distribution) if __name__ == "__main__": begin_time = time.time() test_dot_real(KDDA) test_dot_real(AVAZU) test_dot_real(CRITEO) test_dot_synthetic(SYNTHETIC1) test_dot_synthetic(SYNTHETIC2) total_time = time.time() - begin_time print(f"total time is {total_time}")