Tomasz Grel 2 years ago
Parent
Commit
da7e1a701b

+ 13 - 5
TensorFlow2/Recommendation/DLRM_and_DCNv2/Dockerfile

@@ -14,10 +14,11 @@
 #
 # author: Tomasz Grel ([email protected])
 
-ARG FROM_IMAGE_NAME=nvcr.io/nvidia/tensorflow:23.02-tf2-py3
-FROM ${FROM_IMAGE_NAME}
+ARG FROM_IMAGE_NAME=nvcr.io/nvidia/tensorflow:23.06-tf2-py3
+FROM nvcr.io/nvidia/tritonserver:23.06-py3-sdk as clientsdk
+FROM ${FROM_IMAGE_NAME} as base
 
-ARG DISTRIBUTED_EMBEDDINGS_COMMIT=c635ed84
+ARG DISTRIBUTED_EMBEDDINGS_COMMIT=45cffaa8
 
 WORKDIR /dlrm
 
@@ -38,8 +39,7 @@ RUN rm -rf distributed-embeddings &&\
     pip install artifacts/*.whl &&\
     cd ..
 
-ADD . .
-
+ADD tensorflow-dot-based-interact tensorflow-dot-based-interact
 RUN mkdir -p /usr/local/lib/python3.8/dist-packages/tensorflow/include/third_party/gpus/cuda/ &&\
     cd tensorflow-dot-based-interact &&\
     make clean &&\
@@ -49,5 +49,13 @@ RUN mkdir -p /usr/local/lib/python3.8/dist-packages/tensorflow/include/third_par
     pip install ./artifacts/tensorflow_dot_based_interact-*.whl &&\
     cd ..
 
+COPY --from=clientsdk /workspace/install/python/tritonclient-2.35.0-py3-*.whl /dlrm/
+RUN if [[ "$(uname -m)" == "x86_64" ]]; \
+    then echo x86; pip install tritonclient-2.35.0-py3-none-manylinux1_x86_64.whl[all]; \
+    else echo arm; pip install tritonclient-2.35.0-py3-none-manylinux2014_aarch64.whl[all]; \
+    fi
+
+ADD . .
+
 ENV HOROVOD_CYCLE_TIME=0.2
 ENV HOROVOD_ENABLE_ASYNC_COMPLETION=1

+ 6 - 3
TensorFlow2/Recommendation/DLRM_and_DCNv2/dataloading/dataloader.py

@@ -80,7 +80,8 @@ def _create_pipelines_tf_raw(**kwargs):
                                        local_categorical_feature_names=local_categorical_names,
                                        rank=kwargs['rank'],
                                        world_size=kwargs['world_size'],
-                                       concat_features=kwargs['concat_features'])
+                                       concat_features=kwargs['concat_features'],
+                                       data_parallel_categoricals=kwargs['data_parallel_input'])
 
     test_dataset = TfRawBinaryDataset(feature_spec=feature_spec,
                                       instance=TEST_MAPPING,
@@ -89,7 +90,8 @@ def _create_pipelines_tf_raw(**kwargs):
                                       local_categorical_feature_names=local_categorical_names,
                                       rank=kwargs['rank'],
                                       world_size=kwargs['world_size'],
-                                      concat_features=kwargs['concat_features'])
+                                      concat_features=kwargs['concat_features'],
+                                      data_parallel_categoricals=kwargs['data_parallel_input'])
     return train_dataset, test_dataset
 
 
@@ -113,7 +115,8 @@ def _create_pipelines_split_tfrecords(**kwargs):
 
 
 def create_input_pipelines(dataset_type, dataset_path, train_batch_size, test_batch_size,
-                           table_ids, feature_spec, rank=0, world_size=1, concat_features=False):
+                           table_ids, feature_spec, rank=0, world_size=1, concat_features=False,
+                           data_parallel_input=False):
 
     # pass along all arguments except dataset type
     kwargs = locals()

+ 5 - 1
TensorFlow2/Recommendation/DLRM_and_DCNv2/dataloading/raw_binary_dataset.py

@@ -67,12 +67,14 @@ class TfRawBinaryDataset:
             numerical_features_enabled: bool = False,
             rank: int = 0,
             world_size: int = 1,
-            concat_features: bool = False
+            concat_features: bool = False,
+            data_parallel_categoricals = False,
     ):
 
         self._concat_features = concat_features
         self._feature_spec = feature_spec
         self._batch_size = batch_size
+        self._data_parallel_categoricals = data_parallel_categoricals
 
         local_batch_size = int(batch_size / world_size)
         batch_sizes_per_gpu = [local_batch_size] * world_size
@@ -180,6 +182,8 @@ class TfRawBinaryDataset:
                 feature = tf.cast(feature, dtype=tf.int32)
                 feature = tf.expand_dims(feature, axis=1)
                 feature = tf.reshape(feature, [self._batch_size, 1])
+                if self._data_parallel_categoricals:
+                    feature = feature[self.dp_begin_idx:self.dp_end_idx]
                 cat_data.append(feature)
             if self._concat_features:
                 cat_data = tf.concat(cat_data, axis=1)

+ 1 - 1
TensorFlow2/Recommendation/DLRM_and_DCNv2/deployment/tf/Dockerfile

@@ -15,6 +15,6 @@
 # author: Tomasz Grel ([email protected])
 
 
-FROM nvcr.io/nvidia/tritonserver:23.02-py3 as tritonserver
+FROM nvcr.io/nvidia/tritonserver:23.06-py3 as tritonserver
 
 WORKDIR /opt/tritonserver

+ 15 - 3
TensorFlow2/Recommendation/DLRM_and_DCNv2/main.py

@@ -50,7 +50,13 @@ def define_common_flags():
     flags.DEFINE_string("dist_strategy", default='memory_balanced',
                         help="Strategy for the Distributed Embeddings to use. Supported options are"
                         "'memory_balanced', 'basic' and 'memory_optimized'")
-    flags.DEFINE_integer("column_slice_threshold", default=10*1000*1000*1000,
+    flags.DEFINE_integer("column_slice_threshold", default=5*1000*1000*1000,
+                         help='Number of elements above which a distributed embedding will be sliced across'
+                         'multiple devices')
+    flags.DEFINE_integer("row_slice_threshold", default=10*1000*1000*1000,
+                         help='Number of elements above which a distributed embedding table will be'
+                         ' row-sliced across multiple devices')
+    flags.DEFINE_integer("data_parallel_threshold", default=None,
+                         help='Number of elements below which an embedding table is placed'
+                         ' data-parallel (replicated on every device) instead of model-parallel')
 
@@ -97,6 +103,8 @@ def define_common_flags():
     flags.DEFINE_enum("dataset_type", default="tf_raw",
                       enum_values=['tf_raw', 'synthetic', 'split_tfrecords'],
                       help='The type of the dataset to use')
+    flags.DEFINE_boolean("data_parallel_input", default=False, help="Use a data-parallel dataloader,"
+                         " i.e., load a local batch of data for all input features")
 
     # Synthetic dataset settings
     flags.DEFINE_boolean("synthetic_dataset_use_feature_spec", default=False,
@@ -296,14 +304,18 @@ def main():
                       categorical_cardinalities=dataset_metadata.categorical_cardinalities,
                       transpose=False)
 
+    table_ids = model.sparse_model.get_local_table_ids(hvd.rank())
+    print(f'local feature ids={table_ids}')
+
     train_pipeline, validation_pipeline = create_input_pipelines(dataset_type=FLAGS.dataset_type,
                                                                  dataset_path=FLAGS.dataset_path,
                                                                  train_batch_size=FLAGS.batch_size,
                                                                  test_batch_size=FLAGS.valid_batch_size,
-                                                                 table_ids=model.sparse_model.get_local_table_ids(hvd.rank()),
+                                                                 table_ids=table_ids,
                                                                  feature_spec=FLAGS.feature_spec,
                                                                  rank=hvd.rank(), world_size=hvd.size(),
-                                                                 concat_features=FLAGS.concat_embedding)
+                                                                 concat_features=FLAGS.concat_embedding,
+                                                                 data_parallel_input=FLAGS.data_parallel_input)
 
     mlp_optimizer, embedding_optimizer = create_optimizers(FLAGS)
 

+ 1 - 1
TensorFlow2/Recommendation/DLRM_and_DCNv2/nn/embedding.py

@@ -265,7 +265,7 @@ class DualEmbeddingGroup(tf.keras.layers.Layer):
         reversed_sizes = self.table_sizes[idx_mapping]
 
         cumulative_size = np.cumsum(reversed_sizes)
-        cumulative_indicators = (cumulative_size > self.memory_threshold * 2 ** 30).tolist()
+        cumulative_indicators = (cumulative_size > self.memory_threshold * (10 ** 9)).tolist()
         if True in cumulative_indicators:
             index = cumulative_indicators.index(True)
         else:

+ 28 - 11
TensorFlow2/Recommendation/DLRM_and_DCNv2/nn/sparse_model.py

@@ -20,7 +20,6 @@ import numpy as np
 import json
 
 from distributed_embeddings.python.layers import dist_model_parallel as dmp
-from distributed_embeddings.python.layers import embedding
 
 from utils.checkpointing import get_variable_path
 
@@ -29,7 +28,19 @@ from .embedding import EmbeddingInitializer, DualEmbeddingGroup
 
 sparse_model_parameters = ['use_mde_embeddings', 'embedding_dim', 'column_slice_threshold',
                            'embedding_zeros_initializer', 'embedding_trainable', 'categorical_cardinalities',
-                           'concat_embedding', 'cpu_offloading_threshold_gb']
+                           'concat_embedding', 'cpu_offloading_threshold_gb',
+                           'data_parallel_input', 'row_slice_threshold', 'data_parallel_threshold']
+
+def _gigabytes_to_elements(gb, dtype=tf.float32):
+    if gb is None:
+        return None
+
+    if dtype == tf.float32:
+        bytes_per_element = 4
+    else:
+        raise ValueError(f'Unsupported dtype: {dtype}')
+
+    return gb * 10**9 / bytes_per_element
 
 class SparseModel(tf.keras.Model):
     def __init__(self, **kwargs):
@@ -61,21 +72,21 @@ class SparseModel(tf.keras.Model):
         for table_size, dim in zip(self.categorical_cardinalities, self.embedding_dim):
             if hvd.rank() == 0:
                 print(f'Creating embedding with size: {table_size} {dim}')
-            if self.use_mde_embeddings:
-                e = embedding.Embedding(input_dim=table_size, output_dim=dim,
-                                        combiner='sum', embeddings_initializer=initializer_cls())
-            else:
-                e = tf.keras.layers.Embedding(input_dim=table_size, output_dim=dim,
-                                              embeddings_initializer=initializer_cls())
+            e = tf.keras.layers.Embedding(input_dim=table_size, output_dim=dim,
+                                          embeddings_initializer=initializer_cls())
             self.embedding_layers.append(e)
 
+        gpu_size = _gigabytes_to_elements(self.cpu_offloading_threshold_gb)
         self.embedding = dmp.DistributedEmbedding(self.embedding_layers,
                                                   strategy='memory_balanced',
-                                                  dp_input=False,
-                                                  column_slice_threshold=self.column_slice_threshold)
+                                                  dp_input=self.data_parallel_input,
+                                                  column_slice_threshold=self.column_slice_threshold,
+                                                  row_slice_threshold=self.row_slice_threshold,
+                                                  data_parallel_threshold=self.data_parallel_threshold,
+                                                  gpu_embedding_size=gpu_size)
 
     def get_local_table_ids(self, rank):
-        if self.use_concat_embedding:
+        if self.use_concat_embedding or self.data_parallel_input:
             return list(range(self.num_all_categorical_features))
         else:
             return self.embedding.strategy.input_ids_list[rank]
@@ -127,4 +138,10 @@ class SparseModel(tf.keras.Model):
     def from_config(path):
         with open(path) as f:
             config = json.load(fp=f)
+        if 'data_parallel_input' not in config:
+            config['data_parallel_input'] = False
+        if 'row_slice_threshold' not in config:
+            config['row_slice_threshold'] = None
+        if 'data_parallel_threshold' not in config:
+            config['data_parallel_threshold'] = None
         return SparseModel(**config)

+ 0 - 1
TensorFlow2/Recommendation/DLRM_and_DCNv2/requirements.txt

@@ -7,7 +7,6 @@ tqdm
 pyyaml
 onnxruntime
 git+https://github.com/onnx/tensorflow-onnx
-tritonclient[all]==2.31
 numpy<1.24
 tabulate>=0.8.7
 natsort>=7.0.0