
[WideAndDeep] Improved Spark preprocessing scripts performance

Przemek Strzelczyk 5 years ago
parent
commit
15ba45666d
21 files changed, 748 additions and 639 deletions
  1. README.md (+2 -1)
  2. TensorFlow/Recommendation/WideAndDeep/Dockerfile (+1 -1)
  3. TensorFlow/Recommendation/WideAndDeep/README.md (+47 -7)
  4. TensorFlow/Recommendation/WideAndDeep/dataflow_preprocess.py (+0 -150)
  5. TensorFlow/Recommendation/WideAndDeep/outbrain_transform.py (+0 -180)
  6. TensorFlow/Recommendation/WideAndDeep/preproc/csv_data_imputation.py (+0 -88)
  7. TensorFlow/Recommendation/WideAndDeep/preproc/data/tensorflow-hadoop-1.5.0.jar (BIN)
  8. TensorFlow/Recommendation/WideAndDeep/preproc/preproc1.py (+4 -5)
  9. TensorFlow/Recommendation/WideAndDeep/preproc/preproc2.py (+1 -4)
  10. TensorFlow/Recommendation/WideAndDeep/preproc/preproc3.py (+2 -71)
  11. TensorFlow/Recommendation/WideAndDeep/preproc/preproc4.py (+568 -0)
  12. TensorFlow/Recommendation/WideAndDeep/preproc/sort_csv.sh (+0 -24)
  13. TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp16_1gpu.sh (+1 -1)
  14. TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp16_4gpu.sh (+1 -1)
  15. TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp16_8gpu.sh (+1 -1)
  16. TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp32_1gpu.sh (+1 -1)
  17. TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp32_4gpu.sh (+1 -1)
  18. TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp32_8gpu.sh (+1 -1)
  19. TensorFlow/Recommendation/WideAndDeep/scripts/preproc.sh (+12 -31)
  20. TensorFlow/Recommendation/WideAndDeep/trainer/dataset_utils.py (+100 -0)
  21. TensorFlow/Recommendation/WideAndDeep/trainer/task.py (+5 -71)

+ 2 - 1
README.md

@@ -71,7 +71,7 @@ The examples are organized first by framework, such as TensorFlow, PyTorch, etc.
 | [BERT](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/LanguageModeling/BERT) |PyTorch  | N/A  | Yes  | Yes  | Yes  | -  |   -  | [Yes](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/LanguageModeling/BERT/triton)  | -  |
 | [Transformer-XL](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/LanguageModeling/Transformer-XL) |PyTorch  | N/A  | Yes  | Yes  | Yes  | -  |   -  | -  | -  |
 | [Neural Collaborative Filtering](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/Recommendation/NCF) |PyTorch  | N/A  | Yes  | Yes  | -  |  -  |-  | -  | -  |
-| [DLRM](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/Recommendation/NCF) |PyTorch  | N/A  | Yes  | -  | -  |  -  |-  | -  | -  |
+| [DLRM](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/Recommendation/NCF) |PyTorch  | N/A  | Yes  | Yes  | -  |  -  |-  | -  | -  |
 | [Mask R-CNN](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/Segmentation/MaskRCNN) |PyTorch  | N/A  | Yes  | Yes  | -  | -  |   -  | -  | -  |
 | [Jasper](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/SpeechRecognition/Jasper) |PyTorch  | N/A  | Yes  | Yes  | -  | Yes  |   Yes  | [Yes](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/SpeechRecognition/Jasper/trtis)  | -  |
 | [Tacotron 2 And WaveGlow v1.10](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/SpeechSynthesis/Tacotron2) | PyTorch  | N/A  | Yes  | Yes  | -  | Yes  |   Yes  | [Yes](https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/SpeechSynthesis/Tacotron2/notebooks/trtis)  | -  |
@@ -91,6 +91,7 @@ The examples are organized first by framework, such as TensorFlow, PyTorch, etc.
 | [Mask R-CNN](https://github.com/NVIDIA/DeepLearningExamples/tree/master/TensorFlow2/Segmentation/MaskRCNN) |TensorFlow  | N/A  | Yes  | Yes  | -  | -  |   -  | -  | -  |
 | [GNMT v2](https://github.com/NVIDIA/DeepLearningExamples/tree/master/TensorFlow/Translation/GNMT) | TensorFlow  | N/A  | Yes  | Yes  | -  | -  |   -  | -  | -  |
 | [Faster Transformer](https://github.com/NVIDIA/DeepLearningExamples/tree/master/FasterTransformer) | Tensorflow  | N/A  | -  | -  | -  | Yes  |   -  | -  | -  |
+| [Transformer-XL](https://github.com/NVIDIA/DeepLearningExamples/tree/master/TensorFlow/LanguageModeling/Transformer-XL) |TensorFlow  | N/A  | Yes  | Yes  | -  | -  |   -  | -  | -  |
 | [U-Net Medical](https://github.com/NVIDIA/DeepLearningExamples/tree/master/TensorFlow2/Segmentation/UNet_Medical) | TensorFlow-2  | N/A  | Yes  | Yes  | -  |  Yes  |-  |   -  | Yes  |
 | [Mask R-CNN](https://github.com/NVIDIA/DeepLearningExamples/tree/master/TensorFlow2/Segmentation/MaskRCNN) |TensorFlow-2  | N/A  | Yes  | Yes  | -  |  -  |-  |   -  | -  |
 | [ResNet50 v1.5](https://github.com/NVIDIA/DeepLearningExamples/tree/master/MxNet/Classification/RN50v1.5) | MXNet  | Yes  | Yes  | Yes  | -  | -  |   -  | -  | -  |

+ 1 - 1
TensorFlow/Recommendation/WideAndDeep/Dockerfile

@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-ARG FROM_IMAGE_NAME=nvcr.io/nvidia/tensorflow:20.02-tf1-py3
+ARG FROM_IMAGE_NAME=nvcr.io/nvidia/tensorflow:20.03-tf1-py3
 
 FROM ${FROM_IMAGE_NAME}
 

+ 47 - 7
TensorFlow/Recommendation/WideAndDeep/README.md

@@ -24,6 +24,7 @@ This repository provides a script and recipe to train the Wide and Deep Recommen
     * [Command-line options](#command-line-options)
     * [Getting the data](#getting-the-data)
         * [Dataset guidelines](#dataset-guidelines)
+        * [Spark preprocessing](#spark-preprocessing)
     * [Training process](#training-process)
 - [Performance](#performance)
     * [Benchmarking](#benchmarking)
@@ -195,7 +196,7 @@ docker build . -t wide_deep
 4.  Start an interactive session in the NGC container to run preprocessing/training/inference.
 
 ```bash
-docker run --runtime=nvidia --rm -ti -v ${HOST_OUTBRAIN_PATH}:/outbrain wide_deep /bin/bash
+docker run --runtime=nvidia --privileged --rm -ti -v ${HOST_OUTBRAIN_PATH}:/outbrain wide_deep /bin/bash
 ```
 5. Start preprocessing.
 
@@ -294,17 +295,54 @@ The Outbrain dataset can be downloaded from [Kaggle](https://www.kaggle.com/c/ou
 
 #### Dataset guidelines
 
-The dataset contains a sample of users’ page views and clicks, as observed on multiple publisher sites. Viewed pages and clicked recommendations have further semantic attributes of the documents.
-
-The dataset contains sets of content recommendations served to a specific user in a specific context. Each context (i.e. a set of recommendations) is given a display_id. In each such set, the user has clicked on at least one recommendation. The page view logs originally has more than 2 billion rows (around 100 GB uncompressed). 
-
-The data within the preprocessing stage are transferred into tabular data of 54 features, for training having 55 million rows.
+The dataset contains a sample of users’ page views and clicks, as observed on multiple publisher sites. Viewed pages and clicked recommendations have additional semantic attributes of the documents.
+The dataset contains sets of content recommendations served to a specific user in a specific context. Each context (i.e. a set of recommended ads) is given a `display_id`. In each such recommendation set, the user has clicked on exactly one of the ads.
+
+The original data is stored in several separate files:
+- `page_views.csv` - log of users visiting documents (2B rows, ~100GB uncompressed)
+- `clicks_train.csv` - data showing which ad was clicked in each recommendation set (87M rows)
+- `clicks_test.csv` - used only for the submission in the original Kaggle contest
+- `events.csv` - metadata about the context of each recommendation set (23M rows)
+- `promoted_content.csv` - metadata about the ads
+- `document_meta.csv`, `document_topics.csv`, `document_entities.csv`, `document_categories.csv` - metadata about the documents
+ 
+During the preprocessing stage, the data is transformed into tabular data with 54 features and 55 million rows, and is eventually saved in a pre-batched TFRecord format.
+
+
+#### Spark preprocessing
+
+The original dataset is preprocessed using Spark scripts from the `preproc` directory. The preprocessing consists of the following operations:
+- separating out the validation set for cross-validation
+- filling missing data with the most frequent value
+- generating the user profiles from the page views data
+- joining the tables for the ad clicks data
+- computing click-through rates (CTR) for ads grouped by different contexts
+- computing cosine similarity between the features of the clicked ads and the viewed ads
+- math transformations of the numeric features (taking logarithm, scaling, binning)
+- storing the resulting set of features in TFRecord format
+
+The `preproc1-4.py` preprocessing scripts use PySpark. 
+The Docker image contains Spark 2.3.1 installed as a standalone Spark cluster. 
+The `preproc1.py` script splits the data into a training set and a validation set. 
+The `preproc2.py` script generates the user profiles from the page views data. 
+The `preproc3.py` script computes the click-through rates (CTR) and cosine similarities between the features. 
+The `preproc4.py` script performs the math transformations and generates the final TFRecord files. 
+The data in the output files is pre-batched (with a default batch size of 4096) to avoid the overhead 
+of the TFRecord format, which is otherwise poorly suited to tabular data: 
+it stores a separate dictionary with every feature name in plain text for each data entry.
+
+The preprocessing includes some very resource-intensive operations, such as joining tables with more than 2 billion rows. 
+Such operations may not fit into RAM, which is why we use Spark, a tool well suited 
+to tabular operations on large datasets. 
+Note that the Spark job requires about 1 TB of disk space and 500 GB of RAM to perform the preprocessing.
+For more information about Spark, please refer to the
+[Spark documentation](https://spark.apache.org/docs/2.3.1/).
 
 
 ### Training process
 
 The training can be started by running the `trainer/task.py` script. By default the script is in train mode. Other training related 
-configs are also present in the `trainer/task.py` and can be seen using the command `python -m trainer.task --help`. Training happens for `--num_epochs` epochs with custom estimator for the model. The model has a wide linear part and a deep feed forward network, and the networks are built according to the default configuration.
+configs are also present in the `trainer/task.py` and can be seen using the command `python -m trainer.task --help`. Training happens for `--num_epochs` epochs with a custom estimator for the model. The model has a wide linear part and a deep feed forward network, and the networks are built according to the default configuration.
 
 Two separate optimizers are used to optimize the wide and the deep part of the network:
     
@@ -399,6 +437,8 @@ This section needs to include the date of the release and the most important cha
 March 2020
 - Initial release
 
+May 2020
+- Improved Spark preprocessing scripts performance
 ### Known issues
 
 - Limited tf.feature_column support
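
The pre-batching described in the README changes above is the key storage optimization: instead of one `tf.train.Example` per row, each serialized record holds a whole batch of values per feature, so feature names are written once per 4096 rows rather than once per row. In the new pipeline, `preproc4.py` registers the bundled `tensorflow-hadoop` connector jar with Spark; the sketch below instead uses the plain TensorFlow 1.x writer and a hypothetical two-column pandas DataFrame, purely to illustrate the pre-batching idea (it is not the repository's actual helper):

```python
import pandas as pd
import tensorflow as tf  # TF 1.x API, matching the repository's container

BATCH_SIZE = 4096  # default --prebatch_size in preproc4.py

def write_prebatched(df, path, batch_size=BATCH_SIZE):
    """Write a DataFrame as pre-batched TFRecords: one Example per batch,
    with each feature holding batch_size values (illustrative sketch only)."""
    with tf.python_io.TFRecordWriter(path) as writer:
        # iterate over full batches only; a trailing partial batch is dropped
        for start in range(0, len(df) - batch_size + 1, batch_size):
            chunk = df.iloc[start:start + batch_size]
            feature = {
                # feature names are serialized once per 4096 rows, not per row
                'label': tf.train.Feature(
                    int64_list=tf.train.Int64List(value=chunk['label'].tolist())),
                'ad_views': tf.train.Feature(
                    int64_list=tf.train.Int64List(value=chunk['ad_views'].tolist())),
            }
            example = tf.train.Example(features=tf.train.Features(feature=feature))
            writer.write(example.SerializeToString())
```

At read time such records are parsed with fixed shapes of `[batch_size, 1]` per feature, as in the `make_spec` function added in `preproc4.py` below.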

+ 0 - 150
TensorFlow/Recommendation/WideAndDeep/dataflow_preprocess.py

@@ -1,150 +0,0 @@
-# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import argparse
-import datetime
-import sys
-
-import outbrain_transform
-
-import tensorflow as tf
-import glob
-
-import pandas as pd
-
-import trainer.features
-
-
-def parse_arguments(argv):
-  """Parse command line arguments.
-
-  Args:
-    argv: list of command line arguments including program name.
-  Returns:
-    The parsed arguments as returned by argparse.ArgumentParser.
-  """
-  parser = argparse.ArgumentParser(
-      description='Runs Transformation on the Outbrain Click Prediction model data.')
-
-  parser.add_argument(
-      '--training_data',
-      default='',
-      help='Data to analyze and encode as training features.')
-  parser.add_argument(
-      '--eval_data',
-      default='',
-      help='Data to encode as evaluation features.')
-  parser.add_argument(
-      '--output_dir',
-      default=None,
-      required=True,
-      help=('Google Cloud Storage or Local directory in which '
-            'to place outputs.'))
-  parser.add_argument('--batch_size', default=None, type=int, help='Size of batches to create.')
-  parser.add_argument('--submission', default=False, action='store_true', help='Use real test set for submission')
-
-  args, _ = parser.parse_known_args(args=argv[1:])
-
-  return args
-
-# a version of this method that prefers pandas methods
-def local_transform_chunk(nr, csv, output_prefix, min_logs, max_logs, batch_size=None, remainder=None):
-  # put any remainder at the front of the line, with the new rows after
-  if remainder is not None:
-    csv = remainder.append(csv)
-  
-  # for each batch, slice into the datafrom to retrieve the corresponding data
-  print(str(datetime.datetime.now()) + '\tWriting rows...')
-  num_rows = len(csv.index)
-  with tf.python_io.TFRecordWriter(output_prefix + str(nr).zfill(3) + '.tfrecord') as writer:
-    for start_ind in range(0,num_rows,batch_size if batch_size is not None else 1): # for each batch
-      if start_ind + batch_size - 1 > num_rows: # if we'd run out of rows
-        return csv.iloc[start_ind:] # return remainder for use with the next file
-      # otherwise write this batch to TFRecord
-      csv_slice = csv.iloc[start_ind:start_ind+(batch_size if batch_size is not None else 1)]
-      example = outbrain_transform.create_tf_example(csv_slice, min_logs, max_logs)
-      writer.write(example.SerializeToString())
-
-# calculate min and max stats for the given dataframes all in one go
-def compute_min_max_logs(dataframes):
-  print(str(datetime.datetime.now()) + '\tComputing min and max')
-  min_logs = {}
-  max_logs = {}
-  df = pd.concat(dataframes) # concatenate all dataframes, to process at once
-  for name in trainer.features.FLOAT_COLUMNS_LOG_BIN_TRANSFORM:
-    feature_series = df[name]
-    min_logs[name + '_log_01scaled'] = outbrain_transform.log2_1p(feature_series.min(axis=0)*1000)
-    max_logs[name + '_log_01scaled'] = outbrain_transform.log2_1p(feature_series.max(axis=0)*1000)
-  for name in trainer.features.INT_COLUMNS:
-    feature_series = df[name]
-    min_logs[name + '_log_01scaled'] = outbrain_transform.log2_1p(feature_series.min(axis=0))
-    max_logs[name + '_log_01scaled'] = outbrain_transform.log2_1p(feature_series.max(axis=0))
-  return min_logs, max_logs
-
-
-def main(argv=None):
-  args = parse_arguments(sys.argv if argv is None else argv)
-  
-  # Retrieve and sort training and eval data (to ensure consistent order)
-  # Order is important so that the right data will get sorted together for MAP
-  training_data = sorted(glob.glob(args.training_data))
-  eval_data = sorted(glob.glob(args.eval_data))
-  print('Training data:\n{}\nFound:\n{}'.format(args.training_data,training_data))
-  print('Evaluation data:\n{}\nFound:\n{}'.format(args.eval_data,eval_data))
-  
-  outbrain_transform.make_spec(args.output_dir + '/transformed_metadata', batch_size=args.batch_size)
-  
-  # read all dataframes
-  print('\n' + str(datetime.datetime.now()) + '\tReading input files')
-  eval_dataframes = [pd.read_csv(filename, header=None, names=outbrain_transform.CSV_ORDERED_COLUMNS) 
-                for filename in eval_data]
-  train_dataframes = [pd.read_csv(filename, header=None, names=outbrain_transform.CSV_ORDERED_COLUMNS) 
-                for filename in training_data]
-  
-  # calculate stats once over all records given
-  min_logs, max_logs = compute_min_max_logs(eval_dataframes + train_dataframes)
- 
-  if args.submission:
-    train_output_string = '/sub_train_'
-    eval_output_string = '/test_'
-  else:
-    train_output_string = '/train_'
-    eval_output_string = '/eval_'
- 
-  # process eval files
-  print('\n' + str(datetime.datetime.now()) + '\tWorking on evaluation data')
-  eval_remainder = None # remainder when a file's records don't divide evenly into batches
-  for i, df in enumerate(eval_dataframes):
-    print(eval_data[i])
-    eval_remainder = local_transform_chunk(i, df, args.output_dir + eval_output_string, min_logs, max_logs,
-                                           batch_size=args.batch_size, remainder=eval_remainder)
-  if eval_remainder is not None:
-    print('Dropping {} records (eval) on the floor'.format(len(eval_remainder)))
-  
-  # process train files
-  print('\n' + str(datetime.datetime.now()) + '\tWorking on training data')
-  train_remainder = None
-  for i, df in enumerate(train_dataframes):
-    print(training_data[i])
-    train_remainder = local_transform_chunk(i, df, args.output_dir + train_output_string, min_logs, max_logs,
-                                           batch_size=args.batch_size, remainder=train_remainder)
-  if train_remainder is not None:
-    print('Dropping {} records (train) on the floor'.format(len(train_remainder)))
-
-if __name__ == '__main__':
-  main()

+ 0 - 180
TensorFlow/Recommendation/WideAndDeep/outbrain_transform.py

@@ -1,180 +0,0 @@
-# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import tensorflow as tf
-from tensorflow_transform.tf_metadata import dataset_schema
-from tensorflow_transform.tf_metadata import dataset_metadata
-from tensorflow_transform.tf_metadata import metadata_io
-import numpy as np
-
-from trainer.features import LABEL_COLUMN, DISPLAY_ID_COLUMN, IS_LEAK_COLUMN, DISPLAY_ID_AND_IS_LEAK_ENCODED_COLUMN, CATEGORICAL_COLUMNS, DOC_CATEGORICAL_MULTIVALUED_COLUMNS, BOOL_COLUMNS, INT_COLUMNS, FLOAT_COLUMNS, FLOAT_COLUMNS_LOG_BIN_TRANSFORM, FLOAT_COLUMNS_SIMPLE_BIN_TRANSFORM
-
-RENAME_COLUMNS = False
-
-CSV_ORDERED_COLUMNS = ['label','display_id','ad_id','doc_id','doc_event_id','is_leak','event_weekend',
-              'user_has_already_viewed_doc','user_views','ad_views','doc_views',
-              'doc_event_days_since_published','doc_event_hour','doc_ad_days_since_published',              
-              'pop_ad_id','pop_ad_id_conf',
-              'pop_ad_id_conf_multipl','pop_document_id','pop_document_id_conf',
-              'pop_document_id_conf_multipl','pop_publisher_id','pop_publisher_id_conf',
-              'pop_publisher_id_conf_multipl','pop_advertiser_id','pop_advertiser_id_conf',
-              'pop_advertiser_id_conf_multipl','pop_campain_id','pop_campain_id_conf',
-              'pop_campain_id_conf_multipl','pop_doc_event_doc_ad','pop_doc_event_doc_ad_conf',
-              'pop_doc_event_doc_ad_conf_multipl','pop_source_id','pop_source_id_conf',
-              'pop_source_id_conf_multipl','pop_source_id_country','pop_source_id_country_conf',
-              'pop_source_id_country_conf_multipl','pop_entity_id','pop_entity_id_conf',
-              'pop_entity_id_conf_multipl','pop_entity_id_country','pop_entity_id_country_conf',
-              'pop_entity_id_country_conf_multipl','pop_topic_id','pop_topic_id_conf',
-              'pop_topic_id_conf_multipl','pop_topic_id_country','pop_topic_id_country_conf',
-              'pop_topic_id_country_conf_multipl','pop_category_id','pop_category_id_conf',
-              'pop_category_id_conf_multipl','pop_category_id_country','pop_category_id_country_conf',
-              'pop_category_id_country_conf_multipl','user_doc_ad_sim_categories',
-              'user_doc_ad_sim_categories_conf','user_doc_ad_sim_categories_conf_multipl',
-              'user_doc_ad_sim_topics','user_doc_ad_sim_topics_conf','user_doc_ad_sim_topics_conf_multipl',
-              'user_doc_ad_sim_entities','user_doc_ad_sim_entities_conf','user_doc_ad_sim_entities_conf_multipl',
-              'doc_event_doc_ad_sim_categories','doc_event_doc_ad_sim_categories_conf',
-              'doc_event_doc_ad_sim_categories_conf_multipl','doc_event_doc_ad_sim_topics',
-              'doc_event_doc_ad_sim_topics_conf','doc_event_doc_ad_sim_topics_conf_multipl',
-              'doc_event_doc_ad_sim_entities','doc_event_doc_ad_sim_entities_conf',
-              'doc_event_doc_ad_sim_entities_conf_multipl','ad_advertiser','doc_ad_category_id_1',
-              'doc_ad_category_id_2','doc_ad_category_id_3','doc_ad_topic_id_1','doc_ad_topic_id_2',
-              'doc_ad_topic_id_3','doc_ad_entity_id_1','doc_ad_entity_id_2','doc_ad_entity_id_3',
-              'doc_ad_entity_id_4','doc_ad_entity_id_5','doc_ad_entity_id_6','doc_ad_publisher_id',
-              'doc_ad_source_id','doc_event_category_id_1','doc_event_category_id_2','doc_event_category_id_3',
-              'doc_event_topic_id_1','doc_event_topic_id_2','doc_event_topic_id_3','doc_event_entity_id_1',
-              'doc_event_entity_id_2','doc_event_entity_id_3','doc_event_entity_id_4','doc_event_entity_id_5',
-              'doc_event_entity_id_6','doc_event_publisher_id','doc_event_source_id','event_country',
-              'event_country_state','event_geo_location','event_hour','event_platform','traffic_source']
-
-def make_spec(output_dir, batch_size=None):
-  fixed_shape = [batch_size,1] if batch_size is not None else []
-  spec = {}
-  spec[LABEL_COLUMN] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
-  spec[DISPLAY_ID_COLUMN] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
-  spec[IS_LEAK_COLUMN] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
-  spec[DISPLAY_ID_AND_IS_LEAK_ENCODED_COLUMN] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
-
-  for name in BOOL_COLUMNS:
-    spec[name] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
-  for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM+FLOAT_COLUMNS_SIMPLE_BIN_TRANSFORM:
-    spec[name] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.float32, default_value=None)  
-  for name in FLOAT_COLUMNS_SIMPLE_BIN_TRANSFORM:
-    spec[name + '_binned'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
-  for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM:
-    spec[name + '_binned'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
-    spec[name + '_log_01scaled'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.float32, default_value=None)
-  for name in INT_COLUMNS:
-    spec[name + '_log_int'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
-    spec[name + '_log_01scaled'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.float32, default_value=None)
-  for name in BOOL_COLUMNS + CATEGORICAL_COLUMNS:
-    spec[name] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
-
-  for multi_category in DOC_CATEGORICAL_MULTIVALUED_COLUMNS:
-    #spec[multi_category] = tf.VarLenFeature(dtype=tf.int64)
-    shape = fixed_shape[:-1]+[len(DOC_CATEGORICAL_MULTIVALUED_COLUMNS[multi_category])]
-    spec[multi_category] = tf.FixedLenFeature(shape=shape, dtype=tf.int64)
-
-  metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec(spec))
-	
-  metadata_io.write_metadata(metadata, output_dir)
-
-def tf_log2_1p(x):
-  return tf.log1p(x) / tf.log(2.0)
-
-def log2_1p(x):
-  return np.log1p(x) / np.log(2.0)
-
-def compute_min_max_logs(rows):
-  min_logs = {}
-  max_logs = {}
-  
-  for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM + INT_COLUMNS:
-    min_logs[name + '_log_01scaled'] = float("inf")
-    max_logs[name + '_log_01scaled'] = float("-inf")
-
-  for row in rows:
-    names = CSV_ORDERED_COLUMNS
-    columns_dict = dict(zip(names, row))
-    for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM:
-      nn = name + '_log_01scaled'
-      min_logs[nn] = min(min_logs[nn], log2_1p(columns_dict[name] * 1000))
-      max_logs[nn] = max(max_logs[nn], log2_1p(columns_dict[name] * 1000))
-    for name in INT_COLUMNS:
-      nn = name + '_log_01scaled'
-      min_logs[nn] = min(min_logs[nn], log2_1p(columns_dict[name]))
-      max_logs[nn] = max(max_logs[nn], log2_1p(columns_dict[name]))
-
-  return min_logs, max_logs
-
-def scale_to_0_1(val, minv, maxv):
-  return (val - minv) / (maxv - minv)
-
-def create_tf_example(df, min_logs, max_logs):
-  result = {}
-  result[LABEL_COLUMN] = tf.train.Feature(int64_list=tf.train.Int64List(value=df[LABEL_COLUMN].to_list()))
-  result[DISPLAY_ID_COLUMN] = tf.train.Feature(int64_list=tf.train.Int64List(value=df[DISPLAY_ID_COLUMN].to_list()))
-  result[IS_LEAK_COLUMN] = tf.train.Feature(int64_list=tf.train.Int64List(value=df[IS_LEAK_COLUMN].to_list()))
-  #is_leak = df[IS_LEAK_COLUMN].to_list()
-  encoded_value = df[DISPLAY_ID_COLUMN].multiply(10).add(df[IS_LEAK_COLUMN].clip(lower=0)).to_list()
-  # * 10 + (0 if is_leak < 0 else is_leak)
-  result[DISPLAY_ID_AND_IS_LEAK_ENCODED_COLUMN] = tf.train.Feature(int64_list=tf.train.Int64List(value=encoded_value))
-  
-  for name in FLOAT_COLUMNS:
-    result[name] = tf.train.Feature(float_list=tf.train.FloatList(value=df[name].to_list()))
-  for name in FLOAT_COLUMNS_SIMPLE_BIN_TRANSFORM:
-    #[int(columns_dict[name] * 10)]
-    value = df[name].multiply(10).astype('int64').to_list()
-    result[name + '_binned'] = tf.train.Feature(int64_list=tf.train.Int64List(value=value))
-  for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM:
-    # [int(log2_1p(columns_dict[name] * 1000))]
-    value_prelim = df[name].multiply(1000).apply(np.log1p).multiply(1./np.log(2.0))
-    value = value_prelim.astype('int64').to_list()
-    result[name + '_binned'] = tf.train.Feature(int64_list=tf.train.Int64List(value=value))
-    nn = name + '_log_01scaled'
-    #val = log2_1p(columns_dict[name] * 1000)
-    #val = scale_to_0_1(val, min_logs[nn], max_logs[nn])
-    value = value_prelim.add(-min_logs[nn]).multiply(1./(max_logs[nn]-min_logs[nn])).to_list()
-    result[nn] = tf.train.Feature(float_list=tf.train.FloatList(value=value))
-  for name in INT_COLUMNS:
-    #[int(log2_1p(columns_dict[name]))]
-    value_prelim = df[name].apply(np.log1p).multiply(1./np.log(2.0))
-    value = value_prelim.astype('int64').to_list()
-    result[name + '_log_int'] = tf.train.Feature(int64_list=tf.train.Int64List(value=value))
-    nn = name + '_log_01scaled'
-    #val = log2_1p(columns_dict[name])
-    #val = scale_to_0_1(val, min_logs[nn], max_logs[nn])
-    value = value_prelim.add(-min_logs[nn]).multiply(1./(max_logs[nn]-min_logs[nn])).to_list()
-    result[nn] = tf.train.Feature(float_list=tf.train.FloatList(value=value))
-  
-  for name in BOOL_COLUMNS + CATEGORICAL_COLUMNS:
-    result[name] = tf.train.Feature(int64_list=tf.train.Int64List(value=df[name].to_list()))
-  
-  for multi_category in DOC_CATEGORICAL_MULTIVALUED_COLUMNS:
-    values = []
-    for category in DOC_CATEGORICAL_MULTIVALUED_COLUMNS[multi_category]:
-      values = values + [df[category].to_numpy()]
-    # need to transpose the series so they will be parsed correctly by the FixedLenFeature
-    # we can pass in a single series here; they'll be reshaped to [batch_size, num_values]
-    # when parsed from the TFRecord
-    value = np.stack(values, axis=1).flatten().tolist()
-    result[multi_category] = tf.train.Feature(int64_list=tf.train.Int64List(value=value))   
-
-  tf_example = tf.train.Example(features=tf.train.Features(feature=result))
-
-  return tf_example

+ 0 - 88
TensorFlow/Recommendation/WideAndDeep/preproc/csv_data_imputation.py

@@ -1,88 +0,0 @@
-# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-
-import pandas as pd
-import os
-import glob
-import tqdm
-import argparse
-from joblib import Parallel, delayed
-
-
-parser = argparse.ArgumentParser()
-parser.add_argument('--train_files_pattern', default='train_feature_vectors_integral_eval.csv/part-*')
-parser.add_argument('--valid_files_pattern', default='validation_feature_vectors_integral.csv/part-*')
-parser.add_argument('--train_dst_dir', default='train_feature_vectors_integral_eval_imputed.csv')
-parser.add_argument('--valid_dst_dir', default='validation_feature_vectors_integral_imputed.csv')
-parser.add_argument('--header_path', default='train_feature_vectors_integral_eval.csv.header')
-parser.add_argument('--num_workers', type=int, default=4)
-args = parser.parse_args()
-
-header = pd.read_csv(args.header_path, header=None)
-columns = header[0].to_list()
-
-train_files = glob.glob(args.train_files_pattern)
-print('train files: ', train_files)
-
-def get_counts(f):
-    df = pd.read_csv(f, header=None, dtype=object, names=columns, na_values='None')
-    counts = {}
-    for c in df:
-        counts[c] = df[c].value_counts()
-    return counts
-
-all_counts = Parallel(n_jobs=args.num_workers)(delayed(get_counts)(f) for f in train_files)
-cols = len(all_counts[0])
-imputation_dict = {}
-for c in tqdm.tqdm(columns):
-    temp = None
-    for i in range(len(all_counts)):
-        if temp is None:
-            temp = pd.Series(all_counts[i][c])
-        else:
-            temp += pd.Series(all_counts[i][c])
-    if len(temp) == 0:
-        imputation_dict[c] = 0
-    else:
-        imputation_dict[c] = temp.index[0]
-
-print('imputation_dict: ', imputation_dict)
-
-if not os.path.exists(args.train_dst_dir):
-    os.mkdir(args.train_dst_dir)
-
-def impute_part(src_path, dst_dir):
-    print('imputing: ', src_path, ' to: ', dst_dir)
-    filename = os.path.basename(src_path)
-    dst_path = os.path.join(dst_dir, filename)
-
-    df = pd.read_csv(src_path, header=None, dtype=object, names=columns, na_values='None')
-    df2 = df.fillna(imputation_dict)
-    df2.to_csv(dst_path, header=None, index=None)
-
-
-print('launching imputation for train CSVs')
-Parallel(n_jobs=args.num_workers)(delayed(impute_part)(f, args.train_dst_dir) for f in train_files)
-
-valid_files = glob.glob(args.valid_files_pattern)
-
-if not os.path.exists(args.valid_dst_dir):
-    os.mkdir(args.valid_dst_dir)
-
-print('launching imputation for validation CSVs')
-Parallel(n_jobs=args.num_workers)(delayed(impute_part)(f, args.valid_dst_dir) for f in valid_files)
-
-print('Done!')
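
The standalone pandas-based imputation script above is removed; the README now lists filling missing data with the most frequent value among the Spark preprocessing operations, and `preproc4.py` (below) adds `count_value` and `replace_with_most_frequent` helpers for it. A minimal sketch of the same idea expressed directly as Spark DataFrame operations, where the DataFrame `df` and the column name are assumptions for illustration only:

```python
import pyspark.sql.functions as F

def impute_most_frequent(df, column):
    # find the most frequent non-null value of the column
    most_frequent = (df.filter(F.col(column).isNotNull())
                       .groupBy(column).count()
                       .orderBy(F.desc('count'))
                       .first()[0])
    # replace nulls in that column with the mode
    return df.fillna({column: most_frequent})

# example usage (hypothetical column name):
# df = impute_most_frequent(df, 'ad_views')
```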

BIN
TensorFlow/Recommendation/WideAndDeep/preproc/data/tensorflow-hadoop-1.5.0.jar


+ 4 - 5
TensorFlow/Recommendation/WideAndDeep/preproc/preproc1.py

@@ -19,21 +19,20 @@ OUTPUT_BUCKET_FOLDER = "/outbrain/preprocessed/"
 DATA_BUCKET_FOLDER = "/outbrain/orig/"
 SPARK_TEMP_FOLDER = "/outbrain/spark-temp/"
 
-from pyspark.sql.types import IntegerType, StringType, StructType, StructField
+from pyspark.sql.types import IntegerType, StringType, StructType, StructField 
 import pyspark.sql.functions as F
 
 from pyspark.context import SparkContext, SparkConf
 from pyspark.sql.session import SparkSession
+from pyspark.sql.functions import col
 
-conf = SparkConf().setMaster('local[*]').set('spark.executor.memory', '256g').set('spark.driver.memory', '126g').set("spark.local.dir", SPARK_TEMP_FOLDER)
+conf = SparkConf().setMaster('local[*]').set('spark.executor.memory', '40g').set('spark.driver.memory', '200g').set("spark.local.dir", SPARK_TEMP_FOLDER)
 
 sc = SparkContext(conf=conf)
 spark = SparkSession(sc)
 
 print('Loading data...')
 
-truncate_day_from_timestamp_udf = F.udf(lambda ts: int(ts / 1000 / 60 / 60 / 24), IntegerType())
-
 events_schema = StructType(
                     [StructField("display_id", IntegerType(), True),
                     StructField("uuid_event", StringType(), True),                    
@@ -46,7 +45,7 @@ events_schema = StructType(
 events_df = spark.read.schema(events_schema) \
   .options(header='true', inferschema='false', nullValue='\\N') \
   .csv(DATA_BUCKET_FOLDER + "events.csv") \
-  .withColumn('day_event', truncate_day_from_timestamp_udf('timestamp_event')) \
+  .withColumn('day_event', (col('timestamp_event') / 1000 / 60 / 60 / 24).cast("int")) \
   .alias('events')   
 
 events_df.count()
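
The performance-relevant change in this file is replacing a Python UDF with an equivalent native column expression, which keeps the arithmetic inside the JVM instead of shipping every row to a Python worker. A self-contained sketch of the two forms, using toy data rather than the real `events.csv`:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master('local[*]').getOrCreate()
# toy stand-in for the events DataFrame (timestamps in milliseconds)
events_df = spark.createDataFrame([(1, 86400000), (2, 172800000)],
                                  ['display_id', 'timestamp_event'])

# Before: every value is serialized to a Python worker and back.
truncate_day_udf = udf(lambda ts: int(ts / 1000 / 60 / 60 / 24), IntegerType())
events_df.withColumn('day_event', truncate_day_udf('timestamp_event')).show()

# After: the same arithmetic runs inside the JVM as a Catalyst expression.
events_df.withColumn(
    'day_event', (col('timestamp_event') / 1000 / 60 / 60 / 24).cast('int')).show()
```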

+ 1 - 4
TensorFlow/Recommendation/WideAndDeep/preproc/preproc2.py

@@ -25,9 +25,6 @@ import pyspark.sql.functions as F
 import math
 import time
 
-import random
-random.seed(42)
-
 from pyspark.context import SparkContext, SparkConf
 from pyspark.sql.session import SparkSession
 
@@ -43,7 +40,7 @@ args = parser.parse_args()
 
 evaluation = not args.submission
 
-conf = SparkConf().setMaster('local[*]').set('spark.executor.memory', '256g').set('spark.driver.memory', '126g').set("spark.local.dir", SPARK_TEMP_FOLDER)
+conf = SparkConf().setMaster('local[*]').set('spark.executor.memory', '40g').set('spark.driver.memory', '200g').set("spark.local.dir", SPARK_TEMP_FOLDER)
 
 sc = SparkContext(conf=conf)
 spark = SparkSession(sc)

+ 2 - 71
TensorFlow/Recommendation/WideAndDeep/preproc/preproc3.py

@@ -28,7 +28,7 @@ from pyspark.ml.linalg import SparseVector, VectorUDT
 from pyspark.context import SparkContext, SparkConf
 from pyspark.sql.session import SparkSession
 
-conf = SparkConf().setMaster('local[*]').set('spark.executor.memory', '256g').set('spark.driver.memory', '126g').set("spark.local.dir", SPARK_TEMP_FOLDER)
+conf = SparkConf().setMaster('local[*]').set('spark.executor.memory', '40g').set('spark.driver.memory', '200g').set("spark.local.dir", SPARK_TEMP_FOLDER)
 
 sc = SparkContext(conf=conf)
 spark = SparkSession(sc)
@@ -59,7 +59,6 @@ args = parser.parse_args()
 
 evaluation = not args.submission
 
-
 # ## UDFs
 def date_time_to_unix_epoch(date_time):
   return int(time.mktime(date_time.timetuple()))
@@ -98,7 +97,6 @@ def convert_odd_timestamp(timestamp_ms_relative):
     TIMESTAMP_DELTA=1465876799998
     return datetime.datetime.fromtimestamp((int(timestamp_ms_relative)+TIMESTAMP_DELTA)//1000)
 
-
 # # Loading Files
 
 # ## Loading UTC/BST for each country and US / CA states (local time)
@@ -870,9 +868,9 @@ len(entities_docs_counts)
 documents_total = documents_meta_df.count()
 documents_total
 
-
 # ## Exploring Publish Time
 publish_times_df = train_set_df.filter('publish_time is not null').select('document_id_promo','publish_time').distinct().select(F.col('publish_time').cast(IntegerType()))
+
 publish_time_percentiles = get_percentiles(publish_times_df, 'publish_time', quantiles_levels=[0.5], max_error_rate=0.001)
 publish_time_percentiles
 
@@ -1962,42 +1960,6 @@ else:
 
 train_set_feature_vectors_df.write.parquet(OUTPUT_BUCKET_FOLDER+train_feature_vector_gcs_folder_name, mode='overwrite')
 
-
-# ## Exporting integral feature vectors to CSV
-train_feature_vectors_exported_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER+train_feature_vector_gcs_folder_name)
-train_feature_vectors_exported_df.take(3)
-
-if evaluation:
-  train_feature_vector_integral_csv_folder_name = 'train_feature_vectors_integral_eval.csv'
-else:
-  train_feature_vector_integral_csv_folder_name = 'train_feature_vectors_integral.csv'
-
-integral_headers = ['label', 'display_id', 'ad_id', 'doc_id', 'doc_event_id', 'is_leak'] + feature_vector_labels_integral
-  
-with open(OUTPUT_BUCKET_FOLDER+train_feature_vector_integral_csv_folder_name+".header", 'w') as output:
-  output.writelines('\n'.join(integral_headers))
-
-def sparse_vector_to_csv_with_nulls_row(additional_column_values, vec, num_columns):  
-  def format_number(x):
-      if int(x) == x:
-          return str(int(x))
-      else:
-          return '{:.3}'.format(x)
-      
-  return ','.join([str(value) for value in additional_column_values] + 
-                   list([ format_number(vec[x]) if x in vec.indices else '' for x in range(vec.size) ])[:num_columns]) \
-          .replace('.0,',',')
-
-train_feature_vectors_integral_csv_rdd = train_feature_vectors_exported_df.select(
-    'label', 'display_id', 'ad_id', 'document_id', 'document_id_event', 'feature_vector') \
-  .withColumn('is_leak', F.lit(-1)) \
-  .rdd.map(lambda x: sparse_vector_to_csv_with_nulls_row([x['label'], x['display_id'], 
-      x['ad_id'], x['document_id'], x['document_id_event'], x['is_leak']], 
-      x['feature_vector'], len(integral_headers)))
-
-train_feature_vectors_integral_csv_rdd.saveAsTextFile(OUTPUT_BUCKET_FOLDER+train_feature_vector_integral_csv_folder_name)
-
-
 # # Export Validation/Test set feature vectors
 def is_leak(max_timestamp_pv_leak, timestamp_event):
   return max_timestamp_pv_leak >= 0 and max_timestamp_pv_leak >= timestamp_event
@@ -2009,7 +1971,6 @@ if evaluation:
 else:
   data_df = test_set_df
 
-
 test_validation_set_enriched_df = data_df.select(
     'display_id','uuid_event','event_country','event_country_state','platform_event',
     'source_id_doc_event', 'publisher_doc_event','publish_time_doc_event',     
@@ -2099,35 +2060,5 @@ else:
 
 test_validation_set_feature_vectors_df.write.parquet(OUTPUT_BUCKET_FOLDER+test_validation_feature_vector_gcs_folder_name, mode='overwrite')
 
-
-# ## Exporting integral feature vectors to CSV
-test_validation_feature_vectors_exported_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER+test_validation_feature_vector_gcs_folder_name)
-test_validation_feature_vectors_exported_df.take(3)
-
-if evaluation:
-  test_validation_feature_vector_integral_csv_folder_name = \
-    'validation_feature_vectors_integral.csv'
-else:
-  test_validation_feature_vector_integral_csv_folder_name = \
-    'test_feature_vectors_integral.csv'
-
-integral_headers = ['label', 'display_id', 'ad_id', 'doc_id', 'doc_event_id', 'is_leak'] \
-  + feature_vector_labels_integral
-  
-with open(OUTPUT_BUCKET_FOLDER + test_validation_feature_vector_integral_csv_folder_name \
-    +".header", 'w') as output:
-  output.writelines('\n'.join(integral_headers))
-
-test_validation_feature_vectors_integral_csv_rdd = \
-  test_validation_feature_vectors_exported_df.select(
-    'label', 'display_id', 'ad_id', 'document_id', 'document_id_event', 
-    'is_leak', 'feature_vector') \
-  .rdd.map(lambda x: sparse_vector_to_csv_with_nulls_row([x['label'], 
-    x['display_id'], x['ad_id'], x['document_id'], x['document_id_event'], x['is_leak']], 
-    x['feature_vector'], len(integral_headers)))
-
-test_validation_feature_vectors_integral_csv_rdd.saveAsTextFile(
-  OUTPUT_BUCKET_FOLDER+test_validation_feature_vector_integral_csv_folder_name)
-
 spark.stop()
 

+ 568 - 0
TensorFlow/Recommendation/WideAndDeep/preproc/preproc4.py

@@ -0,0 +1,568 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+evaluation = True
+evaluation_verbose = False
+
+OUTPUT_BUCKET_FOLDER = "/outbrain/preprocessed/"
+DATA_BUCKET_FOLDER = "/outbrain/orig/"
+SPARK_TEMP_FOLDER = "/outbrain/spark-temp/"
+LOCAL_DATA_TFRECORDS_DIR="/outbrain/tfrecords"
+
+TEST_SET_MODE = False
+
+TENSORFLOW_HADOOP="preproc/data/tensorflow-hadoop-1.5.0.jar"
+
+from IPython.display import display
+
+import pyspark.sql.functions as F
+from pyspark.ml.linalg import Vectors, SparseVector, VectorUDT
+
+from pyspark.context import SparkContext, SparkConf
+from pyspark.sql.session import SparkSession
+
+conf = SparkConf().setMaster('local[*]').set('spark.executor.memory', '40g').set('spark.driver.memory', '200g').set("spark.local.dir", SPARK_TEMP_FOLDER)
+conf.set("spark.jars", TENSORFLOW_HADOOP)
+conf.set("spark.sql.files.maxPartitionBytes", 805306368)
+
+sc = SparkContext(conf=conf)
+spark = SparkSession(sc)
+
+from pyspark.sql import Row
+from pyspark.sql.types import ArrayType, BinaryType, DoubleType, LongType, StringType, StructField, StructType
+from pyspark.sql.functions import col, when, log1p, udf
+
+import numpy as np
+import scipy.sparse
+
+import math
+import datetime
+import time
+import itertools
+
+import pickle
+
+import pandas as pd
+import tensorflow as tf
+from tensorflow_transform.tf_metadata import dataset_schema
+from tensorflow_transform.tf_metadata import dataset_metadata
+from tensorflow_transform.tf_metadata import metadata_io
+
+import trainer
+from trainer.features import LABEL_COLUMN, DISPLAY_ID_COLUMN, AD_ID_COLUMN, IS_LEAK_COLUMN, DISPLAY_ID_AND_IS_LEAK_ENCODED_COLUMN, CATEGORICAL_COLUMNS, DOC_CATEGORICAL_MULTIVALUED_COLUMNS, BOOL_COLUMNS, INT_COLUMNS, FLOAT_COLUMNS, FLOAT_COLUMNS_LOG_BIN_TRANSFORM, FLOAT_COLUMNS_SIMPLE_BIN_TRANSFORM
+
+import argparse
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument(
+  '--prebatch_size',
+  help='Prebatch size in created tfrecords',
+  type=int,
+  default=4096)
+
+parser.add_argument(
+    '--submission',
+    action='store_true',
+    default=False
+)
+
+args = parser.parse_args()
+
+batch_size = args.prebatch_size
+
+# # Feature Vector export
+bool_feature_names = ['event_weekend',
+                      'user_has_already_viewed_doc']
+
+int_feature_names = ['user_views',
+                    'ad_views',
+                    'doc_views',
+                    'doc_event_days_since_published',
+                    'doc_event_hour',
+                    'doc_ad_days_since_published',
+                    ]
+
+float_feature_names = [
+                'pop_ad_id',
+                'pop_ad_id_conf',
+                'pop_ad_id_conf_multipl',
+                'pop_document_id',
+                'pop_document_id_conf',
+                'pop_document_id_conf_multipl',
+                'pop_publisher_id',
+                'pop_publisher_id_conf',
+                'pop_publisher_id_conf_multipl',
+                'pop_advertiser_id',
+                'pop_advertiser_id_conf',
+                'pop_advertiser_id_conf_multipl',
+                'pop_campain_id',
+                'pop_campain_id_conf',
+                'pop_campain_id_conf_multipl',
+                'pop_doc_event_doc_ad',
+                'pop_doc_event_doc_ad_conf',
+                'pop_doc_event_doc_ad_conf_multipl',
+                'pop_source_id',
+                'pop_source_id_conf',
+                'pop_source_id_conf_multipl',
+                'pop_source_id_country',
+                'pop_source_id_country_conf',
+                'pop_source_id_country_conf_multipl',
+                'pop_entity_id',
+                'pop_entity_id_conf',
+                'pop_entity_id_conf_multipl',
+                'pop_entity_id_country',
+                'pop_entity_id_country_conf',
+                'pop_entity_id_country_conf_multipl',
+                'pop_topic_id',
+                'pop_topic_id_conf',
+                'pop_topic_id_conf_multipl',
+                'pop_topic_id_country',
+                'pop_topic_id_country_conf',
+                'pop_topic_id_country_conf_multipl',
+                'pop_category_id',
+                'pop_category_id_conf',
+                'pop_category_id_conf_multipl',
+                'pop_category_id_country',
+                'pop_category_id_country_conf',
+                'pop_category_id_country_conf_multipl',
+                'user_doc_ad_sim_categories',
+                'user_doc_ad_sim_categories_conf',
+                'user_doc_ad_sim_categories_conf_multipl',
+                'user_doc_ad_sim_topics',
+                'user_doc_ad_sim_topics_conf',
+                'user_doc_ad_sim_topics_conf_multipl',
+                'user_doc_ad_sim_entities',
+                'user_doc_ad_sim_entities_conf',
+                'user_doc_ad_sim_entities_conf_multipl',
+                'doc_event_doc_ad_sim_categories',
+                'doc_event_doc_ad_sim_categories_conf',
+                'doc_event_doc_ad_sim_categories_conf_multipl',
+                'doc_event_doc_ad_sim_topics',
+                'doc_event_doc_ad_sim_topics_conf',
+                'doc_event_doc_ad_sim_topics_conf_multipl',
+                'doc_event_doc_ad_sim_entities',
+                'doc_event_doc_ad_sim_entities_conf',
+                'doc_event_doc_ad_sim_entities_conf_multipl'
+               ]
+
+# ### Configuring feature vector
+category_feature_names_integral = ['ad_advertiser',
+ 'doc_ad_category_id_1',
+ 'doc_ad_category_id_2',
+ 'doc_ad_category_id_3',
+ 'doc_ad_topic_id_1',
+ 'doc_ad_topic_id_2',
+ 'doc_ad_topic_id_3',
+ 'doc_ad_entity_id_1',
+ 'doc_ad_entity_id_2',
+ 'doc_ad_entity_id_3',
+ 'doc_ad_entity_id_4',
+ 'doc_ad_entity_id_5',
+ 'doc_ad_entity_id_6',
+ 'doc_ad_publisher_id',
+ 'doc_ad_source_id',
+ 'doc_event_category_id_1',
+ 'doc_event_category_id_2',
+ 'doc_event_category_id_3',
+ 'doc_event_topic_id_1',
+ 'doc_event_topic_id_2',
+ 'doc_event_topic_id_3',
+ 'doc_event_entity_id_1',
+ 'doc_event_entity_id_2',
+ 'doc_event_entity_id_3',
+ 'doc_event_entity_id_4',
+ 'doc_event_entity_id_5',
+ 'doc_event_entity_id_6',
+ 'doc_event_publisher_id',
+ 'doc_event_source_id',
+ 'event_country',
+ 'event_country_state',
+ 'event_geo_location',
+ 'event_hour',
+ 'event_platform',
+ 'traffic_source']
+
+
+feature_vector_labels_integral = bool_feature_names + int_feature_names + float_feature_names + category_feature_names_integral
+
+if args.submission:
+  train_feature_vector_gcs_folder_name = 'train_feature_vectors_integral'
+else:
+  train_feature_vector_gcs_folder_name = 'train_feature_vectors_integral_eval'
+
+# ## Exporting integral feature vectors to CSV
+train_feature_vectors_exported_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER+train_feature_vector_gcs_folder_name)
+train_feature_vectors_exported_df.take(3)
+
+integral_headers = ['label', 'display_id', 'ad_id', 'doc_id', 'doc_event_id', 'is_leak'] + feature_vector_labels_integral
+
+CSV_ORDERED_COLUMNS = ['label','display_id','ad_id','doc_id','doc_event_id','is_leak','event_weekend',
+              'user_has_already_viewed_doc','user_views','ad_views','doc_views',
+              'doc_event_days_since_published','doc_event_hour','doc_ad_days_since_published',
+              'pop_ad_id','pop_ad_id_conf',
+              'pop_ad_id_conf_multipl','pop_document_id','pop_document_id_conf',
+              'pop_document_id_conf_multipl','pop_publisher_id','pop_publisher_id_conf',
+              'pop_publisher_id_conf_multipl','pop_advertiser_id','pop_advertiser_id_conf',
+              'pop_advertiser_id_conf_multipl','pop_campain_id','pop_campain_id_conf',
+              'pop_campain_id_conf_multipl','pop_doc_event_doc_ad','pop_doc_event_doc_ad_conf',
+              'pop_doc_event_doc_ad_conf_multipl','pop_source_id','pop_source_id_conf',
+              'pop_source_id_conf_multipl','pop_source_id_country','pop_source_id_country_conf',
+              'pop_source_id_country_conf_multipl','pop_entity_id','pop_entity_id_conf',
+              'pop_entity_id_conf_multipl','pop_entity_id_country','pop_entity_id_country_conf',
+              'pop_entity_id_country_conf_multipl','pop_topic_id','pop_topic_id_conf',
+              'pop_topic_id_conf_multipl','pop_topic_id_country','pop_topic_id_country_conf',
+              'pop_topic_id_country_conf_multipl','pop_category_id','pop_category_id_conf',
+              'pop_category_id_conf_multipl','pop_category_id_country','pop_category_id_country_conf',
+              'pop_category_id_country_conf_multipl','user_doc_ad_sim_categories',
+              'user_doc_ad_sim_categories_conf','user_doc_ad_sim_categories_conf_multipl',
+              'user_doc_ad_sim_topics','user_doc_ad_sim_topics_conf','user_doc_ad_sim_topics_conf_multipl',
+              'user_doc_ad_sim_entities','user_doc_ad_sim_entities_conf','user_doc_ad_sim_entities_conf_multipl',
+              'doc_event_doc_ad_sim_categories','doc_event_doc_ad_sim_categories_conf',
+              'doc_event_doc_ad_sim_categories_conf_multipl','doc_event_doc_ad_sim_topics',
+              'doc_event_doc_ad_sim_topics_conf','doc_event_doc_ad_sim_topics_conf_multipl',
+              'doc_event_doc_ad_sim_entities','doc_event_doc_ad_sim_entities_conf',
+              'doc_event_doc_ad_sim_entities_conf_multipl','ad_advertiser','doc_ad_category_id_1',
+              'doc_ad_category_id_2','doc_ad_category_id_3','doc_ad_topic_id_1','doc_ad_topic_id_2',
+              'doc_ad_topic_id_3','doc_ad_entity_id_1','doc_ad_entity_id_2','doc_ad_entity_id_3',
+              'doc_ad_entity_id_4','doc_ad_entity_id_5','doc_ad_entity_id_6','doc_ad_publisher_id',
+              'doc_ad_source_id','doc_event_category_id_1','doc_event_category_id_2','doc_event_category_id_3',
+              'doc_event_topic_id_1','doc_event_topic_id_2','doc_event_topic_id_3','doc_event_entity_id_1',
+              'doc_event_entity_id_2','doc_event_entity_id_3','doc_event_entity_id_4','doc_event_entity_id_5',
+              'doc_event_entity_id_6','doc_event_publisher_id','doc_event_source_id','event_country',
+              'event_country_state','event_geo_location','event_hour','event_platform','traffic_source']
+  
+FEAT_CSV_ORDERED_COLUMNS = ['event_weekend',
+              'user_has_already_viewed_doc','user_views','ad_views','doc_views',
+              'doc_event_days_since_published','doc_event_hour','doc_ad_days_since_published',
+              'pop_ad_id','pop_ad_id_conf',
+              'pop_ad_id_conf_multipl','pop_document_id','pop_document_id_conf',
+              'pop_document_id_conf_multipl','pop_publisher_id','pop_publisher_id_conf',
+              'pop_publisher_id_conf_multipl','pop_advertiser_id','pop_advertiser_id_conf',
+              'pop_advertiser_id_conf_multipl','pop_campain_id','pop_campain_id_conf',
+              'pop_campain_id_conf_multipl','pop_doc_event_doc_ad','pop_doc_event_doc_ad_conf',
+              'pop_doc_event_doc_ad_conf_multipl','pop_source_id','pop_source_id_conf',
+              'pop_source_id_conf_multipl','pop_source_id_country','pop_source_id_country_conf',
+              'pop_source_id_country_conf_multipl','pop_entity_id','pop_entity_id_conf',
+              'pop_entity_id_conf_multipl','pop_entity_id_country','pop_entity_id_country_conf',
+              'pop_entity_id_country_conf_multipl','pop_topic_id','pop_topic_id_conf',
+              'pop_topic_id_conf_multipl','pop_topic_id_country','pop_topic_id_country_conf',
+              'pop_topic_id_country_conf_multipl','pop_category_id','pop_category_id_conf',
+              'pop_category_id_conf_multipl','pop_category_id_country','pop_category_id_country_conf',
+              'pop_category_id_country_conf_multipl','user_doc_ad_sim_categories',
+              'user_doc_ad_sim_categories_conf','user_doc_ad_sim_categories_conf_multipl',
+              'user_doc_ad_sim_topics','user_doc_ad_sim_topics_conf','user_doc_ad_sim_topics_conf_multipl',
+              'user_doc_ad_sim_entities','user_doc_ad_sim_entities_conf','user_doc_ad_sim_entities_conf_multipl',
+              'doc_event_doc_ad_sim_categories','doc_event_doc_ad_sim_categories_conf',
+              'doc_event_doc_ad_sim_categories_conf_multipl','doc_event_doc_ad_sim_topics',
+              'doc_event_doc_ad_sim_topics_conf','doc_event_doc_ad_sim_topics_conf_multipl',
+              'doc_event_doc_ad_sim_entities','doc_event_doc_ad_sim_entities_conf',
+              'doc_event_doc_ad_sim_entities_conf_multipl','ad_advertiser','doc_ad_category_id_1',
+              'doc_ad_category_id_2','doc_ad_category_id_3','doc_ad_topic_id_1','doc_ad_topic_id_2',
+              'doc_ad_topic_id_3','doc_ad_entity_id_1','doc_ad_entity_id_2','doc_ad_entity_id_3',
+              'doc_ad_entity_id_4','doc_ad_entity_id_5','doc_ad_entity_id_6','doc_ad_publisher_id',
+              'doc_ad_source_id','doc_event_category_id_1','doc_event_category_id_2','doc_event_category_id_3',
+              'doc_event_topic_id_1','doc_event_topic_id_2','doc_event_topic_id_3','doc_event_entity_id_1',
+              'doc_event_entity_id_2','doc_event_entity_id_3','doc_event_entity_id_4','doc_event_entity_id_5',
+              'doc_event_entity_id_6','doc_event_publisher_id','doc_event_source_id','event_country',
+              'event_country_state','event_geo_location','event_hour','event_platform','traffic_source']
+
+def to_array(col):
+    def to_array_(v):
+      return v.toArray().tolist()
+    # Important: asNondeterministic requires Spark 2.3 or later
+    # It can be safely removed i.e.
+    # return udf(to_array_, ArrayType(DoubleType()))(col)
+    # but at the cost of decreased performance
+ 
+    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)
+
+
+CONVERT_TO_INT = ['doc_ad_category_id_1',
+              'doc_ad_category_id_2','doc_ad_category_id_3','doc_ad_topic_id_1','doc_ad_topic_id_2',
+              'doc_ad_topic_id_3','doc_ad_entity_id_1','doc_ad_entity_id_2','doc_ad_entity_id_3',
+              'doc_ad_entity_id_4','doc_ad_entity_id_5','doc_ad_entity_id_6',
+              'doc_ad_source_id','doc_event_category_id_1','doc_event_category_id_2','doc_event_category_id_3',
+              'doc_event_topic_id_1','doc_event_topic_id_2','doc_event_topic_id_3','doc_event_entity_id_1',
+              'doc_event_entity_id_2','doc_event_entity_id_3','doc_event_entity_id_4','doc_event_entity_id_5', 'doc_event_entity_id_6']
+
+
+def format_number(element, name):
+    if name in BOOL_COLUMNS + CATEGORICAL_COLUMNS:
+      return element.cast("int")
+    elif name in CONVERT_TO_INT:
+      return element.cast("int")
+    else:
+      return element
+
+def to_array_with_none(col):
+    def to_array_with_none_(v):
+      tmp= np.full((v.size,), fill_value=None, dtype=np.float64)
+      tmp[v.indices] = v.values
+      return tmp.tolist()
+    # Important: asNondeterministic requires Spark 2.3 or later
+    # It can be safely removed i.e.
+    # return udf(to_array_, ArrayType(DoubleType()))(col)
+    # but at the cost of decreased performance
+ 
+    return udf(to_array_with_none_, ArrayType(DoubleType())).asNondeterministic()(col)
+
+@udf
+def count_value(x):
+    from collections import Counter
+    tmp = Counter(x).most_common(2)
+    if not tmp or np.isnan(tmp[0][0]):
+      return 0
+    return float(tmp[0][0])
+
+def replace_with_most_frequent(most_value):
+    return udf( lambda x: most_value if not x or np.isnan(x) else x)
+
+
+train_feature_vectors_integral_csv_rdd_df = train_feature_vectors_exported_df.select('label', 'display_id', 'ad_id', 'document_id', 'document_id_event', 'feature_vector').withColumn('is_leak', F.lit(-1)).withColumn("featvec", to_array("feature_vector")).select(['label'] + ['display_id'] + ['ad_id'] + ['document_id'] + ['document_id_event'] + ['is_leak'] + [format_number(element, FEAT_CSV_ORDERED_COLUMNS[index]).alias(FEAT_CSV_ORDERED_COLUMNS[index]) for index, element in enumerate([col("featvec")[i] for i in range(len(feature_vector_labels_integral))])]).replace(float('nan'), 0)
+
+if args.submission:
+  test_validation_feature_vector_gcs_folder_name = 'test_feature_vectors_integral'
+else:
+  test_validation_feature_vector_gcs_folder_name = 'validation_feature_vectors_integral'
+
+# Exporting integral feature vectors
+test_validation_feature_vectors_exported_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER+test_validation_feature_vector_gcs_folder_name)
+test_validation_feature_vectors_exported_df.take(3)
+
+test_validation_feature_vectors_integral_csv_rdd_df = test_validation_feature_vectors_exported_df \
+    .select('label', 'display_id', 'ad_id', 'document_id', 'document_id_event', 'is_leak', 'feature_vector') \
+    .withColumn("featvec", to_array("feature_vector")) \
+    .select(['label', 'display_id', 'ad_id', 'document_id', 'document_id_event', 'is_leak'] +
+            [format_number(element, FEAT_CSV_ORDERED_COLUMNS[index]).alias(FEAT_CSV_ORDERED_COLUMNS[index])
+             for index, element in enumerate([col("featvec")[i] for i in range(len(feature_vector_labels_integral))])]) \
+    .replace(float('nan'), 0)
+
+def make_spec(output_dir, batch_size=None):
+  fixed_shape = [batch_size,1] if batch_size is not None else []
+  spec = {}
+  spec[LABEL_COLUMN] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
+  spec[DISPLAY_ID_COLUMN] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
+  spec[IS_LEAK_COLUMN] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
+  spec[DISPLAY_ID_AND_IS_LEAK_ENCODED_COLUMN] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
+  for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM+FLOAT_COLUMNS_SIMPLE_BIN_TRANSFORM:
+    spec[name] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.float32, default_value=None)
+  for name in FLOAT_COLUMNS_SIMPLE_BIN_TRANSFORM:
+    spec[name + '_binned'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
+  for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM:
+    spec[name + '_binned'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
+    spec[name + '_log_01scaled'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.float32, default_value=None)
+  for name in INT_COLUMNS:
+    spec[name + '_log_int'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
+    spec[name + '_log_01scaled'] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.float32, default_value=None)
+  for name in BOOL_COLUMNS + CATEGORICAL_COLUMNS:
+    spec[name] = tf.FixedLenFeature(shape=fixed_shape, dtype=tf.int64, default_value=None)
+  for multi_category in DOC_CATEGORICAL_MULTIVALUED_COLUMNS:
+    shape = fixed_shape[:-1]+[len(DOC_CATEGORICAL_MULTIVALUED_COLUMNS[multi_category])]
+    spec[multi_category] = tf.FixedLenFeature(shape=shape, dtype=tf.int64)
+  metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec(spec))
+  metadata_io.write_metadata(metadata, output_dir)
+
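+# The spec above describes *prebatched* records: each serialized
+# tf.train.Example holds a whole batch of rows, so a single record parses
+# into [batch_size, 1]-shaped tensors. The trainer consumes this metadata
+# through tf_transform_output.transformed_feature_spec() and
+# tf.data.experimental.parse_example_dataset (see trainer/dataset_utils.py).
+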
+# write out the TFRecords metadata
+make_spec(LOCAL_DATA_TFRECORDS_DIR + '/transformed_metadata', batch_size=batch_size)
+
+def log2_1p(x):
+  return np.log1p(x) / np.log(2.0)
+
+# calculate min and max stats for the given dataframes all in one go
+def compute_min_max_logs(df):
+  print(str(datetime.datetime.now()) + '\tComputing min and max')
+  min_logs = {}
+  max_logs = {}
+  float_expr = []
+  for name in trainer.features.FLOAT_COLUMNS_LOG_BIN_TRANSFORM + trainer.features.INT_COLUMNS:
+    float_expr.append(F.min(name))
+    float_expr.append(F.max(name))
+  floatDf = df.agg(*float_expr).collect()
+  for name in trainer.features.FLOAT_COLUMNS_LOG_BIN_TRANSFORM:
+    minAgg = floatDf[0]["min("+name+")"]
+    maxAgg = floatDf[0]["max("+name+")"]
+    min_logs[name + '_log_01scaled'] = log2_1p(minAgg*1000)
+    max_logs[name + '_log_01scaled'] = log2_1p(maxAgg*1000)
+  for name in trainer.features.INT_COLUMNS:
+    minAgg = floatDf[0]["min("+name+")"]
+    maxAgg = floatDf[0]["max("+name+")"]
+    min_logs[name + '_log_01scaled'] = log2_1p(minAgg)
+    max_logs[name + '_log_01scaled'] = log2_1p(maxAgg)
+
+  return min_logs, max_logs
+
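+# A worked example of the scaling that uses these statistics: a float
+# feature x is first mapped to log2_1p(x * 1000); with, say, min_log = 0.0
+# and max_log = 20.0 (hypothetical values), the stored '_log_01scaled'
+# value is (log2_1p(x * 1000) - 0.0) / (20.0 - 0.0), i.e. the log is
+# normalized to the [0, 1] range over the full dataset.
+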
+
+all_df = test_validation_feature_vectors_integral_csv_rdd_df.union(train_feature_vectors_integral_csv_rdd_df)
+min_logs, max_logs = compute_min_max_logs(all_df)
+
+if args.submission:
+  train_output_string = '/sub_train'
+  eval_output_string = '/test'
+else:
+  train_output_string = '/train'
+  eval_output_string = '/eval'
+
+path = LOCAL_DATA_TFRECORDS_DIR
+
+def create_tf_example_spark(df, min_logs, max_logs):
+  result = {}
+  result[LABEL_COLUMN] = tf.train.Feature(int64_list=tf.train.Int64List(value=df[LABEL_COLUMN].to_list()))
+  result[DISPLAY_ID_COLUMN] = tf.train.Feature(int64_list=tf.train.Int64List(value=df[DISPLAY_ID_COLUMN].to_list()))
+  result[IS_LEAK_COLUMN] = tf.train.Feature(int64_list=tf.train.Int64List(value=df[IS_LEAK_COLUMN].to_list()))
+  encoded_value = df[DISPLAY_ID_COLUMN].multiply(10).add(df[IS_LEAK_COLUMN].clip(lower=0)).to_list()
+  result[DISPLAY_ID_AND_IS_LEAK_ENCODED_COLUMN] = tf.train.Feature(int64_list=tf.train.Int64List(value=encoded_value))
+  for name in FLOAT_COLUMNS:
+    value = df[name].to_list()
+    result[name] = tf.train.Feature(float_list=tf.train.FloatList(value=value))
+  for name in FLOAT_COLUMNS_SIMPLE_BIN_TRANSFORM:
+    value = df[name].multiply(10).astype('int64').to_list()
+    result[name + '_binned'] = tf.train.Feature(int64_list=tf.train.Int64List(value=value))
+  for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM:
+    value_prelim = df[name].multiply(1000).apply(np.log1p).multiply(1./np.log(2.0))
+    value = value_prelim.astype('int64').to_list()
+    result[name + '_binned'] = tf.train.Feature(int64_list=tf.train.Int64List(value=value))
+    nn = name + '_log_01scaled'
+    value = value_prelim.add(-min_logs[nn]).multiply(1./(max_logs[nn]-min_logs[nn])).to_list()
+    result[nn] = tf.train.Feature(float_list=tf.train.FloatList(value=value))
+  for name in INT_COLUMNS:
+    value_prelim = df[name].apply(np.log1p).multiply(1./np.log(2.0))
+    value = value_prelim.astype('int64').to_list()
+    result[name + '_log_int'] = tf.train.Feature(int64_list=tf.train.Int64List(value=value))
+    nn = name + '_log_01scaled'
+    value = value_prelim.add(-min_logs[nn]).multiply(1./(max_logs[nn]-min_logs[nn])).to_list()
+    result[nn] = tf.train.Feature(float_list=tf.train.FloatList(value=value))
+  for name in BOOL_COLUMNS + CATEGORICAL_COLUMNS:
+    value = df[name].fillna(0).astype('int64').to_list()
+    result[name] = tf.train.Feature(int64_list=tf.train.Int64List(value=value))
+  for multi_category in DOC_CATEGORICAL_MULTIVALUED_COLUMNS:
+    values = []
+    for category in DOC_CATEGORICAL_MULTIVALUED_COLUMNS[multi_category]:
+      values = values + [df[category].to_numpy()]
+    # need to transpose the series so they will be parsed correctly by the FixedLenFeature
+    # we can pass in a single series here; they'll be reshaped to [batch_size, num_values]
+    # when parsed from the TFRecord
+    value = np.stack(values, axis=1).flatten().tolist()
+    result[multi_category] = tf.train.Feature(int64_list=tf.train.Int64List(value=value))
+  tf_example = tf.train.Example(features=tf.train.Features(feature=result))
+  return tf_example
+
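+# Note on the multivalued columns above: np.stack(values, axis=1).flatten()
+# interleaves the per-category series row by row, e.g. for two categories
+# with batch values [a1, a2] and [b1, b2] the stored list is
+# [a1, b1, a2, b2], matching the [batch_size, num_values] shape declared
+# for these features in make_spec.
+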
+def _transform_to_tfrecords(rdds):
+  csv = pd.DataFrame(list(rdds), columns=CSV_ORDERED_COLUMNS)
+  num_rows = len(csv.index)
+  examples = []
+  step = batch_size if batch_size is not None else 1
+  for start_ind in range(0, num_rows, step): # for each batch
+    if start_ind + step > num_rows: # if the remaining rows don't fill a whole batch
+      csv_slice = csv.iloc[start_ind:]
+      # keep the remaining rows as one final, smaller example
+      print("last Example has: ", len(csv_slice))
+      examples.append((create_tf_example_spark(csv_slice, min_logs, max_logs), len(csv_slice)))
+      return examples
+    else:
+      csv_slice = csv.iloc[start_ind:start_ind + step]
+    examples.append((create_tf_example_spark(csv_slice, min_logs, max_logs), step))
+  return examples
+
+from pyspark import TaskContext
+max_partition_num = 30
+
+def _transform_to_slices(rdds):
+  taskcontext = TaskContext.get()
+  partitionid = taskcontext.partitionId()
+  csv = pd.DataFrame(list(rdds), columns=CSV_ORDERED_COLUMNS)
+  num_rows = len(csv.index)
+  print("working with partition: ", partitionid, max_partition_num, num_rows)
+  examples = []
+  step = batch_size if batch_size is not None else 1
+  for start_ind in range(0, num_rows, step): # for each batch
+    if start_ind + step > num_rows: # if the remaining rows don't fill a whole batch
+      csv_slice = csv.iloc[start_ind:]
+      print("last Example has: ", len(csv_slice), partitionid)
+      examples.append((csv_slice, len(csv_slice)))
+      return examples
+    else:
+      csv_slice = csv.iloc[start_ind:start_ind + step]
+    examples.append((csv_slice, len(csv_slice)))
+  return examples
+
+def _transform_to_tfrecords_from_slices(rdds):
+  examples = []
+  for slice_ in rdds:
+    if len(slice_[0]) != batch_size:
+      print("slice size is not correct, dropping: ", len(slice_[0]))
+    else:
+      examples.append((bytearray(create_tf_example_spark(slice_[0], min_logs, max_logs).SerializeToString()), None))
+  return examples
+
+def _transform_to_tfrecords_from_reslice(rdds):
+  examples = []
+  all_dataframes = pd.DataFrame([])
+  for slice_ in rdds:
+    all_dataframes = all_dataframes.append(slice_[0])
+  num_rows = len(all_dataframes.index)
+  step = batch_size if batch_size is not None else 1
+  for start_ind in range(0, num_rows, step): # for each batch
+    if start_ind + step > num_rows: # if the remaining rows don't fill a whole batch
+      csv_slice = all_dataframes.iloc[start_ind:]
+      if TEST_SET_MODE:
+        # for the eval/test set, pad the final slice up to a full batch by
+        # repeating its own rows, so that no example is dropped
+        remain_len = batch_size - len(csv_slice)
+        (m, n) = divmod(remain_len, len(csv_slice))
+        print("remainder: ", len(csv_slice), remain_len, m, n)
+        original_slice = csv_slice
+        for _ in range(m):
+          csv_slice = csv_slice.append(original_slice)
+        csv_slice = csv_slice.append(original_slice.iloc[:n])
+        print("after fill remainder: ", len(csv_slice))
+        examples.append((bytearray(create_tf_example_spark(csv_slice, min_logs, max_logs).SerializeToString()), None))
+        return examples
+      # for the training set, drop the leftover rows
+      print("dropping remainder: ", len(csv_slice))
+      return examples
+    else:
+      csv_slice = all_dataframes.iloc[start_ind:start_ind + step]
+      examples.append((bytearray(create_tf_example_spark(csv_slice, min_logs, max_logs).SerializeToString()), None))
+  return examples
+
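+# Worked example of the padding arithmetic above: with batch_size = 4096 and
+# a final slice of 1000 rows, remain_len = 3096 and divmod(3096, 1000) gives
+# m = 3, n = 96, so the slice is repeated 3 more times plus its first 96
+# rows, yielding exactly 1000 * 4 + 96 = 4096 rows.
+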
+TEST_SET_MODE = False
+train_features = train_feature_vectors_integral_csv_rdd_df.coalesce(max_partition_num).rdd.mapPartitions(_transform_to_slices)
+cached_train_features = train_features.cache()
+cached_train_features.count()
+train_full = cached_train_features.filter(lambda x: x[1] == batch_size)
+# split out slices where we don't have a full batch so that we can reslice them and drop only minimal rows
+train_not_full = cached_train_features.filter(lambda x: x[1] < batch_size)
+train_examples_full = train_full.mapPartitions(_transform_to_tfrecords_from_slices)
+train_left = train_not_full.coalesce(1).mapPartitions(_transform_to_tfrecords_from_reslice)
+all_train = train_examples_full.union(train_left)
+
+TEST_SET_MODE = True
+valid_features = test_validation_feature_vectors_integral_csv_rdd_df.coalesce(max_partition_num).rdd.mapPartitions(_transform_to_slices)
+cached_valid_features = valid_features.cache()
+cached_valid_features.count()
+valid_full = cached_valid_features.filter(lambda x: x[1] == batch_size)
+valid_not_full = cached_valid_features.filter(lambda x: x[1] < batch_size)
+valid_examples_full = valid_full.mapPartitions(_transform_to_tfrecords_from_slices)
+valid_left = valid_not_full.coalesce(1).mapPartitions(_transform_to_tfrecords_from_reslice)
+all_valid = valid_examples_full.union(valid_left)
+
+all_train.saveAsNewAPIHadoopFile(LOCAL_DATA_TFRECORDS_DIR + train_output_string, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
+                                keyClass="org.apache.hadoop.io.BytesWritable",
+                                valueClass="org.apache.hadoop.io.NullWritable")
+
+all_valid.saveAsNewAPIHadoopFile(LOCAL_DATA_TFRECORDS_DIR + eval_output_string, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
+                                keyClass="org.apache.hadoop.io.BytesWritable",
+                                valueClass="org.apache.hadoop.io.NullWritable")
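+
+# Note: writing TFRecords via saveAsNewAPIHadoopFile relies on the
+# tensorflow-hadoop connector (org.tensorflow.hadoop) being available on the
+# Spark classpath, e.g. supplied through spark-submit --jars.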
+
+spark.stop()
+

+ 0 - 24
TensorFlow/Recommendation/WideAndDeep/preproc/sort_csv.sh

@@ -1,24 +0,0 @@
-#!/bin/bash
-
-# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-input=$1
-output=$2
-mkdir -p ${output}
-for f in ${input}/*
-do
-  filename=${f##*/}
-  sort -n -k2 -t ',' $f > ${output}/${filename}
-done

+ 1 - 1
TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp16_1gpu.sh

@@ -17,4 +17,4 @@
 set -x
 set -e
 
-python -m trainer.task --model_dir . --transformed_metadata_path "/outbrain/tfrecords" --eval_data_pattern "/outbrain/tfrecords/eval_*" --train_data_pattern "/outbrain/tfrecords/train_*" --save_checkpoints_secs 600 --linear_l1_regularization 0.0 --linear_l2_regularization 0.0 --linear_learning_rate 0.2 --deep_l1_regularization 0.0 --deep_l2_regularization 0.0 --deep_learning_rate 1.0 --deep_dropout 0.0 --deep_hidden_units 1024 1024 1024 1024 1024 --prebatch_size 4096 --global_batch_size 131072 --eval_batch_size 32768 --eval_steps 8 --model_type wide_n_deep --gpu --benchmark --amp
+python -m trainer.task --model_dir . --benchmark_warmup_steps 50 --benchmark_steps 200 --gpu --benchmark --amp

+ 1 - 1
TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp16_4gpu.sh

@@ -17,4 +17,4 @@
 set -x
 set -e
 
-mpiexec --allow-run-as-root --bind-to socket -np 4 python -m trainer.task --hvd --model_dir . --transformed_metadata_path "/outbrain/tfrecords" --eval_data_pattern "/outbrain/tfrecords/eval_*" --train_data_pattern "/outbrain/tfrecords/train_*" --save_checkpoints_secs 600 --linear_l1_regularization 0.0 --linear_l2_regularization 0.0 --linear_learning_rate 0.2 --deep_l1_regularization 0.0 --deep_l2_regularization 0.0 --deep_learning_rate 1.0 --deep_dropout 0.0 --deep_hidden_units 1024 1024 1024 1024 1024 --prebatch_size 4096 --global_batch_size 131072 --eval_batch_size 32768 --eval_steps 8 --model_type wide_n_deep --gpu --benchmark --amp
+mpiexec --allow-run-as-root --bind-to socket -np 4 python -m trainer.task --hvd --model_dir . --benchmark_warmup_steps 50 --benchmark_steps 200 --gpu --benchmark --amp

+ 1 - 1
TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp16_8gpu.sh

@@ -17,4 +17,4 @@
 set -x
 set -e
 
-mpiexec --allow-run-as-root --bind-to socket -np 8 python -m trainer.task --hvd --model_dir . --transformed_metadata_path "/outbrain/tfrecords" --eval_data_pattern "/outbrain/tfrecords/eval_*" --train_data_pattern "/outbrain/tfrecords/train_*" --save_checkpoints_secs 600 --linear_l1_regularization 0.0 --linear_l2_regularization 0.0 --linear_learning_rate 0.2 --deep_l1_regularization 0.0 --deep_l2_regularization 0.0 --deep_learning_rate 1.0 --deep_dropout 0.0 --deep_hidden_units 1024 1024 1024 1024 1024 --prebatch_size 4096 --global_batch_size 131072 --eval_batch_size 32768 --eval_steps 8 --model_type wide_n_deep --gpu --benchmark --amp
+mpiexec --allow-run-as-root --bind-to socket -np 8 python -m trainer.task --hvd --model_dir . --benchmark_warmup_steps 50 --benchmark_steps 200 --gpu --benchmark --amp

+ 1 - 1
TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp32_1gpu.sh

@@ -17,4 +17,4 @@
 set -x
 set -e
 
-python -m trainer.task --model_dir . --transformed_metadata_path "/outbrain/tfrecords" --eval_data_pattern "/outbrain/tfrecords/eval_*" --train_data_pattern "/outbrain/tfrecords/train_*" --save_checkpoints_secs 600 --linear_l1_regularization 0.0 --linear_l2_regularization 0.0 --linear_learning_rate 0.2 --deep_l1_regularization 0.0 --deep_l2_regularization 0.0 --deep_learning_rate 1.0 --deep_dropout 0.0 --deep_hidden_units 1024 1024 1024 1024 1024 --prebatch_size 4096 --global_batch_size 131072 --eval_batch_size 32768 --eval_steps 8 --model_type wide_n_deep --gpu --benchmark
+python -m trainer.task --model_dir . --benchmark_warmup_steps 50 --benchmark_steps 200 --gpu --benchmark

+ 1 - 1
TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp32_4gpu.sh

@@ -17,4 +17,4 @@
 set -x
 set -e
 
-mpiexec --allow-run-as-root --bind-to socket -np 4 python -m trainer.task --hvd --model_dir . --transformed_metadata_path "/outbrain/tfrecords" --eval_data_pattern "/outbrain/tfrecords/eval_*" --train_data_pattern "/outbrain/tfrecords/train_*" --save_checkpoints_secs 600 --linear_l1_regularization 0.0 --linear_l2_regularization 0.0 --linear_learning_rate 0.2 --deep_l1_regularization 0.0 --deep_l2_regularization 0.0 --deep_learning_rate 1.0 --deep_dropout 0.0 --deep_hidden_units 1024 1024 1024 1024 1024 --prebatch_size 4096 --global_batch_size 131072 --eval_batch_size 32768 --eval_steps 8 --model_type wide_n_deep --gpu --benchmark
+mpiexec --allow-run-as-root --bind-to socket -np 4 python -m trainer.task --hvd --model_dir . --benchmark_warmup_steps 50 --benchmark_steps 200 --gpu --benchmark

+ 1 - 1
TensorFlow/Recommendation/WideAndDeep/scripts/benchmark_training_fp32_8gpu.sh

@@ -17,4 +17,4 @@
 set -x
 set -e
 
-mpiexec --allow-run-as-root --bind-to socket -np 8 python -m trainer.task --hvd --model_dir . --transformed_metadata_path "/outbrain/tfrecords" --eval_data_pattern "/outbrain/tfrecords/eval_*" --train_data_pattern "/outbrain/tfrecords/train_*" --save_checkpoints_secs 600 --linear_l1_regularization 0.0 --linear_l2_regularization 0.0 --linear_learning_rate 0.2 --deep_l1_regularization 0.0 --deep_l2_regularization 0.0 --deep_learning_rate 1.0 --deep_dropout 0.0 --deep_hidden_units 1024 1024 1024 1024 1024 --prebatch_size 4096 --global_batch_size 131072 --eval_batch_size 32768 --eval_steps 8 --model_type wide_n_deep --gpu --benchmark
+mpiexec --allow-run-as-root --bind-to socket -np 8 python -m trainer.task --hvd --model_dir . --benchmark_warmup_steps 50 --benchmark_steps 200 --gpu --benchmark

+ 12 - 31
TensorFlow/Recommendation/WideAndDeep/scripts/preproc.sh

@@ -21,34 +21,15 @@ else
   PREBATCH_SIZE=4096
 fi
 
-python preproc/preproc1.py
-python preproc/preproc2.py
-python preproc/preproc3.py
-
-export CUDA_VISIBLE_DEVICES=
-LOCAL_DATA_DIR=/outbrain/preprocessed
-LOCAL_DATA_TFRECORDS_DIR=/outbrain/tfrecords
-
-TRAIN_DIR=train_feature_vectors_integral_eval.csv
-VALID_DIR=validation_feature_vectors_integral.csv
-TRAIN_IMPUTED_DIR=train_feature_vectors_integral_eval_imputed.csv
-VALID_IMPUTED_DIR=validation_feature_vectors_integral_imputed.csv
-HEADER_PATH=train_feature_vectors_integral_eval.csv.header
-
-cd ${LOCAL_DATA_DIR}
-python /wd/preproc/csv_data_imputation.py --num_workers 40 \
-  --train_files_pattern 'train_feature_vectors_integral_eval.csv/part-*' \
-  --valid_files_pattern 'validation_feature_vectors_integral.csv/part-*' \
-  --train_dst_dir ${TRAIN_IMPUTED_DIR} \
-  --valid_dst_dir ${VALID_IMPUTED_DIR} \
-  --header_path ${HEADER_PATH}
-cd -
-
-time preproc/sort_csv.sh ${LOCAL_DATA_DIR}/${VALID_IMPUTED_DIR} ${LOCAL_DATA_DIR}/${VALID_IMPUTED_DIR}_sorted
-
-python dataflow_preprocess.py \
-  --eval_data "${LOCAL_DATA_DIR}/${VALID_IMPUTED_DIR}_sorted/part-*" \
-  --training_data "${LOCAL_DATA_DIR}/${TRAIN_IMPUTED_DIR}/part-*" \
-  --output_dir ${LOCAL_DATA_TFRECORDS_DIR} \
-  --batch_size ${PREBATCH_SIZE}
-
+echo "Starting preprocessing 1/4..."
+time python -m preproc.preproc1
+echo "Preprocessing 1/4 done.\n"
+echo "Starting preprocessing 2/4..."
+time python -m preproc.preproc2
+echo "Preprocessing 2/4 done.\n"
+echo "Starting preprocessing 3/4..."
+time python -m preproc.preproc3
+echo "Preprocessing 3/4 done.\n"
+echo "Starting preprocessing 4/4..."
+time python -m preproc.preproc4 --prebatch_size ${PREBATCH_SIZE}
+echo "Preprocessing 4/4 done.\n"

+ 100 - 0
TensorFlow/Recommendation/WideAndDeep/trainer/dataset_utils.py

@@ -0,0 +1,100 @@
+# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+import tensorflow as tf
+from tensorflow.compat.v1 import logging
+
+def separate_input_fn(
+    tf_transform_output,
+    transformed_examples,
+    create_batches,
+    mode,
+    reader_num_threads=1,
+    parser_num_threads=2,
+    shuffle_buffer_size=10,
+    prefetch_buffer_size=1,
+    print_display_ids=False,
+    # print_summarize stands in for FLAGS.eval_batch_size, which was
+    # referenced below but is not defined in this module
+    print_summarize=None):
+  """
+  A version of the training + eval input function that uses dataset operations.
+  (For more straightforward tweaking.)
+  """
+
+  logging.warn('Shuffle buffer size: {}'.format(shuffle_buffer_size))
+
+  filenames_dataset = tf.data.Dataset.list_files(
+    transformed_examples,
+    shuffle=False
+  )
+
+  raw_dataset = tf.data.TFRecordDataset(
+    filenames_dataset,
+    num_parallel_reads=reader_num_threads
+  )
+
+  if mode == tf.estimator.ModeKeys.TRAIN and shuffle_buffer_size > 1:
+    raw_dataset = raw_dataset.shuffle(shuffle_buffer_size)
+
+  raw_dataset = raw_dataset.repeat()
+  raw_dataset = raw_dataset.batch(create_batches)
+
+  # this function appears to require each element to be a vector
+  # batching should mean that this is always true
+  # one possible alternative for any problematic case is tf.io.parse_single_example
+  parsed_dataset = raw_dataset.apply(
+    tf.data.experimental.parse_example_dataset(
+      tf_transform_output.transformed_feature_spec(),
+      num_parallel_calls=parser_num_threads
+    )
+  )
+
+  # a function mapped over each dataset element
+  # will separate label, ensure that elements are two-dimensional (batch size, elements per record)
+  # adds print_display_ids injection
+  def consolidate_batch(elem):
+    label = elem.pop('label')
+    reshaped_label = tf.reshape(label, [-1, label.shape[-1]])
+    reshaped_elem = {
+      key: tf.reshape(elem[key], [-1, elem[key].shape[-1]])
+      for key in elem
+    }
+
+    if print_display_ids:
+      # the tf.Print ops must wrap a tensor that is actually returned
+      # (reshaped_elem); attaching them to elem after reshaped_elem has
+      # been built would leave them outside the executed graph
+      reshaped_elem['ad_id'] = tf.Print(input_=reshaped_elem['ad_id'],
+                               data=[tf.reshape(reshaped_elem['display_id'], [-1])],
+                               message='display_id', name='print_display_ids',
+                               summarize=print_summarize)
+      reshaped_elem['ad_id'] = tf.Print(input_=reshaped_elem['ad_id'],
+                               data=[tf.reshape(reshaped_elem['ad_id'], [-1])],
+                               message='ad_id', name='print_ad_ids',
+                               summarize=print_summarize)
+      reshaped_elem['ad_id'] = tf.Print(input_=reshaped_elem['ad_id'],
+                               data=[tf.reshape(reshaped_elem['is_leak'], [-1])],
+                               message='is_leak', name='print_is_leak',
+                               summarize=print_summarize)
+
+    return reshaped_elem, reshaped_label
+
+  if mode == tf.estimator.ModeKeys.EVAL:
+    parsed_dataset = parsed_dataset.map(
+      consolidate_batch,
+      num_parallel_calls=None
+    )
+  else:
+    parsed_dataset = parsed_dataset.map(
+      consolidate_batch,
+      num_parallel_calls=parser_num_threads
+    )
+    parsed_dataset = parsed_dataset.prefetch(prefetch_buffer_size)
+
+  return parsed_dataset
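+
+# A minimal usage sketch (paths and batch settings are illustrative only):
+#   tf_transform_output = tft.TFTransformOutput('/outbrain/tfrecords')
+#   dataset = separate_input_fn(
+#     tf_transform_output,
+#     '/outbrain/tfrecords/train/part*',
+#     create_batches=global_batch_size // prebatch_size,
+#     mode=tf.estimator.ModeKeys.TRAIN)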

+ 5 - 71
TensorFlow/Recommendation/WideAndDeep/trainer/task.py

@@ -24,6 +24,7 @@ import tensorflow as tf
 import tensorflow_transform as tft
 
 from trainer import features
+from trainer import dataset_utils
 from utils.hooks.benchmark_hooks import BenchmarkLoggingHook
 
 import horovod.tensorflow as hvd
@@ -59,7 +60,7 @@ def create_parser():
     help='Pattern of training file names. For example if training files are train_000.tfrecord, \
     train_001.tfrecord then --train_data_pattern is train_*',
     type=str,
-    default='/outbrain/tfrecords/train_*',
+    default='/outbrain/tfrecords/train/part*',
     nargs='+'
   )
   parser.add_argument(
@@ -67,7 +68,7 @@ def create_parser():
     help='Pattern of eval file names. For example if eval files are eval_000.tfrecord, \
     eval_001.tfrecord then --eval_data_pattern is eval_*',
     type=str,
-    default='/outbrain/tfrecords/eval_*',
+    default='/outbrain/tfrecords/eval/part*',
     nargs='+'
   )
   parser.add_argument(
@@ -331,70 +332,6 @@ def get_feature_columns(use_all_columns=False, force_subset=None):
   tf.compat.v1.logging.warn('wide&deep intersection: {}'.format(len(set(wide_columns).intersection(set(deep_columns)))))
   return wide_columns, deep_columns
 
-def separate_input_fn(
-    tf_transform_output,
-    transformed_examples,
-    create_batches,
-    mode,
-    reader_num_threads=1,
-    parser_num_threads=2,
-    shuffle_buffer_size=10,
-    prefetch_buffer_size=1,
-    print_display_ids=False):
-  """
-  A version of the training + eval input function that uses dataset operations.
-  (For more straightforward tweaking.)
-  """
-  
-  tf.compat.v1.logging.warn('Shuffle buffer size: {}'.format(shuffle_buffer_size))
-
-  filenames_dataset = tf.data.Dataset.list_files(transformed_examples, shuffle=False)
-  
-  raw_dataset = tf.data.TFRecordDataset(filenames_dataset, 
-            num_parallel_reads=reader_num_threads)
-  
-  raw_dataset = raw_dataset.shuffle(shuffle_buffer_size) \
-                  if (mode==tf.estimator.ModeKeys.TRAIN and shuffle_buffer_size > 1) \
-                  else raw_dataset
-  raw_dataset = raw_dataset.repeat()
-  raw_dataset = raw_dataset.batch(create_batches)
-  
-  # this function appears to require each element to be a vector
-  # batching should mean that this is always true
-  # one possible alternative for any problematic case is tf.io.parse_single_example
-  parsed_dataset = raw_dataset.apply(tf.data.experimental.parse_example_dataset(
-            tf_transform_output.transformed_feature_spec(), 
-            num_parallel_calls=parser_num_threads))
-  
-  # a function mapped over each dataset element
-  # will separate label, ensure that elements are two-dimensional (batch size, elements per record)
-  # adds print_display_ids injection
-  def consolidate_batch(elem):
-    label = elem.pop('label')
-    reshaped_label = tf.reshape(label, [-1, label.shape[-1]])
-    reshaped_elem = {key: tf.reshape(elem[key], [-1, elem[key].shape[-1]]) for key in elem}
-    if print_display_ids:
-      elem['ad_id'] = tf.Print(input_=elem['ad_id'], 
-        data=[tf.reshape(elem['display_id'], [-1])], 
-        message='display_id', name='print_display_ids', summarize=FLAGS.eval_batch_size)
-      elem['ad_id'] = tf.Print(input_=elem['ad_id'], 
-        data=[tf.reshape(elem['ad_id'], [-1])],
-        message='ad_id', name='print_ad_ids', summarize=FLAGS.eval_batch_size)
-      elem['ad_id'] = tf.Print(input_=elem['ad_id'], 
-        data=[tf.reshape(elem['is_leak'], [-1])],
-        message='is_leak', name='print_is_leak', summarize=FLAGS.eval_batch_size)
-
-    return reshaped_elem, reshaped_label
-  
-  if mode == tf.estimator.ModeKeys.EVAL:
-    parsed_dataset = parsed_dataset.map(consolidate_batch, num_parallel_calls=None)
-  else:
-    parsed_dataset = parsed_dataset.map(consolidate_batch, 
-      num_parallel_calls=parser_num_threads)
-    parsed_dataset = parsed_dataset.prefetch(prefetch_buffer_size)
-
-  return parsed_dataset
-
 # rough approximation for MAP metric for measuring ad quality
 # roughness comes from batch sizes falling between groups of
 # display ids
@@ -694,11 +631,8 @@ def main(FLAGS):
     wide_optimizer = hvd.DistributedOptimizer(wide_optimizer)
     deep_optimizer = hvd.DistributedOptimizer(deep_optimizer)
 
-  stats_filename = os.path.join(FLAGS.transformed_metadata_path, 'stats.json')
-  embed_columns = None
-  
   # input functions to read data from disk
-  train_input_fn = lambda : separate_input_fn(
+  train_input_fn = lambda : dataset_utils.separate_input_fn(
     tf_transform_output,
     FLAGS.train_data_pattern,
     create_batches,
@@ -708,7 +642,7 @@ def main(FLAGS):
     shuffle_buffer_size=int(FLAGS.shuffle_percentage*create_batches),
     prefetch_buffer_size=FLAGS.prefetch_buffer_size,
     print_display_ids=FLAGS.print_display_ids)
-  eval_input_fn = lambda : separate_input_fn(
+  eval_input_fn = lambda : dataset_utils.separate_input_fn(
     tf_transform_output,
     FLAGS.eval_data_pattern,
     (FLAGS.eval_batch_size // FLAGS.prebatch_size),