Python scripts for each major use case
Suggest changes
The following three Python scripts correspond to the three major use cases tested. The first is `sentiment_analysis_sparknlp.py`.
# TR-4570 Refresh NLP testing by Rick Huang
"""Sentiment analysis of text files with Spark NLP.

Reads text (local file, local directory, or HDFS path) into a Spark DataFrame,
runs the pretrained ``explain_document_dl`` pipeline and the pretrained
``bert_sequence_classifier_finbert`` classifier over every non-blank line, and
prints the resulting entities and per-class sentiment counts.

Usage: sentiment_analysis_sparknlp.py <file-or-directory>
"""
from sys import argv
import os
import sys

import sparknlp
import pyspark.sql.functions as F
from sparknlp import Finisher
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline

# Build ONE session carrying the cluster settings. The original called
# sparknlp.start() first; the later builder's getOrCreate() then returned that
# already-running session, so the YARN master and executor settings below were
# not the ones in effect.
spark = SparkSession.builder \
    .master("yarn") \
    .appName("test_hdfs_read_write") \
    .config("spark.executor.cores", "1") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.3") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config('spark.executor.memory', '5gb') \
    .config('spark.executor.memoryOverhead', '1000') \
    .config('spark.driver.memoryOverhead', '1000') \
    .config("spark.sql.shuffle.partitions", "480") \
    .getOrCreate()

print("Spark NLP version:")
print(sparknlp.version())
print("Apache Spark version:")
print(spark.version)

sc = spark.sparkContext

# Download pre-trained pipelines & sequence classifier
explain_pipeline_model = PretrainedPipeline('explain_document_dl', lang='en').model
sequenceClassifier = BertForSequenceClassification \
    .pretrained('bert_sequence_classifier_finbert', 'en') \
    .setInputCols(['token', 'document']) \
    .setOutputCol('class') \
    .setCaseSensitive(True) \
    .setMaxSentenceLength(512)


def process_sentence_df(data):
    """Annotate a DataFrame of text lines and print entities and sentiment.

    Args:
        data: Spark DataFrame with a string column named ``text``.

    Returns:
        None. All results are printed via ``show()`` / ``printSchema()``.
    """
    # Pre-process: begin
    print("1. Begin DataFrame pre-processing...\n")
    print("\n\t2. Attaching DocumentAssembler Transformer to the pipeline")
    documentAssembler = DocumentAssembler() \
        .setInputCol("text") \
        .setOutputCol("document") \
        .setCleanupMode("inplace_full")
    doc_df = documentAssembler.transform(data)
    doc_df.printSchema()
    doc_df.show(truncate=50)

    # Pre-process: get rid of blank lines. Rows whose document annotation has
    # end == -1 are filtered out before the cleaned text is projected.
    clean_df = doc_df.withColumn("tmp", F.explode("document")) \
        .where("tmp.end != -1") \
        .select(F.col("tmp.result").alias("text")) \
        .dropna()
    print("[OK!] DataFrame after initial cleanup:\n")
    clean_df.printSchema()
    clean_df.show(truncate=80)

    # for FinBERT
    tokenizer = Tokenizer() \
        .setInputCols(['document']) \
        .setOutputCol('token')
    print("\n\t3. Attaching Tokenizer Annotator to the pipeline")
    pipeline_finbert = Pipeline(stages=[
        documentAssembler,
        tokenizer,
        sequenceClassifier
    ])

    # Use Finisher() & construct PySpark ML pipeline
    finisher = Finisher().setInputCols(["token", "lemma", "pos", "entities"])
    print("\n\t4. Attaching Finisher Transformer to the pipeline")
    pipeline_ex = Pipeline() \
        .setStages([
            explain_pipeline_model,
            finisher
        ])
    print("\n\t\t\t ---- Pipeline Built Successfully ----")

    # Loading pipelines to annotate
    ex_model = pipeline_ex.fit(clean_df)
    annotations_finished_ex_df = ex_model.transform(clean_df)
    result_finbert_df = pipeline_finbert.fit(clean_df).transform(clean_df)
    print("\n\t\t\t ----Document Explain, Sentiment Analysis & FinBERT Pipeline Fitted Successfully ----")

    # Check the result entities
    print("[OK!] Simple explain ML pipeline result:\n")
    annotations_finished_ex_df.printSchema()
    annotations_finished_ex_df.select('text', 'finished_entities').show(truncate=False)

    # Check the result sentiment from FinBERT
    print("[OK!] Sentiment Analysis FinBERT pipeline result:\n")
    result_finbert_df.printSchema()
    result_finbert_df.select('text', 'class.result').show(80, False)
    sentiment_stats(result_finbert_df)
    return


