YouTube SETL是一个旨在提供练习SETL框架的起点:https://github.com/setl-developers/setl。这个想法是给出一个涉及提取,转换和加载操作的上下文项目。练习有三个级别的困难:简单模式,正常模式和硬模式。
使用的数据来自Kaggle,https://www.kaggle.com/datasnaek/youtube-new。
我将Jetbrains Intellij Idea Community Edition与Scala和Apache Spark一起用于该项目。
数据分为多个地区:加拿大(CA),德国(DE),法国(FR),英国(GB),印度(IN),日本(JP),韩国(KR),墨西哥(MX),俄罗斯(RU),俄罗斯(RU)(RU)和美国(美国)。对于这些区域中的每个区域,都有两个文件:

每天,YouTube提供了每个国家 /地区大约200个最流行的视频。 YouTube根据未完全公开的因素的组合来衡量视频有多少流行。该数据集由每天的顶级流行视频集合。结果,同一视频可能会多次出现,这意味着它正在趋势多天。
基本上,项目字段的元素允许我们将CSV文件的category_id映射到全名类别。
我们将分析此数据集并确定“流行”视频。但是,我们如何定义一个受欢迎的视频?我们将根据视频的观点数量,喜欢,不喜欢,评论数量和趋势天数来定义视频的受欢迎程度。
这个定义显然是有争议的和任意的,我们并不希望找到视频受欢迎程度的最佳定义。我们只专注于该项目的目的:使用SETL框架练习。
该项目的目的是找到100个最受欢迎的视频和最“流行”的视频类别。但是,我们如何定义视频的受欢迎程度?公式将是:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight 。
喜欢百分比是喜欢的比例与不喜欢的比例。该比率在视图数量上归一化。评论数量进行相同的归一化。
以下是每个难度级别实现项目的说明。对于每个难度级别,您可以将存储库与特定的分支一起克隆以进行起始项目。
对于此项目,我们假设您已经有了Scala和Apache Spark的基本知识。
entity ;包含变压器的factory ;和包含数据变换的transformer 。Factory或Transformer时,可以使用Ctrl+i自动创建所需的功能。 当然,我们要做的第一件事是读取输入:CSV文件,我将调用视频文件和JSON文件,类别文件。
让我们从类别文件开始。所有类别文件都是JSON文件。创建一个代表类别的案例类,然后是具有Transformer的Factory ,该工厂将将类别文件处理为案例类。
local.conf文件。为了读取类别文件,已经创建了一个对象。org.apache.spark.sql.functions查看爆炸函数。coalesce 。现在,我们可以使用视频文件。同样,创建一个代表用于读取输入的视频的案例类,然后是一个或几个可以执行Transformers的Factory 。由于视频文件与区域分开,因此数据集中的每个记录都没有区域信息。尝试通过使用与视频非常相似的另一个案例类录像带添加此信息,并将所有记录合并在单个数据框/数据集中。
Transformers将很有用:一个用于添加country列,一个用于将所有视频合并到一个数据集中。由于视频可以是一天和第二天的最佳趋势,因此视频有可能具有多行,其中每个视频在观点,喜欢,不喜欢,评论方面都有不同的数字……因此,我们必须检索每个区域可用于单个视频的最新统计信息,因为这些统计数据是递增的。同时,我们将计算每个视频的趋势天数。
创建一个案例类视频仪,它与以前的案例类非常相似,但是有趋势的信息。
首先,计算每个视频的趋势天数。
org.apache.spark.sql.functions查看window函数。要检索最新的统计数据,您必须检索每个视频的最新趋势。实际上,这是最新的可用统计信息。
window 。第一个是用于计算趋势天数,第二个用于检索最新统计数据。rank函数。按区域,趋势天数,视图,喜欢以及评论的结果对结果进行排序。它将为下一个成就准备数据。
在获得最新统计数据后,我们现在将计算每个视频的受欢迎程度得分。如前所述,我们的公式非常简单,可能不代表现实。
让我们对视图数量的喜欢/不喜欢的数量进行标准化。对于每个记录,将喜欢的数量除以视图数量,然后除以视图数量。之后,获得“归一化”喜欢的百分比。
现在,让我们将评论数量归一化。对于每个记录,将注释数除以视图数。
现在,我们可以计算受欢迎程度得分。提醒该公式为: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight 。
但是,有些视频被禁用了评论。在这种情况下,公式将变为: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) 。我们任意决定权重为:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05将它们设置为Input ,以便可以轻松修改它们。
org.apache.spark.sql.functions中查看when和otherwise功能。按降序按score排序,并获取100个第一记录。现在,您拥有10个地区的100个最“流行”的视频。
当然,我们要做的第一件事是阅读输入:CSV文件,我将调用视频文件和JSON文件,类别文件。
让我们从类别文件开始。所有类别文件都是JSON文件。这是工作流程:我们将定义一个配置文件,该文件将指示要读取的类别文件;创建代表类别的案例类;然后是带有Transformer的Factory ,该工厂将将类别文件处理到案例类中。最后,我们将在Pipeline中添加Stage以触发转换。
配置对象已经在resources/local.conf中创建。注意storage和path选项。相应地移动类别文件。如果多个文件位于同一文件夹中,并且该文件夹用作路径,则SETL将将文件视为单个文件的分区。接下来,查看App.scala 。您可以看到我们使用了setConnector()和setSparkRepository()方法。每次您要使用存储库时,都需要在配置中添加配置并将其注册在setl对象中。
在entity文件夹中创建一个名为Category的案例类。现在,在类别文件中检查我们将需要的字段。
我们将需要类别的id和title 。确保检查文件并使用相同的拼写来创建Category案例类。
已经提供了Factory的骨骼。确保您了解逻辑结构。
Connector形式的Delivery使我们能够检索输入。另一个Delivery将充当SparkRepository ,我们将在其中编写转换的输出。查看App.scala中每个Delivery的id和deliveryId的ID。它们被使用,因此Setl获取存储库时没有歧义。为了能够读取之前的两个交付,我们将使用其他两个变量:用于读取Connector DataFrame ,以及用于存储输出SparkRepository Dataset 。它们之间的区别在于, SparkRepository键入了Dataset 。Factory需要四个功能:read :这个想法是要接收Connector或SparkRepository Delivery输入输入,如果需要,预处理预处理,然后将它们存储到变量中以将它们在下一个功能中使用。process :这是所有数据转换将进行的地方。创建您使用的Transformer的实例,调用transform()方法,使用transformed Getter并将结果存储到变量中。write :顾名思义,它用于保存转换完成后的输出。 Connector使用write()方法保存DataFrame ,而SparkRepository则使用save()方法来保存Dataset 。get :此功能用于将输出传递到Pipeline的下一个Stage 。只需返回Dataset即可。process函数中,可以有多个Transformer 。我们将尝试在整个项目的其余部分遵循这种结构。Factory的结果将通过get功能自动传输到下一Stage 。但是,编写每个Factory的输出将更容易进行可视化和调试。同样,已经提供了Transformer的骨骼。但是,您将是编写数据转换的人。
Transformer提出了争议。通常,是我们要处理的DataFrame或Dataset 。根据您的应用程序,您可以添加其他参数。transformedData是将存储数据转换结果的变量。transformed是Factory将调用以检索数据转换结果的Getter。transform()是将进行数据转换的方法。items ”字段。如果您查看类别文件,我们需要的信息在此字段上。items字段是一个数组。我们想爆炸此数组,并仅从snippet字段中获取id字段和title字段。为此,请使用org.apache.spark.sql.functions的explode函数。然后,要获取特定字段,请在id, snippet和title上使用withColumn方法和getField()方法。不要忘记将类型相应地投入您创建的案例类。id和title列。然后,将数据框放入as[T]数据集中。Transformer 。要查看它的作用,您可以运行已经创建的App.scala文件。它只是运行包含您刚刚编写的Transformer的Factory ,然后将结果输出到配置文件的路径。请注意,已通过addStage()添加了相应的Factory ,使Pipeline运行。@Delivery注释设置Connector ,并使用deliveryId 。Factory的process方法中使用Transformer 。Factory的write方法。现在让我们处理视频文件。我们想在单个DataFrame / Dataset或同一CSV文件中合并所有文件,同时保留每个视频的区域信息。所有视频文件都是CSV文件,它们具有相同的列,如上下文部分中所述。工作流与最后一个类似:配置;案例类; Factory ; Transformer ;将Stage添加到Pipeline中。这次,我们将设置多个配置对象。
我们将在每个区域中设置resources/local.conf中的多个配置对象。在每个配置对象中,您必须设置storage, path, inferSchema, delimiter, header, multiLine和dateFormat 。
videos<region>Repository 。Factory输出的配置对象。在entity文件夹中创建一个名为Video的案例类。现在,在视频文件中检查我们将需要的字段。提醒目标是计算流行度得分,并且公式是number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight 。它将有助于选择字段。
创建另一个名为VideoCountry的案例类。它的字段将与Video完全相同,但还有国家/地区领域。
@ColumnName注释。尝试使用它,因为它在某些现实生活中很有用。java.sql.Date用于日期类型字段。我们想拥有videoId , title , channel_title , category_id , trending_date , views , likes ,likes, dislikes , comment_count , comments_disabled和video_error_or_removed fields。
该工厂的目的是将所有视频文件合并到单个视频文件中,而无需删除区域信息。这意味着我们将使用两种Transformer 。
Delivery SparkRepository[Video] 。将最后一个Delivery作为SparkRepository[VideoCountry] ,我们将在其中编写转换的输出。将尽可能多的变量Dataset[Video]设置为输入数量。Factory的四个功能:read :通过过滤被删除或错误的视频,预处理SparkRepository 。然后,将它们“铸造”为Dataset[Video] ,并将它们存储到相应的变量中。process :将第一个Transformer应用于每个输入,然后将结果应用于第二个Transformer 。write :编写输出SparkRepository[VideoCountry] 。get :只需返回最终Transformer的结果即可。Connector读取输入文件和输出的SparkRepository ?SparkRepository读取输入只是为了提供输入文件的结构。SparkRepository和许多相应的变量,而且我认为这很漂亮。没有其他解决方案吗?autoLoad = true Option的Dataset的形式使用SparkRepository的Delivery方式。因此,而不是有: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
第一个Transformer的主要目标是添加区域/国家信息。构建一个输入两个输入的Transformer ,一个Dataset[Video]和一个字符串。添加专栏country并返回Dataset[VideoCountry] 。您还可以过滤被标记为删除或错误的视频。当然,最后一步可以放置在其他地方。
第二个Transformer的主要目标是将所有视频一起重新组合在一起,同时保留该区域信息。
reduce和union功能。要检查工作的结果,请转到App.scala ,设置SparkRepositories ,添加舞台VideoFactory ,然后运行代码。它将在相应的路径中创建输出文件。
Connector和SparkRepository 。Deliveries读取到Transformer或Connector中。Factory中使用多个Transformers 。由于视频可以是一天的最佳趋势,因此在观点,喜欢,不喜欢,评论方面,它将具有不同的数字……因此,我们必须检索每个区域可用于单个视频的最新统计信息。同时,我们将计算每个视频的趋势天数。
但是我们将如何做呢?首先,我们将对与同一视频相对应的记录进行分组,并计算记录的数量,这基本上是趋势天数。然后,我们将对这些分组记录进行排名,并采用最新记录,以检索最新的统计数据。
在上一个成就中已经设置了VideoFactory输出的配置文件,因此可以保存。您将需要阅读并进行处理以获取最新的视频统计信息。不要忘记为该新Factory的输出添加配置文件。
创建一个名为VideoStats的案例类别,该类别具有与VideoCountry相似的字段,但您需要考虑到趋势天数。
在这个工厂中,您需要做的就是阅读输入,将其传递给可以进行数据处理的Transformer并写入输出。应该很简单;您可以尝试模仿其他Factories 。
Deliveries 。如前所述,我们将将视频分组在一起。为此,我们将使用org.apache.spark.sql.expressions.Window 。确保您知道Window事先做什么。
Window ,您将通过计算每个视频的趋势天数来分区。要知道您要通过哪些字段进行分区,请查看单个视频的哪些字段相同。Window ,该窗口将在其趋势日期之前用于对视频进行排名。通过选择最新日期,我们可以检索每个视频的最新统计信息。Windows后,您现在可以在趋势天数中添加新的列trendingDays ,并通过降序订购趋势日期rank 。rank过滤视频,只获取rank 1的记录。DataFrame投放到Dataset[VideoStats] 。Window使用partitionBy和orderBy方法;和count ,从org.apache.spark.sql.functions使用Dataset集时的rank方法。要检查工作的结果,请转到App.scala ,设置SparkRepositories ,添加阶段并运行代码。它将在相应的路径中创建输出文件。
Pipeline 。Connector和一个SparkRepository ,以及如何设置它们的Deliveries 。在获得最新统计数据后,我们现在将计算每个视频的受欢迎程度得分。如前所述,我们的公式非常简单,可能不代表现实。让我们想起,该公式是views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight 。使用VideoStats的先前结果,我们将仅应用公式,然后将数据最高分为最低。
这是最后一个数据转换。设置配置,以便您可以保存最后一个Dataset[VideoStats] 。要添加用于公式的常数,您需要在Pipeline中设置Inputs 。在添加Pipeline中的阶段之前,请使用setInput[T](<value>, <id>)设置常数。这些输入都可以在一旦添加到Pipeline中的任何Factories中可以检索。
这里不需要实体。我们将简单地对先前的数据进行排序,然后删除用于计算分数的列,以便我们仍然可以使用VideoStats实体。
在这个工厂中,您需要做的就是阅读输入,将其传递给可以进行数据处理的Transformer并写入输出。应该很简单;您可以尝试模仿其他Factories 。
Deliverable : Connector , SparkRepository和/或Input 。让我们对视图数量的喜欢/不喜欢的数量进行标准化。对于每个记录,将喜欢的数量除以视图数量,然后除以视图数量。之后,获得“归一化”喜欢的百分比。
现在,让我们将评论数量归一化。对于每个记录,将注释数除以视图数。
现在,我们可以计算受欢迎程度得分。提醒该公式为: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight 。
但是,有些视频被禁用了评论。在这种情况下,公式将变为: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) 。我们任意决定权重为:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05org.apache.spark.sql.functions中查看when和otherwise功能。按降序按score排序,并获取100个第一记录。现在,您拥有10个地区的100个最“流行”的视频。
要检查工作的结果,请转到App.scala ,设置Inputs如果尚未设置),请设置输出SparkRepository ,添加阶段并运行代码。它将在相应的路径中创建输出文件。
Deliveries : Input , Connector和SparkRepository ,以及deliveryId 。Stage ,包括Factory和Transformer(s) 。如果您喜欢此项目,请在此处查看SETL框架:https://github.com/setl-developers/setl,为什么不带您的贡献!