From b713ff354b2e47c38c46f11b2ba95f1ec3068d74 Mon Sep 17 00:00:00 2001
From: ShawnXuan
Date: Thu, 3 Jun 2021 19:30:14 +0800
Subject: [PATCH 1/4] multi dataloader process

---
 .../WideDeepLearning/wdl_train_eval.py | 57 +++++++++++--------
 1 file changed, 34 insertions(+), 23 deletions(-)

diff --git a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
index 95e8091..e5118c1 100644
--- a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
+++ b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
@@ -26,6 +26,12 @@ def str_list(x):
     return x.split(',')
 parser = argparse.ArgumentParser()
 parser.add_argument('--dataset_format', type=str, default='ofrecord', help='ofrecord or onerec')
+parser.add_argument(
+    "--use_single_dataloader_process",
+    action="store_true",
+    help="use a single dataloader process per node or not."
+)
+parser.add_argument('--num_dataloader_process_per_gpu', type=int, default=1)
 parser.add_argument('--train_data_dir', type=str, default='')
 parser.add_argument('--train_data_part_num', type=int, default=1)
 parser.add_argument('--train_part_name_suffix_length', type=int, default=-1)
@@ -60,16 +66,23 @@ def str_list(x):
 DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for i in range(FLAGS.hidden_units_num)]
 
 def _data_loader(data_dir, data_part_num, batch_size, part_name_suffix_length=-1, shuffle=True):
-    if FLAGS.dataset_format == 'ofrecord':
-        return _data_loader_ofrecord(data_dir, data_part_num, batch_size, part_name_suffix_length,
-                                     shuffle)
-    elif FLAGS.dataset_format == 'onerec':
-        return _data_loader_onerec(data_dir, batch_size, shuffle)
-    elif FLAGS.dataset_format == 'synthetic':
-        return _data_loader_synthetic(batch_size)
+    assert FLAGS.num_dataloader_process_per_gpu >= 1
+    if FLAGS.use_single_dataloader_process:
+        devices = ['{}:0'.format(i) for i in range(FLAGS.num_nodes)]
     else:
-        assert 0, "Please specify dataset_type as `ofrecord`, `onerec` or `synthetic`."
-
+        num_dataloader_process = FLAGS.num_dataloader_process_per_gpu * FLAGS.gpu_num_per_node
+        devices = ['{}:0-{}'.format(i, num_dataloader_process - 1) for i in range(FLAGS.num_nodes)]
+    with flow.scope.placement("cpu", devices):
+        if FLAGS.dataset_format == 'ofrecord':
+            data = _data_loader_ofrecord(data_dir, data_part_num, batch_size,
+                                         part_name_suffix_length, shuffle)
+        elif FLAGS.dataset_format == 'onerec':
+            data = _data_loader_onerec(data_dir, batch_size, shuffle)
+        elif FLAGS.dataset_format == 'synthetic':
+            data = _data_loader_synthetic(batch_size)
+        else:
+            assert 0, "Please specify dataset_type as `ofrecord`, `onerec` or `synthetic`."
+    return flow.identity_n(data)
 
 def _data_loader_ofrecord(data_dir, data_part_num, batch_size, part_name_suffix_length=-1,
@@ -88,22 +101,20 @@ def _blob_decoder(bn, shape, dtype=flow.int32):
     dense_fields = _blob_decoder("dense_fields", (FLAGS.num_dense_fields,), flow.float)
     wide_sparse_fields = _blob_decoder("wide_sparse_fields", (FLAGS.num_wide_sparse_fields,))
     deep_sparse_fields = _blob_decoder("deep_sparse_fields", (FLAGS.num_deep_sparse_fields,))
-    return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
+    return [labels, dense_fields, wide_sparse_fields, deep_sparse_fields]
 
 def _data_loader_synthetic(batch_size):
-    devices = ['{}:0-{}'.format(i, FLAGS.gpu_num_per_node - 1) for i in range(FLAGS.num_nodes)]
-    with flow.scope.placement("cpu", devices):
-        def _blob_random(shape, dtype=flow.int32, initializer=flow.zeros_initializer(flow.int32)):
-            return flow.data.decode_random(shape=shape, dtype=dtype, batch_size=batch_size,
-                                           initializer=initializer)
-        labels = _blob_random((1,), initializer=flow.random_uniform_initializer(dtype=flow.int32))
-        dense_fields = _blob_random((FLAGS.num_dense_fields,), dtype=flow.float,
-                                    initializer=flow.random_uniform_initializer())
-        wide_sparse_fields = _blob_random((FLAGS.num_wide_sparse_fields,))
-        deep_sparse_fields = _blob_random((FLAGS.num_deep_sparse_fields,))
-        print('use synthetic data')
-        return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
+    def _blob_random(shape, dtype=flow.int32, initializer=flow.zeros_initializer(flow.int32)):
+        return flow.data.decode_random(shape=shape, dtype=dtype, batch_size=batch_size,
+                                       initializer=initializer)
+    labels = _blob_random((1,), initializer=flow.random_uniform_initializer(dtype=flow.int32))
+    dense_fields = _blob_random((FLAGS.num_dense_fields,), dtype=flow.float,
+                                initializer=flow.random_uniform_initializer())
+    wide_sparse_fields = _blob_random((FLAGS.num_wide_sparse_fields,))
+    deep_sparse_fields = _blob_random((FLAGS.num_deep_sparse_fields,))
+    print('use synthetic data')
+    return [labels, dense_fields, wide_sparse_fields, deep_sparse_fields]
 
 def _data_loader_onerec(data_dir, batch_size, shuffle):
@@ -122,7 +133,7 @@ def _blob_decoder(bn, shape, dtype=flow.int32):
     dense_fields = _blob_decoder("dense_fields", (FLAGS.num_dense_fields,), flow.float)
     wide_sparse_fields = _blob_decoder("wide_sparse_fields", (FLAGS.num_wide_sparse_fields,))
     deep_sparse_fields = _blob_decoder("deep_sparse_fields", (FLAGS.num_deep_sparse_fields,))
-    return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
+    return [labels, dense_fields, wide_sparse_fields, deep_sparse_fields]
 
 def _model(dense_fields, wide_sparse_fields, deep_sparse_fields):
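Note (patch 1): all three readers are now built under one
`flow.scope.placement("cpu", devices)` scope, and `_data_loader` funnels
their output blobs through a single `flow.identity_n` call, so the
per-format helpers can return plain lists. A minimal sketch of the
device-string arithmetic only (the values below are illustrative, not
taken from the patch):

    # Each entry reads "node:first_cpu-last_cpu"; the reader is spread
    # across that device range on every node.
    num_nodes = 2                       # hypothetical cluster size
    gpu_num_per_node = 4                # hypothetical GPUs per node
    num_dataloader_process_per_gpu = 1  # flag added by this patch

    n = num_dataloader_process_per_gpu * gpu_num_per_node
    devices = ['{}:0-{}'.format(i, n - 1) for i in range(num_nodes)]
    print(devices)  # ['0:0-3', '1:0-3']
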
From 178de6a6bb21ba51924324c5fd4faf111a99bdf0 Mon Sep 17 00:00:00 2001
From: ShawnXuan
Date: Fri, 4 Jun 2021 11:07:54 +0800
Subject: [PATCH 2/4] enable legacy model io and nccl compute stream

---
 ClickThroughRate/WideDeepLearning/wdl_train_eval.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
index e5118c1..2dde211 100644
--- a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
+++ b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
@@ -271,7 +271,9 @@ def main():
     flow.config.gpu_device_num(FLAGS.gpu_num_per_node)
     flow.config.enable_model_io_v2(True)
     flow.config.enable_debug_mode(True)
-    flow.config.collective_boxing.nccl_enable_all_to_all(True)
+    flow.config.enable_legacy_model_io(True)
+    flow.config.nccl_use_compute_stream(True)
+    # flow.config.collective_boxing.nccl_enable_all_to_all(True)
     #flow.config.enable_numa_aware_cuda_malloc_host(True)
     #flow.config.collective_boxing.enable_fusion(False)
     check_point = flow.train.CheckPoint()
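Note (patch 2): the all-to-all collective-boxing toggle is commented out
in favor of the legacy model-io path plus NCCL on the compute stream. A
minimal sketch of where these toggles sit in a legacy lazy-mode entry
point (the device count is illustrative; only config calls visible in
the patch are used):

    import oneflow as flow

    flow.config.gpu_device_num(4)              # illustrative value
    flow.config.enable_legacy_model_io(True)   # legacy model load/save path
    flow.config.nccl_use_compute_stream(True)  # NCCL shares the compute stream
    check_point = flow.train.CheckPoint()      # legacy checkpoint object, as in main()
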
From e659921629cd1695439e3e105b72af72fe2ed508 Mon Sep 17 00:00:00 2001
From: ShawnXuan
Date: Fri, 4 Jun 2021 19:37:16 +0800
Subject: [PATCH 3/4] shuffle_mode=batch

---
 ClickThroughRate/WideDeepLearning/wdl_train_eval.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
index 2dde211..ea3c87a 100644
--- a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
+++ b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
@@ -123,6 +123,7 @@ def _data_loader_onerec(data_dir, batch_size, shuffle):
     files = glob.glob(os.path.join(data_dir, '*.onerec'))
     readdata = flow.data.onerec_reader(files=files, batch_size=batch_size, random_shuffle=shuffle,
                                        verify_example=False,
+                                       shuffle_mode="batch",
                                        shuffle_buffer_size=64,
                                        shuffle_after_epoch=shuffle)

From 30f9480852c28eb64ec16631840d68d1ddc61afb Mon Sep 17 00:00:00 2001
From: ShawnXuan
Date: Fri, 4 Jun 2021 20:55:00 +0800
Subject: [PATCH 4/4] process -> thread

---
 .../WideDeepLearning/wdl_train_eval.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
index ea3c87a..726fb69 100644
--- a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
+++ b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
@@ -27,11 +27,11 @@ def str_list(x):
 parser = argparse.ArgumentParser()
 parser.add_argument('--dataset_format', type=str, default='ofrecord', help='ofrecord or onerec')
 parser.add_argument(
-    "--use_single_dataloader_process",
+    "--use_single_dataloader_thread",
     action="store_true",
-    help="use a single dataloader process per node or not."
+    help="use a single dataloader thread per node or not."
 )
-parser.add_argument('--num_dataloader_process_per_gpu', type=int, default=1)
+parser.add_argument('--num_dataloader_thread_per_gpu', type=int, default=2)
 parser.add_argument('--train_data_dir', type=str, default='')
 parser.add_argument('--train_data_part_num', type=int, default=1)
 parser.add_argument('--train_part_name_suffix_length', type=int, default=-1)
@@ -66,12 +66,12 @@ def str_list(x):
 DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for i in range(FLAGS.hidden_units_num)]
 
 def _data_loader(data_dir, data_part_num, batch_size, part_name_suffix_length=-1, shuffle=True):
-    assert FLAGS.num_dataloader_process_per_gpu >= 1
-    if FLAGS.use_single_dataloader_process:
+    assert FLAGS.num_dataloader_thread_per_gpu >= 1
+    if FLAGS.use_single_dataloader_thread:
         devices = ['{}:0'.format(i) for i in range(FLAGS.num_nodes)]
     else:
-        num_dataloader_process = FLAGS.num_dataloader_process_per_gpu * FLAGS.gpu_num_per_node
-        devices = ['{}:0-{}'.format(i, num_dataloader_process - 1) for i in range(FLAGS.num_nodes)]
+        num_dataloader_thread = FLAGS.num_dataloader_thread_per_gpu * FLAGS.gpu_num_per_node
+        devices = ['{}:0-{}'.format(i, num_dataloader_thread - 1) for i in range(FLAGS.num_nodes)]
     with flow.scope.placement("cpu", devices):
         if FLAGS.dataset_format == 'ofrecord':
             data = _data_loader_ofrecord(data_dir, data_part_num, batch_size,
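
Note (patch 4): this is a pure rename (process -> thread), suggesting the
extra dataloader slots are threads inside one runtime rather than
separate OS processes, and it raises the default to 2 threads per GPU. A
self-contained sketch of how the renamed flags parse (the command line
below is hypothetical):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--use_single_dataloader_thread", action="store_true",
                        help="use a single dataloader thread per node or not.")
    parser.add_argument("--num_dataloader_thread_per_gpu", type=int, default=2)

    FLAGS = parser.parse_args(["--num_dataloader_thread_per_gpu", "3"])
    print(FLAGS.use_single_dataloader_thread)   # False (flag not given)
    print(FLAGS.num_dataloader_thread_per_gpu)  # 3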