def sentiment_stats(finbert_df):
    """Print the number of rows per predicted sentiment class.

    Args:
        finbert_df: DataFrame produced by the FinBERT pipeline; must contain
            the ``text`` and ``class`` columns.
    """
    result_df = finbert_df.select('text', 'class.result')
    sa_df = result_df.select('result')
    sa_df.groupBy('result').count().show()
    return


def process_input_file(file_name):
    """Read a text file into a DataFrame with one ``text`` row per line.

    Args:
        file_name: path passed to ``spark.read.text``.

    Returns:
        DataFrame with a single column ``text`` and null rows dropped.
    """
    # Turn input file to Spark DataFrame
    print("START processing input file...")
    data_df = spark.read.text(file_name)
    data_df.show()
    # rename first column 'text' for sparknlp
    output_df = data_df.withColumnRenamed("value", "text").dropna()
    output_df.printSchema()
    return output_df


def process_local_dir(directory):
    """Collect every ``.txt`` file found under a local directory tree.

    Args:
        directory: root directory walked with ``os.walk``.

    Returns:
        List of paths ending in ``.txt``.
    """
    filelist = []
    # Header is printed once; the original emitted it for every file visited.
    print("[OK!] Will process the following files:")
    for subdir, dirs, files in os.walk(directory):
        for filename in files:
            filepath = subdir + os.sep + filename
            if filepath.endswith(".txt"):
                print(filepath)
                filelist.append(filepath)
    return filelist


def process_local_dir_or_file(dir_or_file):
    """Process one local file, or every ``.txt`` file under a local directory.

    Args:
        dir_or_file: local path; anything that is not a regular file is
            treated as a directory.

    Returns:
        Number of files processed.
    """
    numfiles = 0
    if os.path.isfile(dir_or_file):
        input_df = process_input_file(dir_or_file)
        print("Obtained input_df.")
        process_sentence_df(input_df)
        print("Processed input_df")
        numfiles += 1
    else:
        filelist = process_local_dir(dir_or_file)
        for file in filelist:
            input_df = process_input_file(file)
            process_sentence_df(input_df)
            numfiles += 1
    return numfiles


def process_hdfs_dir(dir_name):
    """Read every file under a directory recursively and process it as one DataFrame.

    Args:
        dir_name: directory path passed to ``spark.read`` with
            ``recursiveFileLookup`` enabled.
    """
    # Turn input files to Spark DataFrame
    print("START processing input HDFS directory...")
    data_df = spark.read.option("recursiveFileLookup", "true").text(dir_name)
    data_df.show()
    print("[DEBUG] total lines in data_df = ", data_df.count())
    # rename first column 'text' for sparknlp
    output_df = data_df.withColumnRenamed("value", "text").dropna()
    print("[DEBUG] output_df looks like: \n")
    output_df.show(40, False)
    print("[DEBUG] HDFS dir resulting data_df schema: \n")
    output_df.printSchema()
    process_sentence_df(output_df)
    print("Processed HDFS directory: ", dir_name)
    return


if __name__ == '__main__':
    # Validate the command line explicitly. The original wrapped a len() test
    # in try/except, which can never raise, so a missing argument crashed
    # later with IndexError on argv[1].
    if len(argv) != 2:
        print("[ERROR] Please enter input text file or path to process!\n")
        sys.exit(1)
    print("Start processing input...\n")
    input_path = str(argv[1])

    # NOTE(review): the three modes below run one after another on the same
    # path, exactly as in the original; presumably only the mode matching the
    # input location is meant to be left enabled - confirm before relying on
    # the reported file count.

    # This is for local file, not hdfs:
    numfiles = process_local_dir_or_file(input_path)

    # For HDFS single file & directory:
    input_df = process_input_file(input_path)
    print("Obtained input_df.")
    process_sentence_df(input_df)
    print("Processed input_df")
    numfiles += 1

    # For HDFS directory of subdirectories of files:
    input_parse_list = input_path.split('/')
    print(input_parse_list)
    if input_parse_list[-2:-1] == ['Transcripts']:
        print("Start processing HDFS directory: ", input_path)
        process_hdfs_dir(input_path)

    print(f"[OK!] All done. Number of files processed = {numfiles}")
