YouTube Setl es un proyecto que tiene como objetivo proporcionar un punto de partida para practicar el marco SETL: https://github.com/setl-developers/setl. La idea es dar un proyecto de contexto que involucre operaciones de extracto, transformación y carga. Hay tres niveles de dificultad para el ejercicio: modo fácil, modo normal y modo duro.
Los datos que se usan son de Kaggle, https://www.kaggle.com/datasnaek/youtube-new.
Utilicé JetBrains IntelliJ Idea Community Edition para este proyecto, con Scala y Apache Spark.
Los datos se dividen en regiones múltiples: Canadá (CA), Alemania (DE), Francia (FR), Gran Bretaña (GB), India (IN), Japón (JP), Corea del Sur (KR), México (MX), Rusia (RU) y Estados Unidos (Estados Unidos). Para cada una de estas regiones, hay dos archivos:

Todos los días, YouTube ofrece alrededor de 200 de los videos más tendidos en cada país. YouTube mide cuánto un video está de moda en función de una combinación de factores que no se hacen completamente públicos. Este conjunto de datos consiste en una colección de los mejores videos de tendencias de todos los días. Como consecuencia, es posible que el mismo video aparezca varias veces, lo que significa que está en tendencia durante varios días.
Básicamente, los elementos de los campos de elementos nos permiten asignar la category_id del archivo CSV a la categoría de nombre completo.
Vamos a analizar este conjunto de datos y determinar videos "populares". Pero, ¿cómo definimos un video popular? Vamos a definir la popularidad de un video basado en su número de vistas, gustos, disgustos, número de comentarios y número de días de tendencia.
Esta definición es claramente discutible y arbitraria, y no estamos buscando descubrir la mejor definición para la popularidad de un video. Solo nos centraremos en el propósito de este proyecto: practicar con el marco SETL.
El objetivo de este proyecto es encontrar los 100 videos más "populares" y las categorías de video más "populares". Pero, ¿cómo definimos la popularidad de un video? La fórmula va a ser:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight .
El porcentaje de me gusta es la proporción de me gusta sobre disgustos. Esta relación se normaliza sobre el número de vistas. La misma normalización se realiza con el número de comentarios.
A continuación se presentan las instrucciones para cada nivel de dificultad para realizar el proyecto. Para cada nivel de dificultad, puede clonar el repositorio con la rama específica para tener un proyecto inicial.
Para este proyecto, suponemos que ya tiene un conocimiento básico de Scala y Apache Spark.
entity que contiene las clases de casos o los objetos; factory que contiene transformadores; y transformer que contiene las transformaciones de datos.Factory o Transformer SETL , puede usar Ctrl+i para crear automáticamente las funciones necesarias. Lo primero que vamos a hacer es, por supuesto, leer las entradas: los archivos CSV, que llamaré a los archivos de videos y los archivos JSON, los archivos de categorías.
Comencemos con los archivos de categorías. Todos los archivos de categorías son archivos JSON . Cree una clase de caso que represente una categoría , luego una Factory con un Transformer que procesará los archivos de categorías en la clase de casos.
local.conf . Ya se ha creado un objeto para leer los archivos de categorías.org.apache.spark.sql.functions .coalesce al guardar un archivo. Ahora podemos trabajar con los archivos de videos. Del mismo modo, cree una clase de casos que represente un video para leer las entradas, luego una Factory con uno o varios Transformers que harán el procesamiento. Debido a que los archivos de videos están separados de las regiones, no existe la información de la región para cada registro en el conjunto de datos. Intente agregar esta información utilizando otro videocuito de clase Case que es muy similar al video , y fusione todos los registros en un solo marco de datos/conjunto de datos.
Transformers serán útiles: uno para agregar la columna country y otro para fusionar todos los videos en un solo conjunto de datos.Debido a que un video puede ser una tendencia superior por un día y al día siguiente, es posible que un video tenga múltiples filas, donde cada una tiene diferentes números en términos de vistas, gustos, disgustos, comentarios ... como consecuencia, tenemos que recuperar las últimas estadísticas disponibles para un solo video, para cada región, porque estas estadísticas son incrementales. Al mismo tiempo, vamos a calcular el número de días de tendencia para cada video.
Cree un videos de videos de clase de casos, que es muy similar a las clases de casos anteriores, pero con la información de los días de tendencia.
Primero, calcule el número de días de tendencia de cada video.
window desde org.apache.spark.sql.functions .Para recuperar las últimas estadísticas, debe recuperar el último día de tendencias de cada video. De hecho, son las últimas estadísticas disponibles.
window . El primero fue para calcular el número de días de tendencia, y el segundo para recuperar las últimas estadísticas.rank .Ordene los resultados por región, número de días de tendencia, puntos de vista, me gusta y luego comentarios. Preparará los datos para el próximo logro.
Ahora vamos a calcular el puntaje de popularidad de cada video, después de obtener sus últimas estadísticas. Como se dijo anteriormente, nuestra fórmula es muy simple y puede no representar la realidad.
Normalizemos el número de me gusta/disgusto sobre el número de vistas. Para cada registro, divida el número de me gusta por el número de vistas, y luego el número de disgustos por el número de vistas. Después de eso, obtenga el porcentaje de me gusta "normalizados".
Ahora normalizamos el número de comentarios. Para cada registro, divida el número de comentarios por el número de vistas.
Ahora podemos calcular el puntaje de popularidad. Recuerde que la fórmula es: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Sin embargo, hay videos donde los comentarios están deshabilitados. En este caso, la fórmula se convierte en: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Decidimos arbitrariamente que los pesos son:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05 Configurelos como Input para que puedan modificarse fácilmente.
when y funciona otherwise desde org.apache.spark.sql.functions . Ordene por el score en orden descendente y tome los 100 primeros registros. Ahora tiene los 100 videos más "populares" de las 10 regiones.
Lo primero que vamos a hacer es, por supuesto, leer las entradas: los archivos CSV, que llamaré a los archivos de videos y los archivos JSON, los archivos de categorías.
Comencemos con los archivos de categorías. Todos los archivos de categorías son archivos JSON. Aquí está el flujo de trabajo: vamos a definir un archivo de configuración que indique los archivos de categorías para leer; crear una clase de caso que represente una categoría; Luego, una Factory con un Transformer que procesará los archivos de categorías en la clase Case. Finalmente, vamos a agregar el Stage a la Pipeline para activar las transformaciones.
El objeto de configuración ya se ha creado en resources/local.conf . Preste atención a las opciones storage y path . Mueva los archivos de categorías en consecuencia. Si varios archivos están en la misma carpeta y la carpeta se usa como ruta, SETL considerará los archivos como particiones de un solo archivo. A continuación, consulte la App.scala . Puede ver que utilizamos los métodos setConnector() y setSparkRepository() . Cada vez que desee usar un repositorio, deberá agregar una configuración en la configuración y registrarla en el objeto setl .
Cree una clase de caso nombrada Category en la carpeta entity . Ahora examine, en los archivos de categorías, los campos que necesitaremos.
Necesitaremos la id y el title de la categoría. Asegúrese de verificar los archivos y usar la misma ortografía para crear la clase de casos Category .
El esqueleto de la Factory ya ha sido proporcionado. Asegúrese de comprender la estructura lógica.
Delivery en forma de un Connector nos permite recuperar las entradas. Otra Delivery actuará como un SparkRepository , donde escribiremos el resultado de la transformación. Consulte la id de cada Delivery y el deliveryId en App.scala . Se usan, por lo que no hay ambigüedad cuando setl obtiene los repositorios. Para poder leer las dos entregas anteriores, vamos a usar otras dos variables: un DataFrame para leer el Connector y un Dataset para almacenar la salida SparkRepository . La diferencia entre ellos es que se escribe un resumen SparkRepository , de ahí el Dataset .Factory SETL :read : La idea es tomar las entradas SparkRepository Delivery Connector o el depósito de chispa, preprocesarlas si es necesario, y almacenarlas en variables para usarlas en la siguiente función.process : Aquí es donde se realizarán todas las transformaciones de datos. Cree una instancia del Transformer que esté utilizando, llame al método transform() , use el Getter transformed y almacene el resultado en una variable.write : Como su nombre lo indica, se usa para guardar la salida de las transformaciones después de que se hayan hecho. Un Connector utiliza el método write() para guardar un DataFrame , y un SparkRepository usa el método save() para guardar un Dataset .get : esta función se utiliza para pasar la salida a la siguiente Stage de la Pipeline . Simplemente devuelva el Dataset .process , puede haber múltiples Transformer . Vamos a tratar de seguir esta estructura durante el resto del proyecto.Factory se transferirá automáticamente a la siguiente Stage a través de la función get . Sin embargo, escribir el resultado de cada Factory será más fácil para la visualización y la depuración. Nuevamente, el esqueleto del Transformer ya se ha proporcionado. Sin embargo, usted será el que escribirá la transformación de datos.
Transformer toma una discusión. Por lo general, es el DataFrame o el Dataset que queremos procesar. Dependiendo de su aplicación, puede agregar otros argumentos.transformedData es la variable que almacenará el resultado de la transformación de datos.transformed es el recibo que será llamado por una Factory para recuperar el resultado de la transformación de datos.transform() es el método que hará las transformaciones de datos.items . Si revisa los archivos de categorías, la información que necesitamos es en este campo.items es una matriz. Queremos explotar esta matriz y tomar solo el campo id y el campo title desde el campo snippet . Para hacer eso, use la función explode de org.apache.spark.sql.functions . Luego, para obtener campos específicos, use el método withColumn y el método getField() en id, snippet y title . No olvide lanzar los tipos en consecuencia a la clase de casos que creó.id y las columnas title . Luego, coloque el marco de datos en un conjunto de datos con as[T] .Transformer . Para ver qué hace, puede ejecutar el archivo App.scala que ya se ha creado. Simplemente ejecuta la Factory que contiene el Transformer que acaba de escribir, y generará el resultado en la ruta del archivo de configuración. Tenga en cuenta que la Factory correspondiente se ha agregado a través de addStage() que hace que la Pipeline la ejecute.Connector , usar la anotación @Delivery , con deliveryId ?Transformer en el método process de una Factory .write de una Factory . Vamos a procesar ahora los archivos de videos. Nos gustaría fusionar todos los archivos en un solo DataFrame / Dataset o en el mismo archivo CSV, mientras mantiene la información de la región para cada video. Todos los archivos de videos son archivos CSV y tienen las mismas columnas, como se indicó anteriormente en la sección de contexto . El flujo de trabajo es similar al último: configuración; clase de caso; Factory ; Transformer ; Agregue el Stage a la Pipeline . Esta vez, vamos a establecer múltiples objetos de configuración.
Vamos a establecer múltiples objetos de configuración en resources/local.conf , uno por región. En cada objeto de configuración, deberá establecer storage, path, inferSchema, delimiter, header, multiLine y dateFormat .
videos<region>Repository .Factory . Cree una clase de casos con nombre de Video en la carpeta entity . Ahora examine, en los archivos de videos, los campos que necesitaremos. Recuerde que el objetivo es calcular el puntaje de popularidad, y que la fórmula es number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight . Ayudará a seleccionar los campos.
Cree otra clase de caso llamada VideoCountry . Tendrá exactamente los mismos campos que Video , pero con el campo del país/región además.
@ColumnName del marco. Intente usarlo, ya que puede ser útil en algunas situaciones comerciales de la vida real.java.sql.Date para un campo de tipo de fecha. Nos gustaría tener el videoId , title , channel_title , category_id , trending_date , views , likes , dislikes , los campos comment_count , comments_disabled y video_error_or_removed Fields.
El objetivo de esta fábrica es fusionar todos los archivos de videos en uno solo, sin eliminar la información de la región. Eso significa que vamos a usar dos tipos de Transformer .
Delivery en forma de un SparkRepository[Video] . Establezca una última Delivery como SparkRepository[VideoCountry] , donde escribiremos el resultado de la transformación. Establezca tantas variables Dataset[Video] como el número de entradas.Factory :read : Preprocese el SparkRepository filtrando los videos que se eliminan o el error . Luego, "emitirlos" como Dataset[Video] y guárdelos en las variables correspondientes.process : aplique el primer Transformer para cada una de las entradas y aplique los resultados al segundo Transformer .write : Escriba la salida SparkRepository[VideoCountry] .get : solo devuelva el resultado del Transformer final.Connector para leer los archivos de entrada y un SparkRepository para la salida?SparkRepository para leer las entradas solo para proporcionar una estructura para los archivos de entrada.SparkRepository y muchas variables correspondientes, y no encuentro esto bonito/consisas. ¿No hay otra solución?Delivery en forma de SparkRepository , puede usar entregas en forma de un Dataset con la opción autoLoad = true . Entonces, en lugar de tener: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
El objetivo principal del primer Transformer es agregar la información de la región/país. Cree un Transformer que tome dos entradas, un Dataset[Video] y una cadena. Agregue el country de la columna y devuelva un Dataset[VideoCountry] . También puede filtrar los videos etiquetados como eliminados o error . Por supuesto, este último paso se puede colocar en otro lugar.
El objetivo principal del segundo Transformer es reagrupar todos los videos, mientras mantiene la información de la región.
reduce y union . Para verificar el resultado de su trabajo, vaya a App.scala , configure los SparkRepositories , agregue el VideoFactory del escenario y ejecute el código. Creará el archivo de salida en la ruta correspondiente.
Connector como SparkRepository .Deliveries en un Transformer o un Connector .Transformers en una Factory .Debido a que un video puede ser uno de los mejores tendencias por un día y al día siguiente, tendrá diferentes números en términos de vistas, me gusta, disgustos, comentarios ... Como consecuencia, tenemos que recuperar las últimas estadísticas disponibles para un solo video, para cada región. Al mismo tiempo, vamos a calcular el número de días de tendencia para cada video.
Pero, ¿cómo vamos a hacer eso? En primer lugar, vamos a agrupar los registros que corresponden al mismo video y contaremos el número de registros, que es básicamente el número de días de tendencia. Luego, vamos a clasificar estos registros agrupados y tomar los últimos, para recuperar las últimas estadísticas.
El archivo de configuración para la salida de VideoFactory ya está configurado en el logro anterior para que se pueda guardar. Deberá leerlo y procesarlo para obtener las últimas estadísticas de videos. No olvide agregar un archivo de configuración para la salida de esta nueva Factory .
Cree una clase de casos con nombre de VideoStats que tengan campos similares al VideoCountry , pero debe tener en cuenta el número de días de tendencia.
En esta fábrica, todo lo que necesita hacer es leer la entrada, pasarla al Transformer que hará el procesamiento de datos y escribir la salida. Debería ser bastante simple; Puedes intentar imitar las otras Factories .
Deliveries de entradas y salidas. Como se dijo anteriormente, vamos a agrupar los videos. Para eso, vamos a usar org.apache.spark.sql.expressions.Window . Asegúrese de saber lo que hace una Window de antemano.
Window que dividirá para contar el número de días de tendencia para cada video. Para saber por qué campos va a dividir, mire qué campos será el mismo para un solo video.Window que se utilizará para clasificar los videos por su fecha de tendencia. Al seleccionar la fecha más reciente, podemos recuperar las últimas estadísticas de cada video.Windows , ahora puede agregar nuevas columnas trendingDays para el número de días de tendencia y rank para la clasificación de la fecha de tendencia por orden descendente.rank , tomando solo los discos con el rank 1.DataFrame a Dataset[VideoStats] .partitionBy y orderBy para la Window ; y los métodos de count rank de org.apache.spark.sql.functions al trabajar con el Dataset . Para verificar el resultado de su trabajo, vaya a App.scala , configure los SparkRepositories , agregue el escenario y ejecute el código. Creará el archivo de salida en la ruta correspondiente.
Pipeline .Connector y un SparkRepository , y cómo establecer Deliveries de ellos. Ahora vamos a calcular el puntaje de popularidad de cada video, después de obtener sus últimas estadísticas. Como se dijo anteriormente, nuestra fórmula es muy simple y puede no representar la realidad. Recordemos que la fórmula es views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight . Usando el resultado anterior de VideoStats , simplemente vamos a aplicar la fórmula y ordenar los datos por la puntuación más alta al más bajo.
Esta es la última transformación de datos. Establezca la configuración para que pueda guardar este último Dataset[VideoStats] . Para agregar las constantes utilizadas para la fórmula, deberá establecer Inputs en la Pipeline . Antes de agregar etapas en la Pipeline , use setInput[T](<value>, <id>) para establecer las constantes. Estas entradas se pueden recuperar en cualquier momento en cualquier Factories una vez agregada a la Pipeline .
No se necesitará ninguna entidad aquí. Simplemente ordenaremos los datos anteriores y eliminaremos las columnas utilizadas para calcular la puntuación para que aún podamos usar la entidad VideoStats .
En esta fábrica, todo lo que necesita hacer es leer la entrada, pasarla al Transformer que hará el procesamiento de datos y escribir la salida. Debería ser bastante simple; Puedes intentar imitar las otras Factories .
Deliverable : Connector , SparkRepository y/o Input .Normalizemos el número de me gusta/disgusto sobre el número de vistas. Para cada registro, divida el número de me gusta por el número de vistas, y luego el número de disgustos por el número de vistas. Después de eso, obtenga el porcentaje de me gusta "normalizados".
Ahora normalizamos el número de comentarios. Para cada registro, divida el número de comentarios por el número de vistas.
Ahora podemos calcular el puntaje de popularidad. Recuerde que la fórmula es: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Sin embargo, hay videos donde los comentarios están deshabilitados. En este caso, la fórmula se convierte en: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Decidimos arbitrariamente que los pesos son:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05when y funciona otherwise desde org.apache.spark.sql.functions . Ordene por el score en orden descendente y tome los 100 primeros registros. Ahora tiene los 100 videos más "populares" de las 10 regiones.
Para verificar el resultado de su trabajo, vaya a App.scala , establezca las Inputs si aún no están configuradas, configure la salida SparkRepository , agregue la etapa y ejecute el código. Creará el archivo de salida en la ruta correspondiente.
Deliveries : Input , Connector y SparkRepository , con deliveryId .Stage , incluida la Factory y los Transformer(s) .Si le gustó este proyecto, consulte el marco Setl aquí: https://github.com/setl-developers/setl, ¡y por qué no traer su contribución!