YouTube SETL是一個旨在提供練習SETL框架的起點:https://github.com/setl-developers/setl。這個想法是給出一個涉及提取,轉換和加載操作的上下文項目。練習有三個級別的困難:簡單模式,正常模式和硬模式。
使用的數據來自Kaggle,https://www.kaggle.com/datasnaek/youtube-new。
我將Jetbrains Intellij Idea Community Edition與Scala和Apache Spark一起用於該項目。
數據分為多個地區:加拿大(CA),德國(DE),法國(FR),英國(GB),印度(IN),日本(JP),韓國(KR),墨西哥(MX),俄羅斯(RU),俄羅斯(RU)(RU)和美國(美國)。對於這些區域中的每個區域,都有兩個文件:

每天,YouTube提供了每個國家 /地區大約200個最流行的視頻。 YouTube根據未完全公開的因素的組合來衡量視頻有多少流行。該數據集由每天的頂級流行視頻集合。結果,同一視頻可能會多次出現,這意味著它正在趨勢多天。
基本上,項目字段的元素允許我們將CSV文件的category_id映射到全名類別。
我們將分析此數據集並確定“流行”視頻。但是,我們如何定義一個受歡迎的視頻?我們將根據視頻的觀點數量,喜歡,不喜歡,評論數量和趨勢天數來定義視頻的受歡迎程度。
這個定義顯然是有爭議的和任意的,我們並不希望找到視頻受歡迎程度的最佳定義。我們只專注於該項目的目的:使用SETL框架練習。
該項目的目的是找到100個最受歡迎的視頻和最“流行”的視頻類別。但是,我們如何定義視頻的受歡迎程度?公式將是:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight 。
喜歡百分比是喜歡的比例與不喜歡的比例。該比率在視圖數量上歸一化。評論數量進行相同的歸一化。
以下是每個難度級別實現項目的說明。對於每個難度級別,您可以將存儲庫與特定的分支一起克隆以進行起始項目。
對於此項目,我們假設您已經有了Scala和Apache Spark的基本知識。
entity ;包含變壓器的factory ;和包含數據變換的transformer 。Factory或Transformer時,可以使用Ctrl+i自動創建所需的功能。 當然,我們要做的第一件事是讀取輸入:CSV文件,我將調用視頻文件和JSON文件,類別文件。
讓我們從類別文件開始。所有類別文件都是JSON文件。創建一個代表類別的案例類,然後是具有Transformer的Factory ,該工廠將將類別文件處理為案例類。
local.conf文件。為了讀取類別文件,已經創建了一個對象。org.apache.spark.sql.functions查看爆炸函數。coalesce 。現在,我們可以使用視頻文件。同樣,創建一個代表用於讀取輸入的視頻的案例類,然後是一個或幾個可以執行Transformers的Factory 。由於視頻文件與區域分開,因此數據集中的每個記錄都沒有區域信息。嘗試通過使用與視頻非常相似的另一個案例類錄像帶添加此信息,並將所有記錄合併在單個數據框/數據集中。
Transformers將很有用:一個用於添加country列,一個用於將所有視頻合併到一個數據集中。由於視頻可以是一天和第二天的最佳趨勢,因此視頻有可能具有多行,其中每個視頻在觀點,喜歡,不喜歡,評論方面都有不同的數字……因此,我們必須檢索每個區域可用於單個視頻的最新統計信息,因為這些統計數據是遞增的。同時,我們將計算每個視頻的趨勢天數。
創建一個案例類視頻儀,它與以前的案例類非常相似,但是有趨勢的信息。
首先,計算每個視頻的趨勢天數。
org.apache.spark.sql.functions查看window函數。要檢索最新的統計數據,您必須檢索每個視頻的最新趨勢。實際上,這是最新的可用統計信息。
window 。第一個是用於計算趨勢天數,第二個用於檢索最新統計數據。rank函數。按區域,趨勢天數,視圖,喜歡以及評論的結果對結果進行排序。它將為下一個成就準備數據。
在獲得最新統計數據後,我們現在將計算每個視頻的受歡迎程度得分。如前所述,我們的公式非常簡單,可能不代表現實。
讓我們對視圖數量的喜歡/不喜歡的數量進行標準化。對於每個記錄,將喜歡的數量除以視圖數量,然後除以視圖數量。之後,獲得“歸一化”喜歡的百分比。
現在,讓我們將評論數量歸一化。對於每個記錄,將註釋數除以視圖數。
現在,我們可以計算受歡迎程度得分。提醒該公式為: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight 。
但是,有些視頻被禁用了評論。在這種情況下,公式將變為: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) 。我們任意決定權重為:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05將它們設置為Input ,以便可以輕鬆修改它們。
org.apache.spark.sql.functions中查看when和otherwise功能。按降序按score排序,並獲取100個第一記錄。現在,您擁有10個地區的100個最“流行”的視頻。
當然,我們要做的第一件事是閱讀輸入:CSV文件,我將調用視頻文件和JSON文件,類別文件。
讓我們從類別文件開始。所有類別文件都是JSON文件。這是工作流程:我們將定義一個配置文件,該文件將指示要讀取的類別文件;創建代表類別的案例類;然後是帶有Transformer的Factory ,該工廠將將類別文件處理到案例類中。最後,我們將在Pipeline中添加Stage以觸發轉換。
配置對像已經在resources/local.conf中創建。注意storage和path選項。相應地移動類別文件。如果多個文件位於同一文件夾中,並且該文件夾用作路徑,則SETL將將文件視為單個文件的分區。接下來,查看App.scala 。您可以看到我們使用了setConnector()和setSparkRepository()方法。每次您要使用存儲庫時,都需要在配置中添加配置並將其註冊在setl對像中。
在entity文件夾中創建一個名為Category的案例類。現在,在類別文件中檢查我們將需要的字段。
我們將需要類別的id和title 。確保檢查文件並使用相同的拼寫來創建Category案例類。
已經提供了Factory的骨骼。確保您了解邏輯結構。
Connector形式的Delivery使我們能夠檢索輸入。另一個Delivery將充當SparkRepository ,我們將在其中編寫轉換的輸出。查看App.scala中每個Delivery的id和deliveryId的ID。它們被使用,因此Setl獲取存儲庫時沒有歧義。為了能夠讀取之前的兩個交付,我們將使用其他兩個變量:用於讀取Connector DataFrame ,以及用於存儲輸出SparkRepository Dataset 。它們之間的區別在於, SparkRepository鍵入了Dataset 。Factory需要四個功能:read :這個想法是要接收Connector或SparkRepository Delivery輸入輸入,如果需要,預處理預處理,然後將它們存儲到變量中以將它們在下一個功能中使用。process :這是所有數據轉換將進行的地方。創建您使用的Transformer的實例,調用transform()方法,使用transformed Getter並將結果存儲到變量中。write :顧名思義,它用於保存轉換完成後的輸出。 Connector使用write()方法保存DataFrame ,而SparkRepository則使用save()方法來保存Dataset 。get :此功能用於將輸出傳遞到Pipeline的下一個Stage 。只需返回Dataset即可。process函數中,可以有多個Transformer 。我們將嘗試在整個項目的其餘部分遵循這種結構。Factory的結果將通過get功能自動傳輸到下一Stage 。但是,編寫每個Factory的輸出將更容易進行可視化和調試。同樣,已經提供了Transformer的骨骼。但是,您將是編寫數據轉換的人。
Transformer提出了爭議。通常,是我們要處理的DataFrame或Dataset 。根據您的應用程序,您可以添加其他參數。transformedData是將存儲數據轉換結果的變量。transformed是Factory將調用以檢索數據轉換結果的Getter。transform()是將進行數據轉換的方法。items ”字段。如果您查看類別文件,我們需要的信息在此字段上。items字段是一個數組。我們想爆炸此數組,並僅從snippet字段中獲取id字段和title字段。為此,請使用org.apache.spark.sql.functions的explode函數。然後,要獲取特定字段,請在id, snippet和title上使用withColumn方法和getField()方法。不要忘記將類型相應地投入您創建的案例類。id和title列。然後,將數據框放入as[T]數據集中。Transformer 。要查看它的作用,您可以運行已經創建的App.scala文件。它只是運行包含您剛剛編寫的Transformer的Factory ,然後將結果輸出到配置文件的路徑。請注意,已通過addStage()添加了相應的Factory ,使Pipeline運行。@Delivery註釋設置Connector ,並使用deliveryId 。Factory的process方法中使用Transformer 。Factory的write方法。現在讓我們處理視頻文件。我們想在單個DataFrame / Dataset或同一CSV文件中合併所有文件,同時保留每個視頻的區域信息。所有視頻文件都是CSV文件,它們具有相同的列,如上下文部分中所述。工作流與最後一個類似:配置;案例類; Factory ; Transformer ;將Stage添加到Pipeline中。這次,我們將設置多個配置對象。
我們將在每個區域中設置resources/local.conf中的多個配置對象。在每個配置對像中,您必須設置storage, path, inferSchema, delimiter, header, multiLine和dateFormat 。
videos<region>Repository 。Factory輸出的配置對象。在entity文件夾中創建一個名為Video的案例類。現在,在視頻文件中檢查我們將需要的字段。提醒目標是計算流行度得分,並且公式是number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight 。它將有助於選擇字段。
創建另一個名為VideoCountry的案例類。它的字段將與Video完全相同,但還有國家/地區領域。
@ColumnName註釋。嘗試使用它,因為它在某些現實生活中很有用。java.sql.Date用於日期類型字段。我們想擁有videoId , title , channel_title , category_id , trending_date , views , likes ,likes, dislikes , comment_count , comments_disabled和video_error_or_removed fields。
該工廠的目的是將所有視頻文件合併到單個視頻文件中,而無需刪除區域信息。這意味著我們將使用兩種Transformer 。
Delivery SparkRepository[Video] 。將最後一個Delivery作為SparkRepository[VideoCountry] ,我們將在其中編寫轉換的輸出。將盡可能多的變量Dataset[Video]設置為輸入數量。Factory的四個功能:read :通過過濾被刪除或錯誤的視頻,預處理SparkRepository 。然後,將它們“鑄造”為Dataset[Video] ,並將它們存儲到相應的變量中。process :將第一個Transformer應用於每個輸入,然後將結果應用於第二個Transformer 。write :編寫輸出SparkRepository[VideoCountry] 。get :只需返回最終Transformer的結果即可。Connector讀取輸入文件和輸出的SparkRepository ?SparkRepository讀取輸入只是為了提供輸入文件的結構。SparkRepository和許多相應的變量,而且我認為這很漂亮。沒有其他解決方案嗎?autoLoad = true Option的Dataset的形式使用SparkRepository的Delivery方式。因此,而不是有: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
第一個Transformer的主要目標是添加區域/國家信息。構建一個輸入兩個輸入的Transformer ,一個Dataset[Video]和一個字符串。添加專欄country並返回Dataset[VideoCountry] 。您還可以過濾被標記為刪除或錯誤的視頻。當然,最後一步可以放置在其他地方。
第二個Transformer的主要目標是將所有視頻一起重新組合在一起,同時保留該區域信息。
reduce和union功能。要檢查工作的結果,請轉到App.scala ,設置SparkRepositories ,添加舞台VideoFactory ,然後運行代碼。它將在相應的路徑中創建輸出文件。
Connector和SparkRepository 。Deliveries讀取到Transformer或Connector中。Factory中使用多個Transformers 。由於視頻可以是一天的最佳趨勢,因此在觀點,喜歡,不喜歡,評論方面,它將具有不同的數字……因此,我們必須檢索每個區域可用於單個視頻的最新統計信息。同時,我們將計算每個視頻的趨勢天數。
但是我們將如何做呢?首先,我們將對與同一視頻相對應的記錄進行分組,併計算記錄的數量,這基本上是趨勢天數。然後,我們將對這些分組記錄進行排名,並採用最新記錄,以檢索最新的統計數據。
在上一個成就中已經設置了VideoFactory輸出的配置文件,因此可以保存。您將需要閱讀並進行處理以獲取最新的視頻統計信息。不要忘記為該新Factory的輸出添加配置文件。
創建一個名為VideoStats的案例類別,該類別具有與VideoCountry相似的字段,但您需要考慮到趨勢天數。
在這個工廠中,您需要做的就是閱讀輸入,將其傳遞給可以進行數據處理的Transformer並寫入輸出。應該很簡單;您可以嘗試模仿其他Factories 。
Deliveries 。如前所述,我們將將視頻分組在一起。為此,我們將使用org.apache.spark.sql.expressions.Window 。確保您知道Window事先做什麼。
Window ,您將通過計算每個視頻的趨勢天數來分區。要知道您要通過哪些字段進行分區,請查看單個視頻的哪些字段相同。Window ,該窗口將在其趨勢日期之前用於對視頻進行排名。通過選擇最新日期,我們可以檢索每個視頻的最新統計信息。Windows後,您現在可以在趨勢天數中添加新的列trendingDays ,並通過降序訂購趨勢日期rank 。rank過濾視頻,只獲取rank 1的記錄。DataFrame投放到Dataset[VideoStats] 。Window使用partitionBy和orderBy方法;和count ,從org.apache.spark.sql.functions使用Dataset集時的rank方法。要檢查工作的結果,請轉到App.scala ,設置SparkRepositories ,添加階段並運行代碼。它將在相應的路徑中創建輸出文件。
Pipeline 。Connector和一個SparkRepository ,以及如何設置它們的Deliveries 。在獲得最新統計數據後,我們現在將計算每個視頻的受歡迎程度得分。如前所述,我們的公式非常簡單,可能不代表現實。讓我們想起,該公式是views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight 。使用VideoStats的先前結果,我們將僅應用公式,然後將數據最高分為最低。
這是最後一個數據轉換。設置配置,以便您可以保存最後一個Dataset[VideoStats] 。要添加用於公式的常數,您需要在Pipeline中設置Inputs 。在添加Pipeline中的階段之前,請使用setInput[T](<value>, <id>)設置常數。這些輸入都可以在一旦添加到Pipeline中的任何Factories中可以檢索。
這裡不需要實體。我們將簡單地對先前的數據進行排序,然後刪除用於計算分數的列,以便我們仍然可以使用VideoStats實體。
在這個工廠中,您需要做的就是閱讀輸入,將其傳遞給可以進行數據處理的Transformer並寫入輸出。應該很簡單;您可以嘗試模仿其他Factories 。
Deliverable : Connector , SparkRepository和/或Input 。讓我們對視圖數量的喜歡/不喜歡的數量進行標準化。對於每個記錄,將喜歡的數量除以視圖數量,然後除以視圖數量。之後,獲得“歸一化”喜歡的百分比。
現在,讓我們將評論數量歸一化。對於每個記錄,將註釋數除以視圖數。
現在,我們可以計算受歡迎程度得分。提醒該公式為: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight 。
但是,有些視頻被禁用了評論。在這種情況下,公式將變為: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) 。我們任意決定權重為:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05org.apache.spark.sql.functions中查看when和otherwise功能。按降序按score排序,並獲取100個第一記錄。現在,您擁有10個地區的100個最“流行”的視頻。
要檢查工作的結果,請轉到App.scala ,設置Inputs如果尚未設置),請設置輸出SparkRepository ,添加階段並運行代碼。它將在相應的路徑中創建輸出文件。
Deliveries : Input , Connector和SparkRepository ,以及deliveryId 。Stage ,包括Factory和Transformer(s) 。如果您喜歡此項目,請在此處查看SETL框架:https://github.com/setl-developers/setl,為什麼不帶您的貢獻!