YouTube Setlは、SETLフレームワークを練習するための出発点を提供することを目的とするプロジェクトです:https://github.com/setl-developers/setl。アイデアは、抽出、変換、荷重操作を含むコンテキストプロジェクトを提供することです。エクササイズには、イージーモード、通常モード、ハードモードの3つのレベルの難易度があります。
使用されるデータは、Kaggle、https://www.kaggle.com/datasnaek/youtube-newからです。
ScalaとApache Sparkを使用して、このプロジェクトにJetbrains Intellij Idea Community Editionを使用しました。
このデータは、カナダ(CA)、ドイツ(DE)、フランス(FR)、イギリス(GB)、インド(IN)、日本(JP)、韓国(KR)、メキシコ(MX)、ロシア(RU)、米国(米国)である倍数の地域で分割されています。これらの各地域には、2つのファイルがあります。

毎日、YouTubeは、各国で最もトレンドのビデオを約200枚提供しています。 YouTubeは、完全に公開されていない要因の組み合わせに基づいて、ビデオがどれだけ流行しているかを測定します。このデータセットは、日常のトップトレンドビデオのコレクションで構成されています。結果として、同じビデオが複数回表示される可能性があります。つまり、複数日にわたって流行しています。
基本的に、アイテムフィールドの要素を使用すると、CSVファイルのcategory_idフルネームカテゴリにマッピングできます。
このデータセットを分析し、「人気のある」ビデオを決定します。しかし、人気のあるビデオをどのように定義しますか?ビデオの人気を定義します。その数の見解、好き嫌い、コメントの数、トレンド日の数に基づいています。
この定義は明らかに議論の余地があり、arbitrary意的であり、ビデオの人気の最良の定義を調べようとはしていません。このプロジェクトの目的、Setlフレームワークの実践にのみ焦点を当てます。
このプロジェクトの目標は、100の最も人気のあるビデオと、最も人気のあるビデオカテゴリを見つけることです。しかし、ビデオの人気をどのように定義しましたか?フォーミュラは次のとおりです。
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight !
好きな割合は、嫌いなものよりもいいね!この比率は、ビューの数にわたって正規化されます。同じ正規化は、コメントの数で行われます。
以下は、プロジェクトを実現するための各難易度レベルの指示です。難易度レベルごとに、特定のブランチでレポをクローンして開始プロジェクトを作成できます。
このプロジェクトでは、ScalaとApache Sparkの基本的な知識がすでにあると想定しています。
entityで構成されています。変圧器を含むfactory 。データ変換を含むtransformer 。FactoryまたはTransformerを作成する場合、 Ctrl+iを使用して必要な関数を自動的に作成できます。 最初にやろうとしていることは、もちろん、入力を読むことです。CSVファイル、ビデオファイル、およびJSONファイルを呼び出し、カテゴリファイルです。
カテゴリファイルから始めましょう。すべてのカテゴリファイルはJSONファイルです。カテゴリを表すケースクラスを作成し、カテゴリファイルをケースクラスに処理するTransformerを備えたFactory作成します。
local.confファイルをご覧ください。カテゴリファイルを読み取るために、オブジェクトが既に作成されています。org.apache.spark.sql.functionsの爆発関数を見てみてください。coalesceを使用することを忘れないでください。これで、ビデオファイルを使用できます。同様に、入力を読むためのビデオを表すケースクラスを作成し、処理を行う1つまたは複数のTransformersを備えたFactory作成します。ビデオファイルは地域から分離されているため、データセット内の各レコードの領域情報はありません。ビデオと非常によく似た別のケースクラスのビデオカントリーを使用して、この情報を追加し、すべてのレコードを単一のデータフレーム/データセットにマージします。
Transformersが役立ちます。1つはcountryコラムを追加するため、もう1つはすべてのビデオを単一のデータセットに統合するためです。ビデオは1日と翌日のトップのトレンドになる可能性があるため、ビデオが複数の行を持つ可能性があります。それぞれがビュー、好き嫌い、コメントの点で異なる数字を持っています...結果として、これらの統計が漸進的であるため、各地域で1つのビデオで利用可能な最新の統計を取得する必要があります。同時に、すべてのビデオのトレンド日数を計算します。
以前のケースクラスと非常によく似たケースクラスのビデオスタットを作成しますが、トレンド日の情報があります。
まず、各ビデオのトレンド日数を計算します。
org.apache.spark.sql.functionsのwindow関数を見てください。最新の統計を取得するには、各ビデオの最新のトレンド日を取得する必要があります。実際、利用可能な最新の統計です。
windowを作成する必要があります。最初のものは、トレンド日数の数を計算するためのもので、2番目は最新の統計を取得することでした。rank関数を使用することです。地域ごとに結果を並べ替え、トレンドの日数、ビュー、いいね、コメントを並べ替えます。次の成果のためにデータを準備します。
最新の統計を取得した後、各ビデオの人気スコアを計算する予定です。前述のように、私たちの式は非常に単純で、現実を表していないかもしれません。
ビューの数にわたっていい/嫌いな人の数を正規化しましょう。各レコードについて、いいね!の数をビューの数で除算し、それから嫌いな数をビューの数で除算します。その後、「正規化された」いいね!の割合を取得します。
次に、コメントの数を正規化しましょう。各レコードについて、コメントの数をビューの数で除算します。
これで、人気スコアを計算できます。式は次のとおりです。 views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight 。
ただし、コメントが無効になっているビデオがあります。この場合、式は次のようになります: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) 。私たちは、重みが次のとおりであることをarbitrarily意的に決定しました。
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05それらをInputとして設定して、簡単に変更できるようにします。
org.apache.spark.sql.functionsからwhen 、 otherwise機能をチェックしてください。 scoreで下降順序で並べ替え、100の最初のレコードを取得します。現在、10の地域から最も人気のある100のビデオがあります。
最初にやろうとしているのは、もちろん、入力を読むことです。CSVファイル、ビデオファイルとJSONファイルを呼び出し、カテゴリファイルです。
カテゴリファイルから始めましょう。すべてのカテゴリファイルはJSONファイルです。ワークフローは次のとおりです。読み取るカテゴリファイルを示す構成ファイルを定義します。カテゴリを表すケースクラスを作成します。次に、カテゴリファイルをケースクラスに処理するTransformerを備えたFactory 。最後に、 PipelineにStageを追加して変換をトリガーします。
構成オブジェクトは、すでにresources/local.confで作成されています。 storageとpathオプションで注意してください。それに応じてカテゴリファイルを移動します。複数のファイルが同じフォルダー内にあり、フォルダーがパスとして使用されている場合、 Setlはファイルを単一のファイルのパーティションとして考慮します。次に、 App.scalaをご覧ください。 setConnector()およびsetSparkRepository()メソッドを使用したことがわかります。リポジトリを使用するたびに、構成に構成を追加し、 setlオブジェクトに登録する必要があります。
entityフォルダーにCategoryという名前のケースクラスを作成します。ここで、カテゴリファイルで、必要なフィールドを調べます。
idとカテゴリのtitleが必要です。ファイルを確認し、同じスペルを使用してCategoryケースクラスを作成してください。
Factoryのスケルトンはすでに提供されています。論理構造を理解してください。
Connectorの形でDelivery 、入力を取得できます。別のDelivery 、 SparkRepositoryとして機能し、変換の出力を書き込みます。各DeliveryのidとApp.scalaのdeliveryIdチェックしてください。それらが使用されるため、 setlがリポジトリを取得するときに曖昧さはありません。以前の2つの配信を読み取るために、 Connectorを読むためのDataFrameと、出力SparkRepository保存するためのDataset 2つの他の変数を使用します。それらの違いは、 SparkRepositoryが入力されるため、 Datasetが入力されることです。Factoryには4つの機能が必要です。read :アイデアは、 ConnectorまたはSparkRepository Delivery入力を取得し、必要に応じてそれらを前処理し、それらを変数に保存して次の関数で使用することです。process :すべてのデータ変換が行われる場所です。使用しているTransformerのインスタンスを作成し、 transform()メソッドを呼び出し、 transformedゲッターを使用して、結果を変数に保存します。write :その名前が示すように、それは変換が完了した後に変換の出力を保存するために使用されます。 Connector write()メソッドを使用してDataFrameを保存し、 SparkRepositoryはsave()メソッドを使用してDatasetを保存します。get :この関数は、出力をPipelineの次のStageに渡すために使用されます。 Datasetを返すだけです。process関数には、複数のTransformerが存在する可能性があります。私たちは、プロジェクトの残りの部分を通してこの構造に従うようにします。Factoryの結果は、 get機能を介して次のStageに自動的に転送されます。ただし、すべてのFactoryの出力を作成するのは、視覚化とデバッグが簡単です。繰り返しますが、 Transformerの骨格はすでに提供されています。ただし、データ変換を書く人になります。
Transformerは議論をします。通常、処理するのはDataFrameまたはDatasetです。アプリケーションに応じて、他の引数を追加できます。transformedData 、データ変換の結果を保存する変数です。transformed 、データ変換の結果を取得するためにFactoryで呼び出されるゲッターです。transform()は、データ変換を行う方法です。itemsフィールドを選択します。カテゴリファイルをチェックすると、必要な情報はこのフィールドにあります。itemsフィールドは配列です。この配列を爆発させ、 snippetフィールドからidフィールドとtitleフィールドのみを採用したいと考えています。それを行うには、 org.apache.spark.sql.functionsのexplode関数を使用します。次に、特定のフィールドを取得するには、 id, snippet 、およびtitleでwithColumnメソッドとgetField()メソッドを使用します。作成したケースクラスにそれに応じてタイプをキャストすることを忘れないでください。idとtitle列を選択します。次に、データフレームをas[T]でデータセットにキャストします。Transformer書き終えました。それが何をするかを確認するには、すでに作成されているApp.scalaファイルを実行できます。作成したばかりのTransformerを含むFactoryを単に実行するだけで、結果を構成ファイルのパスに出力します。対応するFactory 、 Pipelineを実行するaddStage()を介して追加されていることに注意してください。Connectorのセットアップ、 @Deliveryアノテーションを使用して、 deliveryIdを使用します。Factoryのprocess方法でTransformerを使用します。Factoryのwrite方法。ビデオファイルを処理しましょう。各ビデオの情報を保持しながら、単一のDataFrame / Datasetまたは同じCSVファイルにすべてのファイルをマージします。すべてのビデオファイルはCSVファイルであり、コンテキストセクションで前述したように同じ列があります。ワークフローは、最後のワークフローに似ています。構成。ケースクラス; Factory ; Transformer ; StageをPipelineに追加します。今回は、複数の構成オブジェクトを設定します。
resources/local.confに複数の構成オブジェクトを設定します。各構成オブジェクトでは、 storage, path, inferSchema, delimiter, header, multiLine 、 dateFormatを設定する必要があります。
videos<region>Repositoryのような一般的な名前を付けてください。Factoryの出力を書き込むための構成オブジェクトを設定することを忘れないでください。entityフォルダーにVideoという名前のケースクラスを作成します。ここで、ビデオファイルで、必要なフィールドを調べます。目的は人気スコアを計算することであり、フォーミュラnumber of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight !フィールドを選択するのに役立ちます。
VideoCountryという名前の別のケースクラスを作成します。 Videoとまったく同じフィールドがありますが、さらに国/地域のフィールドがあります。
@ColumnNameアノテーションを見ることができます。実際のビジネス状況では役立つ可能性があるため、使用してください。java.sql.Dateを使用します。videoId 、 title 、 channel_title 、 category_id 、 trending_date 、 views 、 likes 、 dislikes 、 comment_count 、 comments_disabled 、 video_error_or_removed vededフィールドが必要です。
この工場の目標は、地域の情報を削除せずに、すべてのビデオファイルを単一のビデオに統合することです。つまり、2種類のTransformerを使用することを意味します。
Delivery SparkRepository[Video]の形式で設定します。最後のDelivery SparkRepository[VideoCountry]として設定し、変換の出力を書きます。入力数と同じ数の変数Dataset[Video]を設定します。Factoryの4つの機能について説明しましょう。read :削除された動画またはエラーをフィルタリングすることにより、 SparkRepository前処理します。次に、それらをDataset[Video]として「キャスト」し、対応する変数に保存します。process :各入力に最初のTransformerを適用し、結果を2番目のTransformerに適用します。write :出力SparkRepository[VideoCountry]を書きます。get :最終Transformerの結果を返してください。Connectorを使用して出力用の入力ファイルとSparkRepositoryを読み取らなかったのはなぜですか?SparkRepositoryを使用して、入力ファイルの構造を提供するためだけに入力を読み取りました。SparkRepositoryと対応する変数がたくさんあると感じていますが、これはきれい/考慮事項が見つかりません。別の解決策はありませんか?SparkRepositoryの形でDeliveryを使用する代わりに、 autoLoad = trueオプションを使用してDatasetの形で配信を使用できます。だから、次のようにする代わりに @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
最初のTransformerの主な目標は、地域/国の情報を追加することです。 Dataset[Video]と文字列の2つの入力を取得するTransformerを構築します。列のcountryを追加して、 Dataset[VideoCountry]を返します。削除またはエラーとしてラベル付けされたビデオをフィルタリングすることもできます。もちろん、この最後のステップは他の場所に配置できます。
2番目のTransformerの主な目標は、地域の情報を保持しながら、すべてのビデオを一緒に再編成することです。
reduceとunion機能を使用します。作業の結果を確認するには、 App.scalaに移動し、 SparkRepositoriesを設定し、ステージVideoFactoryを追加し、コードを実行します。対応するパスに出力ファイルが作成されます。
ConnectorとSparkRepository両方を使用します。Deliveries TransformerまたはConnectorに読んでください。Factoryで複数のTransformersを使用してください。ビデオは1日と翌日にトップのトレンドになる可能性があるため、見解、好き嫌い、コメントの観点から異なる数字があります...結果として、各地域で1つのビデオで利用可能な最新の統計を取得する必要があります。同時に、すべてのビデオのトレンド日数を計算します。
しかし、どうやってそれをするつもりですか?まず、同じビデオに対応するレコードをグループ化し、基本的にトレンド日数であるレコードの数をカウントします。次に、これらのグループ化されたレコードをランク付けし、最新のレコードを取得して、最新の統計を取得します。
VideoFactoryの出力の構成ファイルは、以前の成果で既に設定されているため、保存できます。最新のビデオ統計を取得するには、それを読んで処理する必要があります。この新しいFactoryの出力に構成ファイルを追加することを忘れないでください。
VideoCountryと同様の分野を持つVideoStatsという名前のケースクラスを作成しますが、トレンド日数を考慮する必要があります。
この工場では、入力を読み取り、データ処理を行うTransformerに渡し、出力を書き込むだけです。それは非常に簡単なはずです。他のFactoriesを模倣しようとすることができます。
Deliveriesを設定することを忘れないでください。前述のように、ビデオをグループ化します。そのために、 org.apache.spark.sql.expressions.Windowを使用します。 Windowが事前に何をするかを確認してください。
Windowを作成します。どのフィールドが分割するかを知るには、単一のビデオでどのフィールドが同じになるかを見てください。Windowを作成します。最新の日付を選択することにより、各ビデオの最新の統計を取得できます。Windowsを作成した後、トレンド日数の新しい列trendingDaysを追加し、下降順序でトレンド日のランキングにrankことができます。rank 1のレコードのみを取得して、 rankでビデオをフィルタリングするだけです。DataFrame Dataset[VideoStats] 。WindowにpartitionByおよびorderByメソッドを使用する必要があります。 count 、 org.apache.spark.sql.functions Dataset rank方法。作業の結果を確認するには、 App.scalaに移動し、 SparkRepositoriesを設定し、ステージを追加して、コードを実行します。対応するパスに出力ファイルが作成されます。
Pipelineの実行方法。ConnectorとSparkRepositoryとは何か、そしてそれらのDeliveriesを設定する方法を理解してください。最新の統計を取得した後、各ビデオの人気スコアを計算する予定です。前述のように、私たちの式は非常に単純で、現実を表していないかもしれません。フォーミュラviews * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeightあることを思い出しましょう。 VideoStatsの以前の結果を使用して、単に式を適用し、最高のスコアでデータを最低に並べ替えます。
これが最後のデータ変換です。この最後のDataset[VideoStats] 。式に使用される定数を追加するには、 PipelineにInputs設定する必要があります。 Pipelineにステージを追加する前に、 setInput[T](<value>, <id>)を使用して定数を設定します。これらの入力は、 Pipelineに追加された任意のFactoriesでいつでも取得可能です。
ここではエンティティは必要ありません。以前のデータを並べ替えて、スコアの計算に使用される列をドロップして、 VideoStatsエンティティを使用できるようにします。
この工場では、入力を読み取り、データ処理を行うTransformerに渡し、出力を書き込むだけです。それは非常に簡単なはずです。他のFactoriesを模倣しようとすることができます。
Deliverableすることを忘れないでください: Connector 、 SparkRepository 、および/またはInput 。ビューの数にわたっていい/嫌いな人の数を正規化しましょう。各レコードについて、いいね!の数をビューの数で除算し、それから嫌いな数をビューの数で除算します。その後、「正規化された」いいね!の割合を取得します。
次に、コメントの数を正規化しましょう。各レコードについて、コメントの数をビューの数で除算します。
これで、人気スコアを計算できます。式は次のとおりです。 views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight 。
ただし、コメントが無効になっているビデオがあります。この場合、式は次のようになります: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) 。私たちは、重みが次のとおりであることをarbitrarily意的に決定しました。
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05org.apache.spark.sql.functionsからwhen 、 otherwise機能をチェックしてください。 scoreで下降順序で並べ替え、100の最初のレコードを取得します。現在、10の地域から最も人気のある100のビデオがあります。
作業の結果を確認するには、 App.scalaに移動し、まだ設定されていない場合はInputsを設定し、出力SparkRepositoryを設定し、ステージを追加してコードを実行します。対応するパスに出力ファイルが作成されます。
deliveryIdを使用して、 Input 、 Connector 、 SparkRepository 3種類のDeliveriesを使用します。FactoryやTransformer(s)を含むStageを書きます。このプロジェクトが気に入ったら、setlフレームワークをご覧ください:https://github.com/setl-developers/setl、そしてあなたの貢献をもたらさないのはなぜですか!