The second script is `keras_spark_horovod_rossmann_estimator.py`.
# Copyright 2022 NetApp, Inc.
# Authored by Rick Huang
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# The below code was modified from: https://www.kaggle.com/c/rossmann-store-sales
"""Train a Keras sales-prediction model on the Rossmann data with Horovod on Spark.

The script prepares the Rossmann CSV files with Spark (joins, date expansion,
elapsed-time features, vocabulary lookup), trains a Keras model through the
Horovod Spark ``KerasEstimator``, and writes a submission CSV with the
predicted sales for the test set.
"""

import argparse
import datetime
import os
import sys

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession

import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Embedding, Concatenate, Dense, Flatten, Reshape, BatchNormalization, Dropout

import horovod.spark.keras as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store
from horovod.tensorflow.keras.callbacks import BestModelCheckpoint

parser = argparse.ArgumentParser(description='Horovod Keras Spark Rossmann Estimator Example',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# Adjacent string literals are concatenated verbatim, so each fragment ends
# with a space (the original rendered "Clustershould" and "bysupplying").
parser.add_argument('--master',
                    help='spark cluster to use for training. If set to None, uses current default cluster. Cluster '
                         'should be set up to provide a Spark task per multiple CPU cores, or per GPU, e.g. by '
                         'supplying `-c <NUM_GPUS>` in Spark Standalone mode')
parser.add_argument('--num-proc', type=int,
                    help='number of worker processes for training, default: `spark.default.parallelism`')
parser.add_argument('--learning_rate', type=float, default=0.0001,
                    help='initial learning rate')
parser.add_argument('--batch-size', type=int, default=100,
                    help='batch size')
parser.add_argument('--epochs', type=int, default=100,
                    help='number of epochs to train')
parser.add_argument('--sample-rate', type=float,
                    help='desired sampling rate. Useful to set to low number (e.g. 0.01) to make sure that '
                         'end-to-end process works')
parser.add_argument('--data-dir', default='file://' + os.getcwd(),
                    help='location of data on local filesystem (prefixed with file://) or on HDFS')
parser.add_argument('--local-submission-csv', default='submission.csv',
                    help='output submission predictions CSV')
parser.add_argument('--local-checkpoint-file', default='checkpoint',
                    help='model checkpoint')
parser.add_argument('--work-dir', default='/tmp',
                    help='temporary working directory to write intermediate files (prefix with hdfs:// to use HDFS)')

if __name__ == '__main__':
    args = parser.parse_args()

    # ================ #
    # DATA PREPARATION #
    # ================ #

    print('================')
    print('Data preparation')
    print('================')

    # Create Spark session for data preparation.
    conf = SparkConf() \
        .setAppName('Keras Spark Rossmann Estimator Example') \
        .set('spark.sql.shuffle.partitions', '480') \
        .set("spark.executor.cores", "1") \
        .set('spark.executor.memory', '5gb') \
        .set('spark.executor.memoryOverhead', '1000') \
        .set('spark.driver.memoryOverhead', '1000')
    if args.master:
        conf.setMaster(args.master)
    elif args.num_proc:
        conf.setMaster('local[{}]'.format(args.num_proc))
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    train_csv = spark.read.csv('%s/train.csv' % args.data_dir, header=True)
    test_csv = spark.read.csv('%s/test.csv' % args.data_dir, header=True)

    store_csv = spark.read.csv('%s/store.csv' % args.data_dir, header=True)
    store_states_csv = spark.read.csv('%s/store_states.csv' % args.data_dir, header=True)
    state_names_csv = spark.read.csv('%s/state_names.csv' % args.data_dir, header=True)
    google_trend_csv = spark.read.csv('%s/googletrend.csv' % args.data_dir, header=True)
    weather_csv = spark.read.csv('%s/weather.csv' % args.data_dir, header=True)

    def expand_date(df):
        """Cast ``Date`` to a date type and add Year/Month/Week/Day columns."""
        df = df.withColumn('Date', df.Date.cast(T.DateType()))
        return df \
            .withColumn('Year', F.year(df.Date)) \
            .withColumn('Month', F.month(df.Date)) \
            .withColumn('Week', F.weekofyear(df.Date)) \
            .withColumn('Day', F.dayofmonth(df.Date))

    def prepare_google_trend():
        """Return the Google Trend data with Date, State and expanded date columns."""
        # Extract week start date and state.
        google_trend_all = google_trend_csv \
            .withColumn('Date', F.regexp_extract(google_trend_csv.week, '(.*?) -', 1)) \
            .withColumn('State', F.regexp_extract(google_trend_csv.file, 'Rossmann_DE_(.*)', 1))

        # Map state NI -> HB,NI to align with other data sources.
        google_trend_all = google_trend_all \
            .withColumn('State', F.when(google_trend_all.State == 'NI', 'HB,NI').otherwise(google_trend_all.State))

        # Expand dates.
        return expand_date(google_trend_all)

    def add_elapsed(df, cols):
        """Add ``Before<col>`` / ``After<col>`` day counts for each column in ``cols``.

        Rows are partitioned by Store and scanned in date order (descending for
        ``Before``, ascending for ``After``); the value is the number of days
        between the row's date and the most recent row, in scan order, where
        the column was truthy.
        """
        def add_elapsed_column(col, asc):
            def fn(rows):
                last_store, last_date = None, None
                for r in rows:
                    # Reset the reference date whenever a new store starts.
                    if last_store != r.Store:
                        last_store = r.Store
                        last_date = r.Date
                    if r[col]:
                        last_date = r.Date
                    fields = r.asDict().copy()
                    fields[('After' if asc else 'Before') + col] = (r.Date - last_date).days
                    yield Row(**fields)
            return fn

        df = df.repartition(df.Store)
        for asc in [False, True]:
            sort_col = df.Date.asc() if asc else df.Date.desc()
            rdd = df.sortWithinPartitions(df.Store.asc(), sort_col).rdd
            for col in cols:
                rdd = rdd.mapPartitions(add_elapsed_column(col, asc))
            df = rdd.toDF()
        return df

    def prepare_df(df):
        """Join store, trend and weather data onto ``df`` and derive features.

        Raises:
            AssertionError: if the row count changes across the inner joins.
        """
        num_rows = df.count()

        # Expand dates.
        df = expand_date(df)

        df = df \
            .withColumn('Open', df.Open != '0') \
            .withColumn('Promo', df.Promo != '0') \
            .withColumn('StateHoliday', df.StateHoliday != '0') \
            .withColumn('SchoolHoliday', df.SchoolHoliday != '0')

        # Merge in store information.
        store = store_csv.join(store_states_csv, 'Store')
        df = df.join(store, 'Store')

        # Merge in Google Trend information.
        google_trend_all = prepare_google_trend()
        df = df.join(google_trend_all, ['State', 'Year', 'Week']).select(df['*'], google_trend_all.trend)

        # Merge in Google Trend for whole Germany.
        google_trend_de = google_trend_all[google_trend_all.file == 'Rossmann_DE'].withColumnRenamed('trend', 'trend_de')
        df = df.join(google_trend_de, ['Year', 'Week']).select(df['*'], google_trend_de.trend_de)

        # Merge in weather.
        weather = weather_csv.join(state_names_csv, weather_csv.file == state_names_csv.StateName)
        df = df.join(weather, ['State', 'Date'])

        # Fix null values.
        df = df \
            .withColumn('CompetitionOpenSinceYear', F.coalesce(df.CompetitionOpenSinceYear, F.lit(1900))) \
            .withColumn('CompetitionOpenSinceMonth', F.coalesce(df.CompetitionOpenSinceMonth, F.lit(1))) \
            .withColumn('Promo2SinceYear', F.coalesce(df.Promo2SinceYear, F.lit(1900))) \
            .withColumn('Promo2SinceWeek', F.coalesce(df.Promo2SinceWeek, F.lit(1)))

        # Days & months competition was open, cap to 2 years.
        df = df.withColumn('CompetitionOpenSince',
                           F.to_date(F.format_string('%s-%s-15', df.CompetitionOpenSinceYear,
                                                     df.CompetitionOpenSinceMonth)))
        df = df.withColumn('CompetitionDaysOpen',
                           F.when(df.CompetitionOpenSinceYear > 1900,
                                  F.greatest(F.lit(0), F.least(F.lit(360 * 2), F.datediff(df.Date, df.CompetitionOpenSince))))
                           .otherwise(0))
        df = df.withColumn('CompetitionMonthsOpen', (df.CompetitionDaysOpen / 30).cast(T.IntegerType()))

        # Days & weeks of promotion, cap to 25 weeks.
        df = df.withColumn('Promo2Since',
                           F.expr('date_add(format_string("%s-01-01", Promo2SinceYear), (cast(Promo2SinceWeek as int) - 1) * 7)'))
        df = df.withColumn('Promo2Days',
                           F.when(df.Promo2SinceYear > 1900,
                                  F.greatest(F.lit(0), F.least(F.lit(25 * 7), F.datediff(df.Date, df.Promo2Since))))
                           .otherwise(0))
        df = df.withColumn('Promo2Weeks', (df.Promo2Days / 7).cast(T.IntegerType()))

        # Check that we did not lose any rows through inner joins.
        assert num_rows == df.count(), 'lost rows in joins'
        return df

    def build_vocabulary(df, cols):
        """Return ``{column: sorted list of its distinct values}`` for ``cols``.

        ``None`` is sorted as the zero value of the column's Python type.
        """
        vocab = {}
        for col in cols:
            values = [r[0] for r in df.select(col).distinct().collect()]
            non_null = [x for x in values if x is not None]
            # A column holding only nulls has no type to infer; any default
            # works because every key then collapses to it. The original
            # raised IndexError here.
            default_value = type(non_null[0])() if non_null else ''
            vocab[col] = sorted(values, key=lambda x: x or default_value)
        return vocab

    def cast_columns(df, cols):
        """Cast ``cols`` to float, replacing values that fail to cast with 0.0."""
        for col in cols:
            df = df.withColumn(col, F.coalesce(df[col].cast(T.FloatType()), F.lit(0.0)))
        return df

    def lookup_columns(df, vocab):
        """Replace each vocabulary column with the index of its value in the vocabulary."""
        def lookup(mapping):
            # Build the value -> position table once per column; the original
            # called list.index() for every row, a linear scan of the vocabulary.
            index_of = {value: position for position, value in enumerate(mapping)}

            def fn(v):
                return index_of[v]
            return F.udf(fn, returnType=T.IntegerType())

        for col, mapping in vocab.items():
            df = df.withColumn(col, lookup(mapping)(df[col]))
        return df

    if args.sample_rate:
        train_csv = train_csv.sample(withReplacement=False, fraction=args.sample_rate)
        test_csv = test_csv.sample(withReplacement=False, fraction=args.sample_rate)

    # Prepare data frames from CSV files.
    train_df = prepare_df(train_csv).cache()
    test_df = prepare_df(test_csv).cache()

    # Add elapsed times from holidays & promos, the data spanning training & test datasets.
    elapsed_cols = ['Promo', 'StateHoliday', 'SchoolHoliday']
    elapsed = add_elapsed(train_df.select('Date', 'Store', *elapsed_cols)
                          .unionAll(test_df.select('Date', 'Store', *elapsed_cols)),
                          elapsed_cols)

    # Join with elapsed times.
    train_df = train_df \
        .join(elapsed, ['Date', 'Store']) \
        .select(train_df['*'], *[prefix + col for prefix in ['Before', 'After'] for col in elapsed_cols])
    test_df = test_df \
        .join(elapsed, ['Date', 'Store']) \
        .select(test_df['*'], *[prefix + col for prefix in ['Before', 'After'] for col in elapsed_cols])

    # Filter out zero sales.
    train_df = train_df.filter(train_df.Sales > 0)

    print('===================')
    print('Prepared data frame')
    print('===================')
    train_df.show()

    categorical_cols = [
        'Store', 'State', 'DayOfWeek', 'Year', 'Month', 'Day', 'Week', 'CompetitionMonthsOpen', 'Promo2Weeks', 'StoreType',
        'Assortment', 'PromoInterval', 'CompetitionOpenSinceYear', 'Promo2SinceYear', 'Events', 'Promo',
        'StateHoliday', 'SchoolHoliday'
    ]

    continuous_cols = [
        'CompetitionDistance', 'Max_TemperatureC', 'Mean_TemperatureC', 'Min_TemperatureC', 'Max_Humidity',
        'Mean_Humidity', 'Min_Humidity', 'Max_Wind_SpeedKm_h', 'Mean_Wind_SpeedKm_h', 'CloudCover', 'trend', 'trend_de',
        'BeforePromo', 'AfterPromo', 'AfterStateHoliday', 'BeforeStateHoliday', 'BeforeSchoolHoliday', 'AfterSchoolHoliday'
    ]

    all_cols = categorical_cols + continuous_cols

    # Select features.
    train_df = train_df.select(*(all_cols + ['Sales', 'Date'])).cache()
    test_df = test_df.select(*(all_cols + ['Id', 'Date'])).cache()

    # Build vocabulary of categorical columns.
    vocab = build_vocabulary(train_df.select(*categorical_cols)
                             .unionAll(test_df.select(*categorical_cols)).cache(),
                             categorical_cols)

    # Cast continuous columns to float & lookup categorical columns.
    train_df = cast_columns(train_df, continuous_cols + ['Sales'])
    train_df = lookup_columns(train_df, vocab)
    test_df = cast_columns(test_df, continuous_cols)
    test_df = lookup_columns(test_df, vocab)

    # Split into training & validation.
    # Test set is in 2015, use the same period in 2014 from the training set as a validation set.
    test_min_date = test_df.agg(F.min(test_df.Date)).collect()[0][0]
    test_max_date = test_df.agg(F.max(test_df.Date)).collect()[0][0]
    one_year = datetime.timedelta(365)
    train_df = train_df.withColumn('Validation',
                                   (train_df.Date > test_min_date - one_year) & (train_df.Date <= test_max_date - one_year))

    # Determine max Sales number.
    max_sales = train_df.agg(F.max(train_df.Sales)).collect()[0][0]

    # Convert Sales to log domain
    train_df = train_df.withColumn('Sales', F.log(train_df.Sales))

    print('===================================')
    print('Data frame with transformed columns')
    print('===================================')
    train_df.show()

    print('================')
    print('Data frame sizes')
    print('================')
    train_rows = train_df.filter(~train_df.Validation).count()
    val_rows = train_df.filter(train_df.Validation).count()
    test_rows = test_df.count()
    print('Training: %d' % train_rows)
    print('Validation: %d' % val_rows)
    print('Test: %d' % test_rows)

    # ============== #
    # MODEL TRAINING #
    # ============== #

    print('==============')
    print('Model training')
    print('==============')

    def exp_rmspe(y_true, y_pred):
        """Competition evaluation metric, expects logarithmic inputs."""
        pct = tf.square((tf.exp(y_true) - tf.exp(y_pred)) / tf.exp(y_true))
        # Compute mean excluding stores with zero denominator.
        x = tf.reduce_sum(tf.where(y_true > 0.001, pct, tf.zeros_like(pct)))
        y = tf.reduce_sum(tf.where(y_true > 0.001, tf.ones_like(pct), tf.zeros_like(pct)))
        return tf.sqrt(x / y)

    def act_sigmoid_scaled(x):
        """Sigmoid scaled to logarithm of maximum sales scaled by 20%."""
        return tf.nn.sigmoid(x) * tf.math.log(max_sales) * 1.2

    CUSTOM_OBJECTS = {'exp_rmspe': exp_rmspe,
                      'act_sigmoid_scaled': act_sigmoid_scaled}

    # Disable GPUs when building the model to prevent memory leaks.
    # The major version is parsed directly because distutils (the source of
    # LooseVersion) is no longer part of the standard library in Python 3.12.
    if int(tf.__version__.split('.')[0]) >= 2:
        # See https://github.com/tensorflow/tensorflow/issues/33168
        os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
    else:
        K.set_session(tf.Session(config=tf.ConfigProto(device_count={'GPU': 0})))

    # Build the model.
    inputs = {col: Input(shape=(1,), name=col) for col in all_cols}
    embeddings = [Embedding(len(vocab[col]), 10, input_length=1, name='emb_' + col)(inputs[col])
                  for col in categorical_cols]
    continuous_bn = Concatenate()([Reshape((1, 1), name='reshape_' + col)(inputs[col])
                                   for col in continuous_cols])
    continuous_bn = BatchNormalization()(continuous_bn)
    x = Concatenate()(embeddings + [continuous_bn])
    x = Flatten()(x)
    x = Dense(1000, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.00005))(x)
    x = Dense(1000, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.00005))(x)
    x = Dense(1000, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.00005))(x)
    x = Dense(500, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.00005))(x)
    x = Dropout(0.5)(x)
    output = Dense(1, activation=act_sigmoid_scaled)(x)
    model = tf.keras.Model([inputs[f] for f in all_cols], output)
    model.summary()

    # `learning_rate` is the supported keyword; `lr` is a deprecated alias.
    opt = tf.keras.optimizers.Adam(learning_rate=args.learning_rate, epsilon=1e-3)

    # Checkpoint callback to specify options for the returned Keras model
    ckpt_callback = BestModelCheckpoint(monitor='val_loss', mode='auto', save_freq='epoch')

    # Horovod: run training.
    store = Store.create(args.work_dir)
    backend = SparkBackend(num_proc=args.num_proc,
                           stdout=sys.stdout, stderr=sys.stderr,
                           prefix_output_with_timestamp=True)
    keras_estimator = hvd.KerasEstimator(backend=backend,
                                         store=store,
                                         model=model,
                                         optimizer=opt,
                                         loss='mae',
                                         metrics=[exp_rmspe],
                                         custom_objects=CUSTOM_OBJECTS,
                                         feature_cols=all_cols,
                                         label_cols=['Sales'],
                                         validation='Validation',
                                         batch_size=args.batch_size,
                                         epochs=args.epochs,
                                         verbose=2,
                                         checkpoint_callback=ckpt_callback)

    keras_model = keras_estimator.fit(train_df).setOutputCols(['Sales_output'])

    history = keras_model.getHistory()
    best_val_rmspe = min(history['val_exp_rmspe'])
    print('Best RMSPE: %f' % best_val_rmspe)

    # Save the trained model.
    keras_model.save(args.local_checkpoint_file)
    print('Written checkpoint to %s' % args.local_checkpoint_file)

    # ================ #
    # FINAL PREDICTION #
    # ================ #

    print('================')
    print('Final prediction')
    print('================')

    pred_df = keras_model.transform(test_df)
    pred_df.printSchema()
    pred_df.show(5)

    # Convert from log domain to real Sales numbers
    pred_df = pred_df.withColumn('Sales_pred', F.exp(pred_df.Sales_output))

    submission_df = pred_df.select(pred_df.Id.cast(T.IntegerType()), pred_df.Sales_pred).toPandas()
    submission_df.sort_values(by=['Id']).to_csv(args.local_submission_csv, index=False)
    print('Saved predictions to %s' % args.local_submission_csv)

    spark.stop()
