YouTube Setl ist ein Projekt, das einen Ausgangspunkt für das Üben des SETL-Frameworks anbietet: https://github.com/setl-developers/setl. Die Idee ist, ein Kontextprojekt zu geben, das Extrakt-, Transformations- und Lastvorgänge beinhaltet. Für die Übung gibt es drei Schwierigkeitsgrade: Easy -Modus, normaler Modus und Hard -Modus.
Die verwendeten Daten stammen von Kaggle, https://www.kaggle.com/datasnaek/youtube-new.
Ich habe JetBrains Intellij Idea Community Edition für dieses Projekt mit Scala und Apache Spark verwendet.
Die Daten sind in Mehrfachregionen unterteilt: Kanada (CA), Deutschland (DE), Frankreich (FR), Großbritannien (GB), Indien (IN), Japan (JP), Südkorea (KR), Mexiko (MX), Russland (Ru) und den Vereinigten Staaten (USA). Für jede dieser Regionen gibt es zwei Dateien:

Jeden Tag bietet YouTube etwa 200 der trendigsten Videos in jedem Land. YouTube misst, wie viel ein Video auf der Grundlage einer Kombination von Faktoren, die nicht vollständig veröffentlicht werden, trendy ist. Dieser Datensatz besteht aus einer Sammlung von Top -Trendvideos des Alltags. Infolgedessen ist es möglich, dass dasselbe Video mehrmals angezeigt wird, was bedeutet, dass es mehrere Tage im Trend ist.
Grundsätzlich ermöglichen die Elemente der Elemente Felder es uns, die category_id der CSV -Datei der vollständigen Namenskategorie zuzuordnen.
Wir werden diesen Datensatz analysieren und "beliebte" Videos bestimmen. Aber wie definieren wir ein beliebtes Video? Wir werden die Popularität eines Videos auf der Grundlage seiner Anzahl von Ansichten, Vorlieben, Abneigungen, Anzahl der Kommentare und der Anzahl der Trendtage definieren.
Diese Definition ist eindeutig umstritten und willkürlich, und wir suchen nicht nach der besten Definition für die Popularität eines Videos. Wir werden uns nur auf den Zweck dieses Projekts konzentrieren: Übung mit dem SETL -Framework.
Ziel dieses Projekts ist es, die 100 "beliebtesten" Videos und die "beliebtesten" Videokategorien zu finden. Aber wie haben wir die Popularität eines Videos definiert? Die Formel wird sein:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight .
Der Likes -Prozentsatz ist das Verhältnis von Likes gegenüber Abneigungen. Dieses Verhältnis wird über die Anzahl der Ansichten normalisiert. Die gleiche Normalisierung erfolgt mit der Anzahl der Kommentare.
Im Folgenden finden Sie die Anweisungen für jedes Schwierigkeitsgrad, um das Projekt zu realisieren. Für jeden Schwierigkeitsgrad können Sie das Repo mit dem spezifischen Zweig klonen, um ein Startprojekt zu haben.
Für dieses Projekt gehen wir davon aus, dass Sie bereits grundlegende Kenntnisse in Scala und Apache Spark haben.
entity , die die Fallklassen oder die Objekte enthält; factory , die Transformatoren enthält; und transformer , der die Datentransformationen enthält.Ctrl+i Sie beim Erstellen einer SETL Factory oder Transformer die erforderlichen Funktionen automatisch erstellen. Das erste, was wir tun werden, ist natürlich die Eingaben: Die CSV -Dateien, die ich die Videosdateien und die JSON -Dateien, die Kategoriendateien aufrufen werde.
Beginnen wir mit den Kategoriendateien. Alle Kategoriendateien sind JSON -Dateien. Erstellen Sie eine Fallklasse, die eine Kategorie darstellt, und dann eine Factory mit einem Transformer , das die Kategoriendateien in der Fallklasse verarbeitet.
local.conf -Datei an. Es wurde bereits ein Objekt erstellt, um die Kategoriendateien zu lesen.org.apache.spark.sql.functions zu betrachten.coalesce zu verwenden, wenn Sie eine Datei speichern. Wir können jetzt mit den Videosdateien arbeiten. Erstellen Sie in ähnlicher Weise eine Fallklasse, die ein Video zum Lesen der Eingänge darstellt, und dann eine Factory mit einem oder mehreren Transformers , die die Verarbeitung durchführen. Da die Videosdateien von Regionen getrennt sind, gibt es nicht die Regionsinformationen für jeden Datensatz im Datensatz. Versuchen Sie, diese Informationen mit einer anderen Fallklassen -Videokounnismen hinzuzufügen, die dem Video sehr ähnlich ist, und fungieren Sie alle Datensätze in einem einzelnen Datenrahmen/Datensatz.
Transformers werden nützlich sein: eine zum Hinzufügen der country und eine zum Zusammenführen aller Videos in einen einzelnen Datensatz.Da ein Video für einen Tag und am nächsten Tag ein Top -Trend sein kann, kann ein Video mehrere Zeilen haben, bei denen jeweils jeweils unterschiedliche Zahlen in Bezug auf Ansichten, Vorlieben, Abneigungen, Kommentare ... als Folge die neuesten Statistiken für ein einzelnes Video für jede Region abrufen müssen, da diese Statistiken inkrementell sind. Gleichzeitig werden wir die Anzahl der Trendtage für jedes Video berechnen.
Erstellen Sie eine Case -Class -Videostats , die den vorherigen Fallklassen sehr ähnlich ist, jedoch mit den Informationen über die Trendtage.
Berechnen Sie zunächst die Anzahl der Trendtage jedes Videos.
window von org.apache.spark.sql.functions an.Um die neuesten Statistiken abzurufen, müssen Sie den neuesten Trendtag jedes Videos abrufen. Es ist in der Tat die neuesten verfügbaren Statistiken.
window erstellen. Der erste war für die Berechnung der Anzahl der Trendtage und der zweite, um die neuesten Statistiken abzurufen.rank .Sortieren Sie die Ergebnisse nach Region, Anzahl der Trendtage, Ansichten, Likes und dann Kommentare. Es wird die Daten für die nächste Leistung vorbereiten.
Wir werden jetzt die Popularitätsbewertung jedes Videos berechnen, nachdem wir ihre neuesten Statistiken erhalten haben. Wie bereits gesagt, ist unsere Formel sehr einfach und kann die Realität nicht darstellen.
Normalisieren wir die Anzahl der Likes/Abneigungen über die Anzahl der Ansichten. Teilen Sie für jeden Datensatz die Anzahl der Likes durch die Anzahl der Ansichten und dann die Anzahl der Abneigungen durch die Anzahl der Ansichten. Danach erhalten Sie den Prozentsatz der "normalisierten" Likes.
Normalisieren wir nun die Anzahl der Kommentare. Teilen Sie für jeden Datensatz die Anzahl der Kommentare nach der Anzahl der Ansichten.
Wir können jetzt die Popularitätsbewertung berechnen. Erinnern Sie sich daran, dass die Formel lautet: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Es gibt jedoch Videos, in denen Kommentare deaktiviert sind. In diesem Fall wird die Formel: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Wir haben willkürlich entschieden, die Gewichte zu sein:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05 Richten Sie sie als Input ein, damit sie leicht geändert werden können.
when und otherwise von org.apache.spark.sql.functions funktioniert. Sortieren Sie die score in absteigender Reihenfolge und nehmen Sie die 100 ersten Rekorde. Sie haben jetzt die 100 "beliebtesten" Videos aus den 10 Regionen.
Das erste, was wir tun werden, ist natürlich das Lesen der Eingaben: die CSV -Dateien, die ich die Videosdateien und die JSON -Dateien, die Kategoriendateien aufrufen werde.
Beginnen wir mit den Kategoriendateien. Alle Kategoriendateien sind JSON -Dateien. Hier ist der Workflow: Wir werden eine Konfigurationsdatei definieren, die die zu lesen von den Kategoriendateien angibt. Erstellen Sie eine Fallklasse, die eine Kategorie darstellt. Dann eine Factory mit einem Transformer , der die Kategoriendateien in der Fallklasse verarbeitet. Schließlich werden wir die Stage in die Pipeline hinzufügen, um die Transformationen auszulösen.
Das Konfigurationsobjekt wurde bereits in resources/local.conf erstellt. Achten Sie auf die storage und path . Verschieben Sie die Kategoriendateien entsprechend. Wenn sich mehrere Dateien im selben Ordner befinden und der Ordner als Pfad verwendet wird, betrachtet SETL die Dateien als Partitionen einer einzelnen Datei. Schauen Sie sich als Nächstes die App.scala an. Sie können sehen, dass wir die Methoden setConnector() und setSparkRepository() verwendet haben. Jedes Mal, wenn Sie ein Repository verwenden möchten, müssen Sie eine Konfiguration in die Konfiguration hinzufügen und im setl -Objekt registrieren.
Erstellen Sie eine Fallklasse mit dem Namen Category im entity . Untersuchen Sie nun in den Kategoriendateien die Felder, die wir benötigen.
Wir brauchen die id und den title der Kategorie. Überprüfen Sie die Dateien und verwenden Sie die gleiche Rechtschreibung, um die Category -Fallklasse zu erstellen.
Das Skelett der Factory wurde bereits zur Verfügung gestellt. Stellen Sie sicher, dass Sie die logische Struktur verstehen.
Delivery in Form eines Connector , die Eingänge abzurufen. Eine andere Delivery fungiert als SparkRepository , wo wir die Ausgabe der Transformation schreiben werden. Schauen Sie sich die id jeder Delivery und die deliveryId in App.scala an. Sie werden verwendet, so dass es keine Unklarheiten gibt, wenn SETL die Repositorys abrufen. Um die beiden früheren Lieferungen lesen zu können, werden wir zwei weitere Variablen verwenden: einen DataFrame zum Lesen des Connector und einen Dataset zum Speichern des Ausgabe SparkRepository . Der Unterschied zwischen ihnen besteht darin, dass ein SparkRepository getippt wird, daher der Dataset .Factory sind vier Funktionen erforderlich:read : Die Idee ist, den Connector oder die Abgabeeingänge SparkRepository Delivery zu nehmen, sie bei Bedarf vorzuarbeiten und in Variablen zu speichern, um sie in der nächsten Funktion zu verwenden.process : Hier werden alle Datentransformationen durchgeführt. Erstellen Sie eine Instanz des von Ihnen verwendeten Transformer , rufen Sie die transform() -Methode auf, verwenden Sie den transformed Getter und speichern Sie das Ergebnis in eine Variable.write : Wie der Name schon sagt, wird es verwendet, um die Ausgabe der Transformationen nach ihrer Abschluss zu speichern. Ein Connector verwendet die write() -Methode, um einen DataFrame zu speichern, und ein SparkRepository verwendet die save() -Methode, um einen Dataset zu speichern.get : Diese Funktion wird verwendet, um die Ausgabe in die nächste Stage der Pipeline zu übergeben. Geben Sie einfach den Dataset zurück.process kann es mehrere Transformer geben. Wir werden versuchen, diese Struktur während des Restes des Projekts zu befolgen.Factory automatisch durch die get -Funktion auf die nächste Stage übertragen. Das Schreiben der Ausgabe jeder Factory ist jedoch leichter für die Visualisierung und Debuggierung. Auch hier wurde das Skelett des Transformer bereits zur Verfügung gestellt. Sie sind jedoch derjenige, der die Datenumwandlung schreibt.
Transformer nimmt ein Argument. Normalerweise ist es der DataFrame oder der Dataset , den wir verarbeiten möchten. Abhängig von Ihrer Bewerbung können Sie andere Argumente hinzufügen.transformedData ist die Variable, die das Ergebnis der Datenumwandlung speichert.transformed ist der Getter, der von einer Factory aufgerufen wird, um das Ergebnis der Datenumwandlung abzurufen.transform() ist die Methode, die die Datentransformationen durchführt.items auswählen. Wenn Sie sich die Kategoriendateien ansehen, finden Sie die Informationen, die wir benötigen, in diesem Feld.items ist jedoch ein Array. Wir möchten dieses Array explodieren und nur das id -Feld und das title aus dem snippet -Feld nehmen. Verwenden Sie dazu die explode von org.apache.spark.sql.functions . Um bestimmte Felder zu erhalten, verwenden Sie dann die withColumn -Methode und die getField() -Methode auf id, snippet und title . Vergessen Sie nicht, die Typen entsprechend der von Ihnen erstellten Fallklasse zu gießen.id und die title aus. Geben Sie dann den DataFrame in einen Datensatz mit as[T] in einen Datensatz.Transformer geschrieben. Um zu sehen, was es tut, können Sie die bereits erstellte App.scala -Datei ausführen. Es wird einfach die Factory ausgeführt, die den gerade geschriebenen Transformer enthält, und es wird das Ergebnis auf den Pfad der Konfigurationsdatei ausgeben. Beachten Sie, dass die entsprechende Factory über addStage() hinzugefügt wurde, wodurch die Pipeline es ausführt.@Delivery Connector deliveryIdTransformer in der process einer Factory .write einer Factory . Verarbeiten wir nun die Videosdateien. Wir möchten alle Dateien in einem einzelnen DataFrame / Dataset oder in derselben CSV -Datei zusammenführen, während die Informationen der Region für jedes Video aufbewahrt werden. Alle Videosdateien sind CSV -Dateien und haben dieselben Spalten, wie bereits im Kontextabschnitt angegeben. Der Workflow ähnelt dem letzten: Konfiguration; Fallklasse; Factory ; Transformer ; Fügen Sie die Stage in die Pipeline hinzu. Dieses Mal setzen wir mehrere Konfigurationsobjekte ein.
Wir werden mehrere Konfigurationsobjekte in resources/local.conf , eine pro Region, festlegen. In jedem Konfigurationsobjekt müssen Sie storage, path, inferSchema, delimiter, header, multiLine und dateFormat festlegen.
videos<region>Repository anzugeben.Factory festzulegen. Erstellen Sie eine Fallklasse mit dem Namen Video im entity . Untersuchen Sie nun in den Videosdateien die Felder, die wir benötigen. Erinnern Sie daran, dass das Ziel darin besteht, die Beliebtheit zu berechnen und dass die Formel number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight . Es wird helfen, die Felder auszuwählen.
Erstellen Sie eine andere Fallklasse namens VideoCountry . Es wird genau die gleichen Felder wie Video haben, aber mit dem Feld Land/Region zusätzlich.
@ColumnName Annotation des Framework ansehen. Versuchen Sie, es zu verwenden, da es in einigen Geschäftssituationen im realen Leben nützlich sein kann.java.sql.Date für ein Datumstypfeld. Wir möchten den videoId , title , channel_title , die category_id , trending_date , views , likes , dislikes , comment_count , comments_disabled und video_error_or_removed -Felder haben.
Das Ziel dieser Fabrik ist es, alle Videosdateien in eine einzelne zusammenzuführen, ohne die Regionsinformationen zu entfernen. Das bedeutet, dass wir zwei Arten von Transformer verwenden werden.
Delivery in Form eines SparkRepository[Video] ein. Legen Sie eine letzte Delivery als SparkRepository[VideoCountry] fest, wo wir die Ausgabe der Transformation schreiben werden. Legen Sie so viele Variablen Dataset[Video] fest wie die Anzahl der Eingänge.Factory :read : Vorbereitet das SparkRepository durch Filtern der entfernten Videos oder Fehler . Dann "wirken" sie als Dataset[Video] und speichern Sie sie in die entsprechenden Variablen.process : Wenden Sie den ersten Transformer für jede der Eingänge an und wenden Sie die Ergebnisse auf den zweiten Transformer an.write : Schreiben Sie den Ausgabe SparkRepository[VideoCountry] .get : Geben Sie einfach das Ergebnis des endgültigen Transformer zurück.Connector verwendet, um die Eingabedateien und einen SparkRepository für die Ausgabe zu lesen?SparkRepository verwendet, um die Eingänge zu lesen, um eine Struktur für die Eingabedateien bereitzustellen.SparkRepository und viele entsprechende Variablen gibt, und ich finde das nicht hübsch/konsistent. Gibt es nicht eine andere Lösung?Delivery in Form eines SparkRepository zu verwenden, können Sie Lieferungen in Form eines Dataset mit autoLoad = true verwenden. Also, anstatt: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
Das Hauptziel des ersten Transformer ist es, die Region/Länderinformationen hinzuzufügen. Erstellen Sie einen Transformer , der zwei Eingänge nimmt, einen Dataset[Video] und eine Zeichenfolge. Fügen Sie das country hinzu und geben Sie einen Dataset[VideoCountry] . Sie können auch die Videos filtern, die als entfernt oder fehlerhaft gekennzeichnet sind. Natürlich kann dieser letzte Schritt anderswo platziert werden.
Das Hauptziel des zweiten Transformer ist es, alle Videos gemeinsam zu gruppieren und gleichzeitig die Regionsinformationen beizubehalten.
reduce und union . Um das Ergebnis Ihrer Arbeit zu überprüfen, gehen Sie zu App.scala , setzen Sie die SparkRepositories , fügen Sie die VideoFactory hinzu und führen Sie den Code aus. Es wird die Ausgabedatei im entsprechenden Pfad erstellt.
Connector als auch SparkRepository .Deliveries in einen Transformer oder einen Connector .Transformers in einer Factory .Da ein Video für einen Tag und am nächsten Tag ein Top -Trend sein kann, haben es unterschiedliche Zahlen in Bezug auf Ansichten, Vorlieben, Abneigungen, Kommentare ... als Folge müssen wir die neuesten Statistiken für ein einzelnes Video für jede Region abrufen. Gleichzeitig werden wir die Anzahl der Trendtage für jedes Video berechnen.
Aber wie machen wir das? Zunächst werden wir die Datensätze gruppieren, die demselben Video entsprechen, und die Anzahl der Datensätze zählen, was im Grunde die Anzahl der Trendtage ist. Dann werden wir diese gruppierten Aufzeichnungen einstufen und die neueste nehmen, um die neuesten Statistiken abzurufen.
Die Konfigurationsdatei für die Ausgabe von VideoFactory ist bereits in der vorherigen Leistung festgelegt, damit sie gespeichert werden kann. Sie müssen es lesen und verarbeiten, um die neuesten Videos -Statistiken zu erhalten. Vergessen Sie nicht, eine Konfigurationsdatei für die Ausgabe dieser neuen Factory hinzuzufügen.
Erstellen Sie eine Fallklasse mit dem Namen VideoStats , die ähnliche Felder wie VideoCountry haben. Sie müssen jedoch die Anzahl der Trendtage berücksichtigen.
In dieser Fabrik müssen Sie lediglich die Eingabe lesen, an den Transformer weitergeben, der die Datenverarbeitung durchführt und die Ausgabe schreibt. Es sollte ziemlich einfach sein; Sie können versuchen, die anderen Factories zu imitieren.
Deliveries einzustellen. Wie bereits erwähnt, werden wir die Videos zusammen gruppieren. Dafür verwenden wir org.apache.spark.sql.expressions.Window . Stellen Sie sicher, dass Sie wissen, was ein Window im Voraus macht.
Window , durch das Sie sich partitionieren, um die Anzahl der Trendtage für jedes Video zu zählen. Um zu wissen, auf welchen Feldern Sie sich partitionieren möchten, schauen Sie sich an, welche Felder für ein einzelnes Video gleich sein werden.Window , mit dem die Videos nach ihrem Trenddatum eingestuft werden. Durch die Auswahl des letzten Datums können wir die neuesten Statistiken jedes Videos abrufen.Windows erstellt haben, können Sie jetzt neue Spalten trendingDays für die Anzahl der Trendtage hinzufügen und für die Rangliste des Trenddatums durch absteigende Reihenfolge rank .rank und nehmen Sie nur die Aufzeichnungen mit dem rank 1.DataFrame in Dataset[VideoStats] .partitionBy und orderBy -Methoden für das Window verwenden. und die count der rank von org.apache.spark.sql.functions bei der Arbeit mit dem Dataset . Um das Ergebnis Ihrer Arbeit zu überprüfen, gehen Sie zu App.scala , setzen Sie die SparkRepositories , fügen Sie die Bühne hinzu und führen Sie den Code aus. Es wird die Ausgabedatei im entsprechenden Pfad erstellt.
Pipeline betreibt.Connector und ein SparkRepository ist und wie Sie Deliveries von ihnen festlegen. Wir werden jetzt die Popularitätsbewertung jedes Videos berechnen, nachdem wir ihre neuesten Statistiken erhalten haben. Wie bereits gesagt, ist unsere Formel sehr einfach und kann die Realität nicht darstellen. Erinnern Sie sich daran, dass die Formel views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight . Mit dem vorherigen Ergebnis von VideoStats werden wir einfach die Formel anwenden und die Daten nach der höchsten Punktzahl auf die niedrigste sortieren.
Dies ist die letzte Datenumwandlung. Legen Sie die Konfiguration so ein, dass Sie diesen letzten Dataset[VideoStats] . Um die für die Formel verwendeten Konstanten hinzuzufügen, müssen Sie Inputs in der Pipeline einstellen. Verwenden Sie vor dem Hinzufügen von Stufen in der Pipeline setInput[T](<value>, <id>) , um die Konstanten festzulegen. Diese Eingaben können jederzeit in Factories abgerufen werden, sobald der Pipeline hinzugefügt wurde.
Hier wird kein Unternehmen benötigt. Wir werden einfach die vorherigen Daten sortieren und die für die Berechnung der Punktzahl verwendeten Spalten fallen, damit wir die VideoStats -Stufe weiterhin verwenden können.
In dieser Fabrik müssen Sie lediglich die Eingabe lesen, an den Transformer weitergeben, der die Datenverarbeitung durchführt und die Ausgabe schreibt. Es sollte ziemlich einfach sein; Sie können versuchen, die anderen Factories zu imitieren.
Deliverable Eingänge und Ausgänge einzustellen: Connector , SparkRepository und/oder Input .Normalisieren wir die Anzahl der Likes/Abneigungen über die Anzahl der Ansichten. Teilen Sie für jeden Datensatz die Anzahl der Likes durch die Anzahl der Ansichten und dann die Anzahl der Abneigungen durch die Anzahl der Ansichten. Danach erhalten Sie den Prozentsatz der "normalisierten" Likes.
Normalisieren wir nun die Anzahl der Kommentare. Teilen Sie für jeden Datensatz die Anzahl der Kommentare nach der Anzahl der Ansichten.
Wir können jetzt die Popularitätsbewertung berechnen. Erinnern Sie sich daran, dass die Formel lautet: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Es gibt jedoch Videos, in denen Kommentare deaktiviert sind. In diesem Fall wird die Formel: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Wir haben willkürlich entschieden, die Gewichte zu sein:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05when und otherwise von org.apache.spark.sql.functions funktioniert. Sortieren Sie die score in absteigender Reihenfolge und nehmen Sie die 100 ersten Rekorde. Sie haben jetzt die 100 "beliebtesten" Videos aus den 10 Regionen.
Um das Ergebnis Ihrer Arbeit zu überprüfen, gehen Sie zu App.scala , setzen Sie die Inputs ein, wenn sie nicht bereits festgelegt sind, setzen Sie das Ausgabe SparkRepository ein, fügen Sie die Bühne hinzu und führen Sie den Code aus. Es wird die Ausgabedatei im entsprechenden Pfad erstellt.
Deliveries : Input , Connector und SparkRepository mit deliveryId .Stage , einschließlich der Factory und der Transformer(s) .Wenn Ihnen dieses Projekt gefallen hat, lesen Sie bitte das SETL-Framework hier: https://github.com/setl-developers/setl, und warum nicht Ihren Beitrag bringen!