|
|
@@ -21,13 +21,9 @@ OUTPUT_BUCKET_FOLDER = "/outbrain/preprocessed/"
|
|
|
DATA_BUCKET_FOLDER = "/outbrain/orig/"
|
|
|
SPARK_TEMP_FOLDER = "/outbrain/spark-temp/"
|
|
|
|
|
|
-
|
|
|
-from IPython.display import display
|
|
|
-
|
|
|
-
|
|
|
-from pyspark.sql.types import *
|
|
|
+from pyspark.sql.types import IntegerType, StringType, StructType, StructField, TimestampType, FloatType, ArrayType, MapType
|
|
|
import pyspark.sql.functions as F
|
|
|
-from pyspark.ml.linalg import Vectors, SparseVector, VectorUDT
|
|
|
+from pyspark.ml.linalg import SparseVector, VectorUDT
|
|
|
|
|
|
from pyspark.context import SparkContext, SparkConf
|
|
|
from pyspark.sql.session import SparkSession
|
|
|
@@ -38,17 +34,10 @@ sc = SparkContext(conf=conf)
|
|
|
spark = SparkSession(sc)
|
|
|
|
|
|
import numpy as np
|
|
|
-import scipy.sparse
|
|
|
|
|
|
import math
|
|
|
import datetime
|
|
|
import time
|
|
|
-import itertools
|
|
|
-
|
|
|
-import pickle
|
|
|
-
|
|
|
-import random
|
|
|
-random.seed(42)
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
@@ -461,7 +450,7 @@ else:
|
|
|
|
|
|
# # Training models
|
|
|
def is_null(value):
|
|
|
- return value == None or len(str(value).strip()) == 0
|
|
|
+ return value is None or len(str(value).strip()) == 0
|
|
|
|
|
|
LESS_SPECIAL_CAT_VALUE = 'less'
|
|
|
def get_category_field_values_counts(field, df, min_threshold=10):
|
|
|
@@ -490,7 +479,7 @@ len(doc_entity_id_values_counts)
|
|
|
|
|
|
# ## Processing average CTR by categories
|
|
|
def get_percentiles(df, field, quantiles_levels=None, max_error_rate=0.0):
|
|
|
- if quantiles_levels == None:
|
|
|
+ if quantiles_levels is None:
|
|
|
quantiles_levels = np.arange(0.0, 1.1, 0.1).tolist()
|
|
|
quantiles = df.approxQuantile(field, quantiles_levels, max_error_rate)
|
|
|
return dict(zip(quantiles_levels, quantiles))
|
|
|
@@ -896,7 +885,7 @@ def get_days_diff(newer_timestamp, older_timestamp):
|
|
|
return days_diff
|
|
|
|
|
|
def get_time_decay_factor(timestamp, timestamp_ref=None, alpha=0.001):
|
|
|
- if timestamp_ref == None:
|
|
|
+ if timestamp_ref is None:
|
|
|
timestamp_ref = time.time()
|
|
|
|
|
|
days_diff = get_days_diff(timestamp_ref, timestamp)
|
|
|
@@ -1146,7 +1135,7 @@ def cosine_similarity_dicts(dict1, dict2):
|
|
|
return sum_common_aspects / (dict1_norm * dict2_norm), intersections
|
|
|
|
|
|
def cosine_similarity_user_docs_aspects(user_aspect_profile, doc_aspect_ids, doc_aspects_confidence, aspect_docs_counts):
|
|
|
- if user_aspect_profile==None or len(user_aspect_profile) == 0 or doc_aspect_ids == None or len(doc_aspect_ids) == 0:
|
|
|
+ if user_aspect_profile is None or len(user_aspect_profile) == 0 or doc_aspect_ids is None or len(doc_aspect_ids) == 0:
|
|
|
return None, None
|
|
|
|
|
|
doc_aspects = dict(zip(doc_aspect_ids, doc_aspects_confidence))
|
|
|
@@ -1170,7 +1159,6 @@ def cosine_similarity_user_docs_aspects(user_aspect_profile, doc_aspect_ids, doc
|
|
|
random_error = math.pow(len(doc_aspects) / float(len(aspect_docs_counts)),
|
|
|
intersections) * math.pow(len(user_aspect_profile) / float(len(aspect_docs_counts)),
|
|
|
intersections)
|
|
|
- confidence = 1.0 - random_error
|
|
|
else:
|
|
|
#P(A not intersect B) = 1 - P(A intersect B)
|
|
|
random_error = 1 - ((len(doc_aspects) / float(len(aspect_docs_counts))) *
|
|
|
@@ -1183,8 +1171,8 @@ def cosine_similarity_user_docs_aspects(user_aspect_profile, doc_aspect_ids, doc
|
|
|
def cosine_similarity_doc_event_doc_ad_aspects(doc_event_aspect_ids, doc_event_aspects_confidence,
|
|
|
doc_ad_aspect_ids, doc_ad_aspects_confidence,
|
|
|
aspect_docs_counts):
|
|
|
- if doc_event_aspect_ids == None or len(doc_event_aspect_ids) == 0 \
|
|
|
- or doc_ad_aspect_ids == None or len(doc_ad_aspect_ids) == 0:
|
|
|
+ if doc_event_aspect_ids is None or len(doc_event_aspect_ids) == 0 \
|
|
|
+ or doc_ad_aspect_ids is None or len(doc_ad_aspect_ids) == 0:
|
|
|
return None, None
|
|
|
|
|
|
doc_event_aspects = dict(zip(doc_event_aspect_ids, doc_event_aspects_confidence))
|
|
|
@@ -1210,7 +1198,6 @@ def cosine_similarity_doc_event_doc_ad_aspects(doc_event_aspect_ids, doc_event_a
|
|
|
random_error = math.pow(len(doc_event_aspect_ids) / float(len(aspect_docs_counts)),
|
|
|
intersections) * math.pow(len(doc_ad_aspect_ids) / float(len(aspect_docs_counts)),
|
|
|
intersections)
|
|
|
- confidence = 1.0 - random_error
|
|
|
else:
|
|
|
#P(A not intersect B) = 1 - P(A intersect B)
|
|
|
random_error = 1 - ((len(doc_event_aspect_ids) / float(len(aspect_docs_counts))) *
|