The third script is `run_classification_criteo_spark.py`.
"""Click-through-rate classification on Criteo data with Spark and DeepCTR.

Loads the Criteo CSV with Spark, fills missing values, label-encodes the
sparse (``C1``..``C26``) features, min-max scales the dense (``I1``..``I13``)
features, then trains and evaluates a DeepFM model.
"""
import tempfile
import string
import random
import os
import uuid
import argparse
import datetime
import sys
import shutil
import csv
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext, Row, DataFrame
from pyspark.mllib import linalg as mllib_linalg
from pyspark.mllib.linalg import SparseVector as mllibSparseVector
from pyspark.mllib.linalg import VectorUDT as mllibVectorUDT
from pyspark.mllib.linalg import Vector as mllibVector, Vectors as mllibVectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.ml import linalg as ml_linalg
from pyspark.ml.linalg import VectorUDT as mlVectorUDT
from pyspark.ml.linalg import SparseVector as mlSparseVector
from pyspark.ml.linalg import Vector as mlVector, Vectors as mlVectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from math import log
from math import exp  # exp(-t) = e^-t
from operator import add
from pyspark.sql.functions import udf, split, lit
from pyspark.sql.functions import size, sum as sqlsum
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.types import ArrayType, StructType, StructField, LongType, StringType, IntegerType, FloatType
# NOTE: this import rebinds `log` to the Spark column function, replacing math.log above.
from pyspark.sql.functions import explode, col, log, when
from collections import defaultdict
import pandas as pd
import pyspark.pandas as ps
from sklearn.metrics import log_loss, roc_auc_score
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from deepctr.models import DeepFM
from deepctr.feature_column import SparseFeat, DenseFeat, get_feature_names

