Résultats des tests
Nous avons utilisé les scripts Terasort et TeraValidate de l'outil de test TeraGen pour mesurer la validation des performances Spark avec les configurations E5760, E5724 et AFF-A800. En outre, trois cas d'utilisation majeurs ont été testés : les pipelines Spark NLP et l'entraînement TensorFlow distribué, la formation Horovod distribuée et l'apprentissage profond multi-travailleur à l'aide de Keras pour la prédiction par CTR avec DeepFM.
Pour la validation des environnements E-Series et StorageGRID, nous avons utilisé le facteur de réplication Hadoop 2. Pour la validation AFF, nous n'utilisons qu'une seule source de données.
Le tableau suivant répertorie la configuration matérielle pour la validation des performances Spark.
Type | Nœuds workers Hadoop | Type de disque | Disques par nœud | Contrôleur de stockage |
---|---|---|---|---|
SG6060 |
4 |
SAS |
12 |
Paire haute disponibilité unique |
E5760 |
4 |
SAS |
60 |
Une seule paire HA |
E5724 |
4 |
SAS |
24 |
Une seule paire HA |
AFF800 |
4 |
SSD |
6 |
Une seule paire HA |
Le tableau suivant répertorie la configuration logicielle requise.
Logiciel | Version |
---|---|
RHEL |
7.9 |
Environnement d'exécution OpenJDK |
1.8.0 |
Serveur virtuel OpenJDK 64 bits |
25.302 |
GIT |
2.24.1 |
GCC/G++ |
11.2.1 |
Étincelle |
3.2.1 |
PySpark |
3.1.2 |
SparkNLP |
3.4.2 |
TensorFlow |
2.9.0 |
Keras |
2.9.0 |
Horovod |
0.24.3 |
Analyse du sentiment financier
Nous avons publié "Tr-4910 : analyse du sentiment issue des communications avec le client sur NetApp ai", Dans lequel un pipeline d'IA conversationnel de bout en bout a été créé à l'aide du "Kit NetApp DataOps", Stockage AFF et système NVIDIA DGX. Le pipeline exécute le traitement du signal audio par lots, la reconnaissance vocale automatique (ASR), l'apprentissage par transfert et l'analyse de sentiment en utilisant le kit DataOps, "SDK NVIDIA Riva", et le "Cadre Tao". Élargissement de l'utilisation de l'analyse des sentiments à l'industrie des services financiers, nous avons conçu un workflow SparkNLP, chargé de trois modèles BERT pour diverses tâches du NLP, telles que la reconnaissance des entités nommées et obtenu un sentiment de phrase pour les 10 meilleures entreprises du NASDAQ appels trimestriels sur les bénéfices.
Le script suivant sentiment_analysis_spark. py
Utilise le modèle FinBERT pour traiter les transcriptions dans HDFS et produire des comptages positifs, neutres et négatifs, comme le montre le tableau suivant :
-bash-4.2$ time ~/anaconda3/bin/spark-submit --packages com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.3 --master yarn --executor-memory 5g --executor-cores 1 --num-executors 160 --conf spark.driver.extraJavaOptions="-Xss10m -XX:MaxPermSize=1024M" --conf spark.executor.extraJavaOptions="-Xss10m -XX:MaxPermSize=512M" /sparkusecase/tr-4570-nlp/sentiment_analysis_spark.py hdfs:///data1/Transcripts/ > ./sentiment_analysis_hdfs.log 2>&1 real13m14.300s user557m11.319s sys4m47.676s
Le tableau ci-dessous répertorie les résultats de l'analyse du sentiment au niveau des phrases pour les 10 premières entreprises du NASDAQ de 2016 à 2020.
Le nombre et le pourcentage de sentiments | Toutes les 10 entreprises | AAPL | AIRBUS | AMZN | CSCO | GOGL | INTC | MSFT | NVDA |
---|---|---|---|---|---|---|---|---|---|
Comptes positifs |
7447 |
1567 |
743 |
290 |
682 |
826 |
824 |
904 |
417 |
Compte neutre |
64067 |
6856 |
7596 |
5086 |
6650 |
5914 |
6099 |
5715 |
6189 |
Comptes négatifs |
1787 |
253 |
213 |
84 |
189 |
97 |
282 |
202 |
89 |
Décomptes sans catégorie |
196 |
0 |
0 |
76 |
0 |
0 |
0 |
1 |
0 |
(nombre total) |
73497 |
8676 |
8552 |
5536 |
7521 |
6837 |
7205 |
6822 |
6695 |
En termes de pourcentages, la plupart des phrases prononcées par les PDG et les DAF sont factuelles et portent donc un sentiment neutre. Lors d'un appel sur les gains, les analystes posent des questions qui peuvent transmettre un sentiment positif ou négatif. Il est intéressant d'examiner de manière quantitative la façon dont le sentiment négatif ou positif affecte le cours des actions le même jour ou le jour suivant de la négociation.
Le tableau ci-dessous répertorie les 10 premières entreprises du NASDAQ, exprimées en pourcentage, au niveau des phrases.
Pourcentage de sentiment | Toutes les 10 entreprises | AAPL | AIRBUS | AMZN | CSCO | GOGL | INTC | MSFT | NVDA |
---|---|---|---|---|---|---|---|---|---|
Positif |
10.13 % |
18.06 % |
8.69 % |
5.24 % |
9.07 % |
12.08 % |
11.44 % |
13.25 % |
6.23 % |
Neutre |
87.17 % |
79.02 % |
88.82 % |
91.87 % |
88.42 % |
86.50 % |
84.65 % |
83.77 % |
92.44 % |
Négatif |
2.43 % |
2.92 % |
2.49 % |
1.52 % |
2.51 % |
1.42 % |
3.91 % |
2.96 % |
1.33 % |
Sans catégorie |
0.27 % |
0 % |
0 % |
1.37 % |
0 % |
0 % |
0 % |
0.01 % |
0 % |
En termes d'exécution du flux de travail, nous avons constaté une nette amélioration de 4,6 fois par rapport à local
Le mode vers un environnement distribué dans HDFS et une amélioration encore de 0.14 % avec NFS.
-bash-4.2$ time ~/anaconda3/bin/spark-submit --packages com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.3 --master yarn --executor-memory 5g --executor-cores 1 --num-executors 160 --conf spark.driver.extraJavaOptions="-Xss10m -XX:MaxPermSize=1024M" --conf spark.executor.extraJavaOptions="-Xss10m -XX:MaxPermSize=512M" /sparkusecase/tr-4570-nlp/sentiment_analysis_spark.py file:///sparkdemo/sparknlp/Transcripts/ > ./sentiment_analysis_nfs.log 2>&1 real13m13.149s user537m50.148s sys4m46.173s
Comme le montre la figure suivante, le parallélisme des données et des modèles a amélioré le traitement des données et la vitesse d'inférence des modèles TensorFlow distribués. L'emplacement des données dans NFS a permis une exécution légèrement supérieure, car le goulot d'étranglement du flux de travail correspond au téléchargement des modèles pré-entraînés. Si nous augmentons la taille des jeux de données de transcription, l'avantage du protocole NFS est plus évident.
Formation distribuée avec la performance Horovod
La commande suivante a produit des informations d'exécution et un fichier journal dans notre cluster Spark à l'aide d'un seul master
nœud avec 160 exécuteurs avec chacun un noyau. La mémoire de l'exécuteur était limitée à 5 Go pour éviter les erreurs de mémoire insuffisante. Voir la section "“Scripts Python pour chaque cas d’utilisation majeur”" pour obtenir plus de détails sur le traitement des données, l'entraînement du modèle et le calcul de la précision du modèle dans keras_spark_horovod_rossmann_estimator.py
.
(base) [root@n138 horovod]# time spark-submit --master local --executor-memory 5g --executor-cores 1 --num-executors 160 /sparkusecase/horovod/keras_spark_horovod_rossmann_estimator.py --epochs 10 --data-dir file:///sparkusecase/horovod --local-submission-csv /tmp/submission_0.csv --local-checkpoint-file /tmp/checkpoint/ > /tmp/keras_spark_horovod_rossmann_estimator_local. log 2>&1
Le temps d'exécution résultant avec dix séries de tests d'entraînement était le suivant :
real43m34.608s user12m22.057s sys2m30.127s
Il fallait plus de 43 minutes pour traiter les données d'entrée, entraîner un modèle DNN, calculer la précision et produire des points de contrôle TensorFlow et un fichier CSV pour les résultats de prédiction. Nous avons limité le nombre de tests d'entraînement à 10, qui dans la pratique est souvent réglé à 100 pour assurer une précision satisfaisante du modèle. La durée d'entraînement évolue généralement de manière linéaire avec le nombre de séries de tests.
Nous avons ensuite utilisé les quatre nœuds workers disponibles dans le cluster et exécuté le même script dans yarn
Mode avec données dans HDFS :
(base) [root@n138 horovod]# time spark-submit --master yarn --executor-memory 5g --executor-cores 1 --num-executors 160 /sparkusecase/horovod/keras_spark_horovod_rossmann_estimator.py --epochs 10 --data-dir hdfs:///user/hdfs/tr-4570/experiments/horovod --local-submission-csv /tmp/submission_1.csv --local-checkpoint-file /tmp/checkpoint/ > /tmp/keras_spark_horovod_rossmann_estimator_yarn.log 2>&1
Le temps d'exécution obtenu a été amélioré comme suit :
real8m13.728s user7m48.421s sys1m26.063s
Avec le modèle et le parallélisme des données de Horovod dans Spark, nous avons vu une vitesse d'exécution de 5,29x yarn
contre local
mode avec dix séries de tests d'entraînement. Ceci est illustré dans la figure suivante avec les légendes HDFS
et Local
. L'entraînement du modèle DNN sous-jacent peut être accéléré au moyen de processeurs graphiques, le cas échéant. Nous prévoyons de mener ces tests et de publier les résultats dans un futur rapport technique.
Notre prochain test a comparé les temps d'exécution avec les données d'entrée résidant dans NFS et HDFS. Le volume NFS du AFF A800 a été monté sur /sparkdemo/horovod
Sur les cinq nœuds (un maître, quatre travailleurs) de notre cluster Spark Nous avons exécuté une commande similaire à celle des tests précédents avec --data- dir
Paramètre maintenant pointant vers le montage NFS :
(base) [root@n138 horovod]# time spark-submit --master yarn --executor-memory 5g --executor-cores 1 --num-executors 160 /sparkusecase/horovod/keras_spark_horovod_rossmann_estimator.py --epochs 10 --data-dir file:///sparkdemo/horovod --local-submission-csv /tmp/submission_2.csv --local-checkpoint-file /tmp/checkpoint/ > /tmp/keras_spark_horovod_rossmann_estimator_nfs.log 2>&1
Le temps d'exécution avec NFS obtenu est le suivant :
real 5m46.229s user 5m35.693s sys 1m5.615s
Il y a eu une accélération supplémentaire de 1,43 fois, comme le montre la figure suivante. Par conséquent, avec un système de stockage 100 % Flash NetApp connecté à leur cluster, les clients profitent des avantages du transfert et de la distribution rapides des données pour les workflows Horovod Spark, avec une vitesse de 7,5 fois supérieure à celle d'un seul nœud.
Modèles de deep learning pour les performances de prévision CTR
Pour les systèmes de recommandation conçus pour optimiser le CTR, vous devez apprendre les interactions de fonctionnalités sophistiquées derrière les comportements utilisateur qui peuvent être calculées mathématiquement de bas en haut de gamme. Les interactions de type faible et élevé avec les fonctionnalités doivent être tout aussi importantes pour un bon modèle d'apprentissage profond, sans biasing vers l'un ou l'autre. Le Deep Factorisation machine (DeepFM), un réseau neuronal basé sur la factorisation, combine les machines d'automatisation à des fins de recommandation et d'apprentissage profond afin d'apprendre les fonctionnalités dans une nouvelle architecture de réseaux neuronaux.
Bien que les machines de factorisation conventionnelles utilisent des interactions de composants pairées en tant que produit interne de vecteurs latents entre les fonctionnalités et permettent théoriquement de capturer des informations de gros ordre, en pratique, les professionnels de l'apprentissage machine n'utilisent généralement que des interactions de fonctionnalités de second ordre du fait de la complexité élevée des calculs et du stockage. Des variantes de réseau neuronal profondes comme celle de Google "Modèles larges et profonds" en revanche, elle apprend des interactions de fonctionnalités sophistiquées dans une structure de réseau hybride en combinant un modèle à large linéaire et un modèle profond.
Il existe deux entrées pour ce modèle large et profond, l'une pour le modèle large sous-jacent et l'autre pour le plus profond, dont la dernière partie nécessite toujours une ingénierie de fonctionnalité experte et rend ainsi la technique moins généralisable pour d'autres domaines. Contrairement au modèle large et profond, DeepFM peut être efficacement formé avec des fonctions brutes sans aucune technique de fonction car sa grande partie et sa pièce profonde partagent la même entrée et le même vecteur d'intégration.
Nous avons d'abord traité le Criteo train.txt
(11 Go) dans un fichier CSV nommé ctr_train.csv
Stocké dans un montage NFS /sparkdemo/tr-4570-data
à l'aide de run_classification_criteo_spark.py
dans la section "“Scripts Python pour chaque cas d’utilisation majeur.”" Dans ce script, la fonction process_input_file
effectue plusieurs méthodes de chaîne pour supprimer les onglets et les insérer ‘,’
comme séparateur et ‘\n’
en tant que réseau. Notez que vous n'avez besoin que de traiter l'original train.txt
une fois, de sorte que le bloc de code soit affiché comme commentaires.
Pour les tests suivants sur les différents modèles d'apprentissage profond, nous avons utilisé ctr_train.csv
comme fichier d'entrée. Lors des tests suivants, le fichier CSV d'entrée a été lu dans un Spark DataFrame avec un schéma contenant un champ de ‘label’
, composants denses entiers ['I1', 'I2', 'I3', …, 'I13']
, et des caractéristiques parsemées ['C1', 'C2', 'C3', …, 'C26']
. Les éléments suivants spark-submit
La commande prend dans un CSV d'entrée, forme des modèles DeepFM avec une répartition à 20 % pour la validation croisée, et sélectionne le meilleur modèle après dix séries de tests d'entraînement pour calculer la précision de prédiction sur le jeu de tests :
(base) [root@n138 ~]# time spark-submit --master yarn --executor-memory 5g --executor-cores 1 --num-executors 160 /sparkusecase/DeepCTR/examples/run_classification_criteo_spark.py --data-dir file:///sparkdemo/tr-4570-data > /tmp/run_classification_criteo_spark_local.log 2>&1
Notez que depuis le fichier de données ctr_train.csv
Est supérieur à 11 Go, vous devez définir une quantité suffisante spark.driver.maxResultSize
supérieure à la taille du jeu de données pour éviter toute erreur.
spark = SparkSession.builder \ .master("yarn") \ .appName("deep_ctr_classification") \ .config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") \ .config("spark.executor.cores", "1") \ .config('spark.executor.memory', '5gb') \ .config('spark.executor.memoryOverhead', '1500') \ .config('spark.driver.memoryOverhead', '1500') \ .config("spark.sql.shuffle.partitions", "480") \ .config("spark.sql.execution.arrow.enabled", "true") \ .config("spark.driver.maxResultSize", "50gb") \ .getOrCreate()
Dans le ci-dessus SparkSession.builder
configuration que nous avons également activée "Flèche Apache", Qui convertit un Spark DataFrame en un Pandas DataFrame avec le df.toPandas()
méthode.
22/06/17 15:56:21 INFO scheduler.DAGScheduler: Job 2 finished: toPandas at /sparkusecase/DeepCTR/examples/run_classification_criteo_spark.py:96, took 627.126487 s Obtained Spark DF and transformed to Pandas DF using Arrow.
Après la division aléatoire, le dataset d'entraînement contient plus de 36 rangées et des échantillons de 9 millions dans le dataset de test :
Training dataset size = 36672493 Testing dataset size = 9168124
Ce rapport technique étant axé sur les tests CPU sans utiliser de GPU, il est impératif de construire TensorFlow avec des indicateurs de compilateur appropriés. Cette étape évite d'appeler des bibliothèques à accélération graphique et tire pleinement parti des instructions AVX (Advanced Vector Extensions) et AVX2 de TensorFlow. Ces fonctionnalités sont conçues pour les calculs algébriques linéaires tels que l'ajout vectorisé, les multiproduits matriciels dans un entraînement DNN d'avance ou de contre-propagation. L'instruction FMA (Multiply Add) avec AVX2 utilisant des registres à virgule flottante 256 bits est idéale pour les types de code entier et de données, ce qui permet d'obtenir une vitesse de 2 fois plus élevée. Pour le code FP et les types de données, AVX2 atteint 8 % de vitesse supérieure à AVX.
2022-06-18 07:19:20.101478: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 FMA To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Pour créer TensorFlow à partir d'une source, NetApp vous recommande d'utiliser "Bazel". Pour notre environnement, nous avons exécuté les commandes suivantes dans l'invite du shell pour l'installation dnf
, dnf-plugins
, Et Bazel.
yum install dnf dnf install 'dnf-command(copr)' dnf copr enable vbatts/bazel dnf install bazel5
Vous devez activer GCC 5 ou version ultérieure pour utiliser les fonctions C++17 pendant le processus de création, qui est fourni par RHEL avec la bibliothèque de collections logicielles (SCL). Les commandes suivantes s'installent devtoolset
Et GCC 11.2.1 sur notre cluster RHEL 7.9 :
subscription-manager repos --enable rhel-server-rhscl-7-rpms yum install devtoolset-11-toolchain yum install devtoolset-11-gcc-c++ yum update scl enable devtoolset-11 bash . /opt/rh/devtoolset-11/enable
Notez que les deux dernières commandes sont en cours d'activation devtoolset-11
, qui utilise /opt/rh/devtoolset-11/root/usr/bin/gcc
(GCC 11.2.1). Assurez-vous également que votre git
La version est supérieure à 1.8.3 (fournie avec RHEL 7.9). Se reporter à ceci "article" pour mise à jour git
à 2.24.1.
Nous supposons que vous avez déjà cloné le dernier référentiel TensorFlow maître. Créez ensuite un workspace
répertoire avec un WORKSPACE
Fichier pour créer TensorFlow à partir de la source avec AVX, AVX2 et FMA. Exécutez le configure
Et spécifiez l'emplacement binaire Python correct. "CUDA" Est désactivé pour nos tests car nous n'avons pas utilisé de GPU. A .bazelrc
le fichier est généré en fonction de vos paramètres. De plus, nous avons modifié le fichier et l'ensemble build --define=no_hdfs_support=false
Pour activer la prise en charge de HDFS. Reportez-vous à la section .bazelrc
dans la section "“Scripts Python pour chaque cas d’utilisation majeur,”" pour obtenir une liste complète des paramètres et des indicateurs.
./configure bazel build -c opt --copt=-mavx --copt=-mavx2 --copt=-mfma --copt=-mfpmath=both -k //tensorflow/tools/pip_package:build_pip_package
Après avoir créé TensorFlow avec les indicateurs appropriés, exécutez le script suivant pour traiter le jeu de données Criteo Display Ads, former un modèle DeepFM et calculer la zone sous la courbe caractéristique d'exploitation du récepteur (ROC CASC) à partir des notes de prédiction.
(base) [root@n138 examples]# ~/anaconda3/bin/spark-submit --master yarn --executor-memory 15g --executor-cores 1 --num-executors 160 /sparkusecase/DeepCTR/examples/run_classification_criteo_spark.py --data-dir file:///sparkdemo/tr-4570-data > . /run_classification_criteo_spark_nfs.log 2>&1
Après dix tests d'entraînement, nous avons obtenu le score AUC sur le jeu de données de test :
Epoch 1/10 125/125 - 7s - loss: 0.4976 - binary_crossentropy: 0.4974 - val_loss: 0.4629 - val_binary_crossentropy: 0.4624 Epoch 2/10 125/125 - 1s - loss: 0.3281 - binary_crossentropy: 0.3271 - val_loss: 0.5146 - val_binary_crossentropy: 0.5130 Epoch 3/10 125/125 - 1s - loss: 0.1948 - binary_crossentropy: 0.1928 - val_loss: 0.6166 - val_binary_crossentropy: 0.6144 Epoch 4/10 125/125 - 1s - loss: 0.1408 - binary_crossentropy: 0.1383 - val_loss: 0.7261 - val_binary_crossentropy: 0.7235 Epoch 5/10 125/125 - 1s - loss: 0.1129 - binary_crossentropy: 0.1102 - val_loss: 0.7961 - val_binary_crossentropy: 0.7934 Epoch 6/10 125/125 - 1s - loss: 0.0949 - binary_crossentropy: 0.0921 - val_loss: 0.9502 - val_binary_crossentropy: 0.9474 Epoch 7/10 125/125 - 1s - loss: 0.0778 - binary_crossentropy: 0.0750 - val_loss: 1.1329 - val_binary_crossentropy: 1.1301 Epoch 8/10 125/125 - 1s - loss: 0.0651 - binary_crossentropy: 0.0622 - val_loss: 1.3794 - val_binary_crossentropy: 1.3766 Epoch 9/10 125/125 - 1s - loss: 0.0555 - binary_crossentropy: 0.0527 - val_loss: 1.6115 - val_binary_crossentropy: 1.6087 Epoch 10/10 125/125 - 1s - loss: 0.0470 - binary_crossentropy: 0.0442 - val_loss: 1.6768 - val_binary_crossentropy: 1.6740 test AUC 0.6337
De la même manière que dans les précédents cas d'utilisation, nous avons comparé le temps d'exécution du flux de production Spark avec des données résidant sur différents emplacements. La figure suivante montre une comparaison des prédictions CTR d'apprentissage profond pour le temps d'exécution des workflows Spark.