YouTube Setl est un projet qui vise à fournir un point de départ pour pratiquer le cadre SETL: https://github.com/setl-developers/setl. L'idée est de donner un projet de contexte impliquant des opérations d'extrait, de transformation et de charge. Il y a trois niveaux de difficulté pour l'exercice: mode facile, mode normal et mode dur.
Les données utilisées proviennent de Kaggle, https://www.kaggle.com/datasnaek/youtube-new.
J'ai utilisé JetBrains Intellij Idea Community Edition pour ce projet, avec Scala et Apache Spark.
Les données sont divisées en régions multiples: Canada (CA), Allemagne (DE), France (FR), Grande-Bretagne (GB), Inde (IN), Japon (JP), Corée du Sud (KR), Mexique (MX), Russie (RU) et États-Unis (États-Unis). Pour chacune de ces régions, il y a deux fichiers:

Chaque jour, YouTube propose environ 200 des vidéos les plus tendance de chaque pays. YouTube mesure combien une vidéo est à la mode en fonction d'une combinaison de facteurs qui ne sont pas rendus entièrement publics. Cet ensemble de données consiste en une collection de vidéos à tendance les plus importantes du quotidien. En conséquence, il est possible que la même vidéo apparaisse plusieurs fois, ce qui signifie qu'elle est tendance pendant plusieurs jours.
Fondamentalement, les éléments des champs des éléments nous permettent de cartographier la category_id du fichier CSV dans la catégorie du nom complet.
Nous allons analyser cet ensemble de données et déterminer les vidéos "populaires". Mais comment définissons-nous une vidéo populaire? Nous allons définir la popularité d'une vidéo basée sur son nombre de vues, goûts, dégoûts, nombre de commentaires et nombre de jours de tendance.
Cette définition est clairement discutable et arbitraire, et nous ne cherchons pas à trouver la meilleure définition de la popularité d'une vidéo. Nous nous concentrerons uniquement sur l'objectif de ce projet: pratiquer avec le cadre SETL.
L'objectif de ce projet est de trouver les 100 vidéos les plus "populaires" et les catégories de vidéos les plus "les plus". Mais comment avons-nous défini la popularité d'une vidéo? La formule va être:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight .
Le pourcentage de goûts est le rapport des goûts qui n'aiment pas. Ce rapport est normalisé sur le nombre de vues. La même normalisation se fait avec le nombre de commentaires.
Voici les instructions pour chaque niveau de difficulté pour réaliser le projet. Pour chaque niveau de difficulté, vous pouvez cloner le repo avec la branche spécifique pour avoir un projet de départ.
Pour ce projet, nous supposons que vous avez déjà une connaissance de base de Scala et Apache Spark.
entity qui contient les classes de cas ou les objets; factory qui contient des transformateurs; et transformer qui contient les transformations de données.Factory ou un Transformer SETL , vous pouvez utiliser Ctrl+i pour créer automatiquement les fonctions nécessaires. La première chose que nous allons faire est, bien sûr, de lire les entrées: les fichiers CSV, que j'appellerai les fichiers de vidéos et les fichiers JSON, les fichiers de catégories.
Commençons par les fichiers des catégories. Tous les fichiers de catégories sont des fichiers JSON . Créez une classe de cas qui représente une catégorie , puis une Factory avec un Transformer qui traitera les fichiers de catégories dans la classe de cas.
local.conf . Un objet a déjà été créé afin de lire les fichiers des catégories.org.apache.spark.sql.functions .coalesce lors de l'enregistrement d'un fichier. Nous pouvons maintenant travailler avec les fichiers vidéos. De même, créez une classe de cas qui représente une vidéo pour la lecture des entrées, puis une Factory avec un ou plusieurs Transformers qui fera le traitement. Étant donné que les fichiers de vidéos sont séparés des régions, il n'y a pas les informations de région pour chaque enregistrement dans l'ensemble de données. Essayez d'ajouter ces informations en utilisant une autre vidéo de classe de cas qui est très similaire à la vidéo et fusionnez tous les enregistrements dans un seul ensemble de données / données.
Transformers seront utiles: un pour ajouter la colonne country et une pour fusionner toutes les vidéos dans un seul ensemble de données.Parce qu'une vidéo peut être une tendance de haut niveau pendant une journée et le lendemain, il est possible qu'une vidéo ait plusieurs lignes, où chacune a des chiffres différents en termes de vues, de goûts, de dégoûts, de commentaires ... En conséquence, nous devons récupérer les dernières statistiques disponibles pour une seule vidéo, pour chaque région, car ces statistiques sont incrémentielles. Dans le même temps, nous allons calculer le nombre de jours de tendance pour chaque vidéo.
Créez un vidéostats de classe de cas, qui est très similaire aux classes de cas précédentes, mais avec les informations sur les jours de tendance.
Tout d'abord, calculez le nombre de jours de tendance de chaque vidéo.
window à partir d' org.apache.spark.sql.functions .Pour récupérer les dernières statistiques, vous devez récupérer le dernier jour de tendance de chaque vidéo. Il s'agit en fait des dernières statistiques disponibles.
window . Le premier était de calculer le nombre de jours de tendance et le second pour récupérer les dernières statistiques.rank .Triez les résultats par région, le nombre de jours de tendance, les vues, les goûts et les commentaires. Il préparera les données pour la prochaine réalisation.
Nous allons maintenant calculer le score de popularité de chaque vidéo, après avoir obtenu leurs dernières statistiques. Comme dit précédemment, notre formule est très simple et peut ne pas représenter la réalité.
Normalisons le nombre de goûts / aversions sur le nombre de vues. Pour chaque enregistrement, divisez le nombre de likes par le nombre de vues, puis le nombre de aversions par le nombre de vues. Après cela, obtenez le pourcentage de goûts "normalisés".
Normalisons maintenant le nombre de commentaires. Pour chaque enregistrement, divisez le nombre de commentaires par le nombre de vues.
Nous pouvons maintenant calculer le score de popularité. Rappelez que la formule est: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Cependant, il y a des vidéos où les commentaires sont désactivés. Dans ce cas, la formule devient: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Nous avons décidé arbitrairement les poids à être:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05 Configurez-les Input afin qu'ils puissent être facilement modifiés.
when et fonctionne otherwise à partir d' org.apache.spark.sql.functions . Triez par la score en ordre décroissant et prenez les 100 premiers records. Vous avez maintenant les 100 vidéos les plus "populaires" des 10 régions.
La première chose que nous allons faire est, bien sûr, de lire les entrées: les fichiers CSV, que j'appellerai les fichiers de vidéos et les fichiers JSON, les fichiers de catégories.
Commençons par les fichiers des catégories. Tous les fichiers de catégories sont des fichiers JSON. Voici le workflow: nous allons définir un fichier de configuration qui indique les fichiers de catégories à lire; créer une classe de cas qui représente une catégorie; Ensuite, une Factory avec un Transformer qui traitera les fichiers des catégories dans la classe de cas. Enfin, nous allons ajouter la Stage dans le Pipeline pour déclencher les transformations.
L'objet de configuration a déjà été créé dans resources/local.conf . Faites attention aux options storage et path . Déplacez les fichiers des catégories en conséquence. Si plusieurs fichiers sont dans le même dossier et que le dossier est utilisé comme chemin, SETL considère les fichiers comme des partitions d'un seul fichier. Ensuite, consultez l' App.scala . Vous pouvez voir que nous avons utilisé les méthodes setConnector() et setSparkRepository() . Chaque fois que vous souhaitez utiliser un référentiel, vous devrez ajouter une configuration dans la configuration et l'enregistrer dans l'objet setl .
Créez une Category de classe nommée dans le dossier entity . Examinez maintenant, dans les fichiers des catégories, les champs dont nous aurons besoin.
Nous aurons besoin de l' id et du title de la catégorie. Assurez-vous de vérifier les fichiers et d'utiliser la même orthographe pour créer la classe de cas Category .
Le squelette de l' Factory a déjà été fourni. Assurez-vous de comprendre la structure logique.
Delivery sous la forme d'un Connector nous permet de récupérer les entrées. Une autre Delivery agira comme un SparkRepository , où nous rédigeons la sortie de la transformation. Consultez l' id de chaque Delivery et la deliveryId dans App.scala . Ils sont utilisés, il n'y a donc pas d'ambiguïté lorsque setL récupére les référentiels. Pour pouvoir lire les deux livraisons précédentes, nous allons utiliser deux autres variables: une DataFrame pour la lecture du Connector et un Dataset pour stocker la sortie SparkRepository . La différence entre eux est qu'une SparkRepository est tapée, d'où l' Dataset .Factory Setl :read : L'idée est de prendre le Connector ou les entrées SparkRepository Delivery , de les prétraiter si nécessaire, et de les stocker en variables pour les utiliser dans la fonction suivante.process : Voici où toutes les transformations de données seront effectuées. Créez une instance du Transformer que vous utilisez, appelez la méthode transform() , utilisez le Getter transformed et stockez le résultat en variable.write : Comme son nom l'indique, il est utilisé pour enregistrer la sortie des transformations une fois qu'ils ont été terminés. Un Connector utilise la méthode write() pour enregistrer un DataFrame , et un SparkRepository utilise la méthode save() pour enregistrer un Dataset .get : Cette fonction est utilisée pour passer la sortie dans l' Stage suivante du Pipeline . Renvoyez simplement l' Dataset .process , il peut y avoir plusieurs Transformer . Nous allons essayer de suivre cette structure tout au long du reste du projet.Factory sera automatiquement transféré à l' Stage suivante à travers la fonction get . Cependant, l'écriture de la sortie de chaque Factory sera plus facile pour la visualisation et le débogage. Encore une fois, le squelette du Transformer a déjà été fourni. Cependant, vous serez celui qui rédigera la transformation des données.
Transformer prend un argument. Habituellement, c'est le DataFrame ou l' Dataset que nous voulons traiter. Selon votre demande, vous pouvez ajouter d'autres arguments.transformedData est la variable qui stockera le résultat de la transformation des données.transformed est le Getter qui sera appelé par une Factory pour récupérer le résultat de la transformation des données.transform() est la méthode qui fera les transformations de données.items . Si vous consultez les fichiers des catégories, les informations dont nous avons besoin se trouvent sur ce champ.items est un tableau. Nous voulons exploser ce tableau et ne prendre que le champ id et le champ title du champ snippet . Pour ce faire, utilisez la fonction explode à partir d' org.apache.spark.sql.functions . Ensuite, pour obtenir des champs spécifiques, utilisez la méthode withColumn et la méthode getField() sur id, snippet et title . N'oubliez pas de lancer les types en conséquence à la classe de cas que vous avez créée.id et les colonnes title . Ensuite, lancez le DataFrame dans un ensemble de données avec as[T] .Transformer . Pour voir ce qu'il fait, vous pouvez exécuter le fichier App.scala qui a déjà été créé. Il exécute simplement l' Factory qui contient le Transformer que vous venez d'écrire, et il sortira le résultat sur le chemin du fichier de configuration. Notez que l' Factory correspondante a été ajoutée via addStage() qui fait l'exécuter le Pipeline .Connector , en utilisant l'annotation @Delivery , avec deliveryId .Transformer dans la méthode process d'une Factory .write d'une Factory . Traitons maintenant les fichiers de vidéos. Nous tenons à fusionner tous les fichiers dans un seul DataFrame / Dataset ou dans le même fichier CSV, tout en conservant les informations de la région pour chaque vidéo. Tous les fichiers de vidéos sont des fichiers CSV et ils ont les mêmes colonnes, comme indiqué précédemment dans la section de contexte . Le workflow est similaire à la dernière: configuration; classe de cas; Factory ; Transformer ; Ajoutez la Stage dans le Pipeline . Cette fois, nous allons définir plusieurs objets de configuration.
Nous allons définir plusieurs objets de configuration dans resources/local.conf , un par région. Dans chaque objet de configuration, vous devrez définir storage, path, inferSchema, delimiter, header, multiLine et dateFormat .
videos<region>Repository .Factory . Créez une classe de cas nommée Video dans le dossier entity . Examinez maintenant, dans les fichiers des vidéos, les champs dont nous aurons besoin. Rappelez que l'objectif est de calculer le score de popularité, et que la formule est number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight . Il aidera à sélectionner les champs.
Créez une autre classe de cas nommée VideoCountry . Il aura exactement les mêmes champs que Video , mais avec le champ de pays / région en plus.
@ColumnName du framework. Essayez de l'utiliser car il peut être utile dans certaines situations commerciales réelles.java.sql.Date pour un champ de type de date. Nous aimerions avoir le videoId , title , channel_title , category_id , trending_date , views , likes , dislikes , comment_count , comments_disabled et video_error_or_removed Fields.
L'objectif de cette usine est de fusionner tous les fichiers de vidéos en un seul, sans supprimer les informations de la région. Cela signifie que nous allons utiliser deux types de Transformer .
Delivery sous la forme d'un SparkRepository[Video] . Définissez une dernière Delivery en tant que SparkRepository[VideoCountry] , où nous rédigerons la sortie de la transformation. Définissez autant de variables Dataset[Video] que le nombre d'entrées.Factory :read : Prétraitez le SparkRepository en filtrant les vidéos qui sont supprimées ou erronées . Ensuite, les «lancer» en tant que Dataset[Video] et les stocker dans les variables correspondantes.process : appliquez le premier Transformer pour chacune des entrées et appliquez les résultats au deuxième Transformer .write : Écrivez la sortie SparkRepository[VideoCountry] .get : Renvoyez simplement le résultat du Transformer final.Connector pour lire les fichiers d'entrée et un SparkRepository pour la sortie?SparkRepository pour lire les entrées juste pour fournir une structure pour les fichiers d'entrée.SparkRepository et beaucoup de variables correspondantes, et je ne trouve pas cette jolie / concise. N'y a-t-il pas une autre solution?Delivery sous la forme d'un SparkRepository , vous pouvez utiliser des livraisons sous la forme d'un Dataset avec une option autoLoad = true . Donc, au lieu d'avoir: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
L'objectif principal du premier Transformer est d'ajouter les informations régionales / pays. Créez un Transformer qui prend deux entrées, un Dataset[Video] et une chaîne. Ajoutez le country de la colonne et renvoyez un Dataset[VideoCountry] . Vous pouvez également filtrer les vidéos qui sont étiquetées comme supprimées ou erreur . Bien sûr, cette dernière étape peut être placée ailleurs.
L'objectif principal du deuxième Transformer est de regrouper toutes les vidéos ensemble, tout en conservant les informations de la région.
reduce et union . Pour vérifier le résultat de votre travail, accédez à App.scala , définissez les SparkRepositories , ajoutez la VideoFactory sur scène et exécutez le code. Il créera le fichier de sortie dans le chemin d'accès correspondant.
Connector et SparkRepository .Deliveries dans un Transformer ou un Connector .Transformers dans une Factory .Parce qu'une vidéo peut être une tendance de haut niveau pendant une journée et le lendemain, elle aura des chiffres différents en termes de vues, de goûts, de dégoûts, de commentaires ... En conséquence, nous devons récupérer les dernières statistiques disponibles pour une seule vidéo, pour chaque région. Dans le même temps, nous allons calculer le nombre de jours de tendance pour chaque vidéo.
Mais comment allons-nous faire ça? Tout d'abord, nous allons regrouper les enregistrements qui correspondent à la même vidéo et comptent le nombre d'enregistrements, qui est essentiellement le nombre de jours de tendance. Ensuite, nous allons classer ces enregistrements groupés et prendre le dernier, pour récupérer les dernières statistiques.
Le fichier de configuration de la sortie de VideoFactory est déjà défini dans la réalisation précédente afin qu'elle puisse être enregistrée. Vous devrez le lire et le traiter pour obtenir les dernières statistiques des vidéos. N'oubliez pas d'ajouter un fichier de configuration pour la sortie de cette nouvelle Factory .
Créez une classe de cas nommée VideoStats qui ont des champs similaires à VideoCountry , mais vous devez prendre en compte le nombre de jours de tendance.
Dans cette usine, il vous suffit de lire l'entrée, de la transmettre au Transformer qui fera le traitement des données et d'écriture de la sortie. Cela devrait être assez simple; Vous pouvez essayer d'imiter les autres Factories .
Deliveries d'entrées et de sorties. Comme dit précédemment, nous allons regrouper les vidéos. Pour cela, nous allons utiliser org.apache.spark.sql.expressions.Window . Assurez-vous de savoir ce qu'une Window fait au préalable.
Window dont vous partirez pour compter le nombre de jours de tendance pour chaque vidéo. Pour savoir quels champs vous allez partitionner, regardez quels champs seront les mêmes pour une seule vidéo.Window qui sera utilisée pour classer les vidéos par leur date de tendance. En sélectionnant la date la plus récente, nous pouvons récupérer les dernières statistiques de chaque vidéo.Windows , vous pouvez désormais ajouter de nouvelles colonnes trendingDays pour le nombre de jours de tendance et rank pour le classement de la date de tendance par ordre décroissant.rank , ne prenant que les records avec le rank 1.DataFrame à Dataset[VideoStats] .partitionBy et orderBy pour la Window ; et le count de méthodes rank de org.apache.spark.sql.functions lorsque vous travaillez avec l' Dataset . Pour vérifier le résultat de votre travail, accédez à App.scala , définissez les SparkRepositories , ajoutez la scène et exécutez le code. Il créera le fichier de sortie dans le chemin d'accès correspondant.
Pipeline .Connector et un SparkRepository , et comment en faire Deliveries . Nous allons maintenant calculer le score de popularité de chaque vidéo, après avoir obtenu leurs dernières statistiques. Comme dit précédemment, notre formule est très simple et peut ne pas représenter la réalité. Rappelons que la formule est views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight . En utilisant le résultat précédent des VideoStats , nous allons simplement appliquer la formule et trier les données par le score le plus élevé au plus bas.
Il s'agit de la dernière transformation des données. Définissez la configuration afin que vous puissiez enregistrer ce dernier Dataset[VideoStats] . Pour ajouter les constantes utilisées pour la formule, vous devrez définir Inputs dans le Pipeline . Avant d'ajouter des étapes dans le Pipeline , utilisez setInput[T](<value>, <id>) pour définir les constantes. Ces entrées sont récupérables à tout moment dans toutes Factories une fois ajoutées au Pipeline .
Aucune entité ne sera nécessaire ici. Nous allons simplement trier les données précédentes et supprimer les colonnes utilisées pour calculer le score afin que nous puissions toujours utiliser l'entité VideoStats .
Dans cette usine, il vous suffit de lire l'entrée, de la transmettre au Transformer qui fera le traitement des données et d'écriture de la sortie. Cela devrait être assez simple; Vous pouvez essayer d'imiter les autres Factories .
Deliverable : Connector , SparkRepository et / ou Input .Normalisons le nombre de goûts / aversions sur le nombre de vues. Pour chaque enregistrement, divisez le nombre de likes par le nombre de vues, puis le nombre de aversions par le nombre de vues. Après cela, obtenez le pourcentage de goûts "normalisés".
Normalisons maintenant le nombre de commentaires. Pour chaque enregistrement, divisez le nombre de commentaires par le nombre de vues.
Nous pouvons maintenant calculer le score de popularité. Rappelez que la formule est: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Cependant, il y a des vidéos où les commentaires sont désactivés. Dans ce cas, la formule devient: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Nous avons décidé arbitrairement les poids à être:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05when et fonctionne otherwise à partir d' org.apache.spark.sql.functions . Triez par la score en ordre décroissant et prenez les 100 premiers records. Vous avez maintenant les 100 vidéos les plus "populaires" des 10 régions.
Pour vérifier le résultat de votre travail, accédez à App.scala , définissez les Inputs s'ils ne sont pas déjà définis, définissez le SparkRepository de sortie, ajoutez l'étape et exécutez le code. Il créera le fichier de sortie dans le chemin d'accès correspondant.
Deliveries : Input , Connector et SparkRepository , avec deliveryId .Stage , y compris l' Factory et le Transformer(s) .Si vous avez aimé ce projet, veuillez consulter Setl Framework ici: https://github.com/setl-developers/setl, et pourquoi ne pas apporter votre contribution!