spark = SparkSession.builder \
    .master("yarn") \
    .appName("deep_ctr_classification") \
    .config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") \
    .config("spark.executor.cores", "1") \
    .config('spark.executor.memory', '5gb') \
    .config('spark.executor.memoryOverhead', '1500') \
    .config('spark.driver.memoryOverhead', '1500') \
    .config("spark.sql.shuffle.partitions", "480") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.driver.maxResultSize", "50gb") \
    .getOrCreate()

print("Apache Spark version:")
print(spark.version)
sc = spark.sparkContext
sqlContext = SQLContext(sc)

parser = argparse.ArgumentParser(description='Spark DCN CTR Prediction Example',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--data-dir', default='file://' + os.getcwd(),
                    help='location of data on local filesystem (prefixed with file://) or on HDFS')


def process_input_file(file_name, sparse_feat, dense_feat):
    """Load the Criteo CSV and return a typed DataFrame with missing values filled.

    The file must already be a CSV with a header row. The raw tab-separated
    Criteo file has to be converted once beforehand (replace tabs with commas
    and prepend a ``label,I1..I13,C1..C26`` header line).

    Args:
        file_name: path of the CSV file to read.
        sparse_feat: names of the categorical columns; kept as strings,
            nulls replaced with ``'-1'``.
        dense_feat: names of the numeric columns; cast to float, nulls
            replaced with ``0``.

    Returns:
        DataFrame with ``label`` (double), the dense and the sparse columns.
    """
    print("START processing input file...")
    raw_df = spark.read.option("header", True).csv(file_name)
    raw_df.show(5, False)
    raw_df.printSchema()

    # Cast the label and the dense columns from string to numeric types.
    conv_df = raw_df.select(col('label').cast("double"),
                            *(col(i).cast("float").alias(i) for i in raw_df.columns if i in dense_feat),
                            *(col(c) for c in raw_df.columns if c in sparse_feat))
    print("Schema of raw_df with integer columns type changed:")
    conv_df.printSchema()

    tmp_df = conv_df.na.fill(0, dense_feat)
    result_df = tmp_df.na.fill('-1', sparse_feat)
    result_df.show()
    return result_df


def _min_max_scaled(column_name, low, high):
    """Return a column expression scaling ``column_name`` into [0, 1]."""
    # A constant (or empty) column has no range to scale by; map it to 0.0,
    # which is what sklearn's MinMaxScaler yields for a constant feature.
    if low is None or high is None or high == low:
        return lit(0.0).cast("float").alias(column_name)
    return ((col(column_name) - lit(low)) / lit(high - low)).cast("float").alias(column_name)


if __name__ == "__main__":
    args = parser.parse_args()

    dense_features = ['I' + str(i) for i in range(1, 14)]
    sparse_features = ['C' + str(i) for i in range(1, 27)]
    target = ['label']

    # Spark read CSV (sample data)
    data = process_input_file('%s/data.txt' % args.data_dir, sparse_features, dense_features)
    print("Obtained Spark df and filled in missing features.")

    # 1. Label encoding for sparse features, simple transformation for dense features.
    # The original collected each column to the driver, encoded it with sklearn
    # and joined the result back on 'label'. 'label' only holds the class
    # value, so that join multiplies rows instead of aligning them, and the
    # label column was then dropped. Encoding inside Spark keeps every row
    # and its label intact.
    print("Before label encoding:")
    data.printSchema()
    encoded_names = [feat + '_idx' for feat in sparse_features]
    indexer = StringIndexer(inputCols=sparse_features,
                            outputCols=encoded_names,
                            stringOrderType='alphabetAsc')  # same ordering as sklearn LabelEncoder
    data = indexer.fit(data).transform(data)
    encoded_sparse = [col(encoded).cast('int').alias(feat)
                      for feat, encoded in zip(sparse_features, encoded_names)]

    # Min-max scale the dense features with a single aggregation over the data.
    dense_stats = data.agg(*[F.min(feat).alias('min_' + feat) for feat in dense_features],
                           *[F.max(feat).alias('max_' + feat) for feat in dense_features]).first()
    scaled_dense = [_min_max_scaled(feat, dense_stats['min_' + feat], dense_stats['max_' + feat])
                    for feat in dense_features]

    # Cached because the frame is reused for the vocabulary counts and the split.
    data = data.select(*target, *scaled_dense, *encoded_sparse).cache()
    print("Data DF after label encoding and min-max scaling: ")
    data.show()
    data.printSchema()

    # 2. Count #unique features for each sparse field, and record dense feature field name
    fixlen_feature_columns = [SparseFeat(feat, vocabulary_size=data.select(feat).distinct().count() + 1,
                                         embedding_dim=4)
                              for feat in sparse_features] + \
                             [DenseFeat(feat, 1, ) for feat in dense_features]

    dnn_feature_columns = fixlen_feature_columns
    linear_feature_columns = fixlen_feature_columns

    feature_names = get_feature_names(linear_feature_columns + dnn_feature_columns)

    # 3. Generate input data for model
    train, test = data.randomSplit(weights=[0.8, 0.2], seed=200)

    # Collect features and label together so rows stay aligned; separate
    # per-column toPandas() calls are separate jobs with no such guarantee.
    train_pdf = train.select(*target, *feature_names).toPandas()
    test_pdf = test.select(*target, *feature_names).toPandas()
    print("Training dataset size = ", len(train_pdf))
    print("Testing dataset size = ", len(test_pdf))

    # Every feature the model declares (sparse AND dense) needs an input array;
    # the original only supplied the columns whose name starts with 'I'.
    train_model_input = {name: train_pdf[name].to_numpy() for name in feature_names}
    test_model_input = {name: test_pdf[name].to_numpy() for name in feature_names}
    train_labels = train_pdf['label'].to_numpy()
    test_labels = test_pdf['label'].to_numpy()

    # 4. Define Model, train, predict and evaluate
    model = DeepFM(linear_feature_columns, dnn_feature_columns, task='binary')
    model.compile("adam", "binary_crossentropy",
                  metrics=['binary_crossentropy'], )

    history = model.fit(train_model_input, train_labels,
                        batch_size=256, epochs=10, verbose=2, validation_split=0.2, )
    pred_ans = model.predict(test_model_input, batch_size=256)
    print("test LogLoss", round(log_loss(test_labels, pred_ans), 4))
    print("test AUC", round(roc_auc_score(test_labels, pred_ans), 4))