O YouTube Setl é um projeto que visa fornecer um ponto de partida para praticar a estrutura setl: https://github.com/setl-developers/setl. A idéia é fornecer um projeto de contexto envolvendo operações de extração, transformação e carga. Existem três níveis de dificuldade para o exercício: modo fácil, modo normal e modo rígido.
Os dados usados são de kaggle, https://www.kaggle.com/datasnaek/youtube-new.
Usei o JetBrains Intellij Idea Community Edition para este projeto, com Scala e Apache Spark.
Os dados são divididos nas regiões múltiplas: Canadá (CA), Alemanha (DE), França (FR), Grã -Bretanha (GB), Índia (IN), Japão (JP), Coréia do Sul (KR), México (MX), Rússia (RU) e Estados Unidos (EUA). Para cada uma dessas regiões, existem dois arquivos:

Todos os dias, o YouTube oferece cerca de 200 dos vídeos mais tendentes em cada país. O YouTube mede o quanto um vídeo está na moda com base em uma combinação de fatores que não se tornam totalmente públicos. Esse conjunto de dados consiste em uma coleção dos principais vídeos de tendências do Everyday. Como conseqüência, é possível que o mesmo vídeo apareça várias vezes, o que significa que está tendendo por vários dias.
Basicamente, os elementos dos campos de itens nos permitem mapear a category_id do arquivo CSV para a categoria de nome completo.
Vamos analisar esse conjunto de dados e determinar vídeos "populares". Mas, como definimos um vídeo popular? Vamos definir a popularidade de um vídeo com base em seu número de visualizações, curtidas, desgostos, número de comentários e número de dias de tendência.
Essa definição é claramente discutível e arbitrária, e não estamos procurando descobrir a melhor definição para a popularidade de um vídeo. Vamos nos concentrar apenas no objetivo deste projeto: pratique com a estrutura setl.
O objetivo deste projeto é encontrar os 100 vídeos mais "populares" e as categorias de vídeo mais "populares". Mas como definimos a popularidade de um vídeo? A fórmula será:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight .
A porcentagem de gostos é a proporção de curtidas por não participar. Essa proporção é normalizada sobre o número de visualizações. A mesma normalização é feita com o número de comentários.
Abaixo estão as instruções para cada nível de dificuldade para realizar o projeto. Para cada nível de dificuldade, você pode clonar o repositório com a filial específica para ter um projeto inicial.
Para este projeto, assumimos que você já possui um conhecimento básico do Scala e do Apache Spark.
entity que contém as classes de caso ou os objetos; factory que contém transformadores; e transformer que contém as transformações de dados.Factory ou Transformer Setl , poderá usar Ctrl+i para criar automaticamente as funções necessárias. A primeira coisa que vamos fazer é, é claro, ler as entradas: os arquivos CSV, que chamarei os arquivos de vídeos e os arquivos JSON, os arquivos das categorias.
Vamos começar com os arquivos de categorias. Todos os arquivos de categorias são arquivos JSON . Crie uma classe de caso que represente uma categoria e, em seguida, uma Factory com um Transformer que processará os arquivos de categorias na classe Case.
local.conf . Um objeto já foi criado para ler os arquivos de categorias.org.apache.spark.sql.functions .coalesce ao salvar um arquivo. Agora podemos trabalhar com os arquivos de vídeos. Da mesma forma, crie uma classe de caso que represente um vídeo para ler as entradas e depois uma Factory com um ou vários Transformers que farão o processamento. Como os arquivos de vídeos são separados das regiões, não há as informações da região para cada registro no conjunto de dados. Tente adicionar essas informações usando outra classe de caso Videocountry, que é muito semelhante ao vídeo e mescla todos os registros em um único DataFrame/DataSet.
Transformers serão úteis: um para adicionar a coluna country e um para mesclar todos os vídeos em um único conjunto de dados.Como um vídeo pode ser uma das principais tendências por um dia e no dia seguinte, é possível que um vídeo tenha várias linhas, onde cada um tem números diferentes em termos de visualizações, curtidas, desgostos, comentários ... como conseqüência, precisamos recuperar as estatísticas mais recentes disponíveis para um único vídeo, para cada região, porque essas estatísticas são incrementais. Ao mesmo tempo, calcularemos o número de dias de tendência para cada vídeo.
Crie uma classe de caso Videostats , que é muito semelhante às classes de casos anteriores, mas com as informações dos dias de tendência.
Primeiro, calcule o número de dias de tendência de cada vídeo.
window de org.apache.spark.sql.functions .Para recuperar as estatísticas mais recentes, você deve recuperar o último dia de tendência de cada vídeo. É de fato as mais recentes estatísticas disponíveis.
window . O primeiro foi para calcular o número de dias de tendência e o segundo para recuperar as estatísticas mais recentes.rank .Classifique os resultados por região, número de dias de tendência, visualizações, curtidas e comentários. Ele preparará os dados para a próxima conquista.
Agora vamos calcular a pontuação de popularidade de cada vídeo, depois de obter suas últimas estatísticas. Como dito anteriormente, nossa fórmula é muito simples e pode não representar a realidade.
Vamos normalizar o número de curtidas/não gosta do número de visualizações. Para cada registro, divida o número de curtidas pelo número de visualizações e, em seguida, o número de desgostos pelo número de visualizações. Depois disso, obtenha a porcentagem de curtidas "normalizadas".
Agora vamos normalizar o número de comentários. Para cada registro, divida o número de comentários pelo número de visualizações.
Agora podemos calcular a pontuação de popularidade. Lembre que a fórmula é: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
No entanto, existem vídeos em que os comentários são desativados. Nesse caso, a fórmula se torna: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Decidimos arbitrariamente os pesos ser:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05 Configure -os como Input para que possam ser facilmente modificados.
when funciona e otherwise funciona de org.apache.spark.sql.functions . Classifique pela score em ordem decrescente e pegue os 100 primeiros registros. Agora você tem os 100 vídeos mais "populares" das 10 regiões.
A primeira coisa que vamos fazer é, é claro, ler as entradas: os arquivos CSV, que chamarei os arquivos de vídeos e os arquivos JSON, os arquivos das categorias.
Vamos começar com os arquivos de categorias. Todos os arquivos de categorias são arquivos JSON. Aqui está o fluxo de trabalho: vamos definir um arquivo de configuração que indica os arquivos de categorias a serem lidos; Crie uma classe de caso que represente uma categoria; Em seguida, uma Factory com um Transformer que processará os arquivos de categorias na classe Case. Finalmente, vamos adicionar o Stage ao Pipeline para acionar as transformações.
O objeto de configuração já foi criado em resources/local.conf . Preste atenção nas opções de storage e path . Mova os arquivos de categorias de acordo. Se vários arquivos estiverem na mesma pasta e a pasta for usada como um caminho, o Setl considerará os arquivos como partições de um único arquivo. Em seguida, confira o App.scala . Você pode ver que usamos os métodos setConnector() e setSparkRepository() . Toda vez que você deseja usar um repositório, você precisará adicionar uma configuração na configuração e registrá -la no objeto setl .
Crie uma classe de caso denominada Category na pasta da entity . Agora examine, nos arquivos de categorias, os campos que precisaremos.
Precisamos do id e do title da categoria. Verifique os arquivos e use a mesma ortografia para criar a classe de caso de Category .
O esqueleto da Factory já foi fornecido. Certifique -se de entender a estrutura lógica.
Delivery na forma de um Connector nos permite recuperar as entradas. Outra Delivery atuará como um SparkRepository , onde escreveremos a saída da transformação. Confira o id de cada Delivery e o deliveryId no App.scala . Eles são usados para que não haja ambiguidade quando o Setl busca os repositórios. Para poder ler as duas entregas anteriores, usaremos duas outras variáveis: um DataFrame para ler o Connector e um Dataset para armazenar o SparkRepository de saída. A diferença entre eles é que um SparkRepository é digitado, daí o Dataset .Factory setl :read : A idéia é pegar as entradas de entrega Connector ou SparkRepository Delivery , pré -processá -las, se necessário, e armazená -las em variáveis para usá -las na próxima função.process : Aqui é onde todas as transformações de dados serão feitas. Crie uma instância do Transformer que você está usando, ligue para o método transform() , use o getter transformed e armazene o resultado em uma variável.write : Como o seu nome sugere, ele é usado para salvar a saída das transformações depois que elas foram feitas. Um Connector usa o método write() para salvar um DataFrame e um SparkRepository usa o método save() para salvar um Dataset .get : Esta função é usada para passar a saída para o próximo Stage do Pipeline . Basta retornar o Dataset .process , pode haver vários Transformer . Vamos tentar seguir essa estrutura durante o restante do projeto.Factory será transferido automaticamente para o próximo Stage através da função get . No entanto, escrever a saída de todas Factory será mais fácil para visualização e depuração. Novamente, o esqueleto do Transformer já foi fornecido. No entanto, você será quem escreverá a transformação dos dados.
Transformer leva uma discussão. Geralmente, é o DataFrame ou o Dataset que queremos processar. Dependendo do seu aplicativo, você pode adicionar outros argumentos.transformedData é a variável que armazenará o resultado da transformação de dados.transformed é o getter que será chamado por uma Factory para recuperar o resultado da transformação de dados.transform() é o método que fará as transformações de dados.items . Se você conferir os arquivos de categorias, as informações necessárias estão nesse campo.items é uma matriz. Queremos explodir essa matriz e pegar apenas o campo id e o campo title do campo snippet . Para fazer isso, use a função explode em org.apache.spark.sql.functions . Em seguida, para obter campos específicos, use o método withColumn e o método getField() em id, snippet e title . Não se esqueça de lançar os tipos de acordo com a classe de caso que você criou.id e as colunas title . Em seguida, envie o quadro de dados em um conjunto de dados com as[T] .Transformer . Para ver o que ele faz, você pode executar o arquivo App.scala que já foi criado. Ele simplesmente executa a Factory que contém o Transformer que você acabou de escrever e produzirá o resultado para o caminho do arquivo de configuração. Observe que a Factory correspondente foi adicionada via addStage() que faz com que o Pipeline o execute.Connector , usando a anotação @Delivery , com deliveryId .Transformer no método process de uma Factory .write de uma Factory . Agora vamos processar os arquivos de vídeos. Gostaríamos de mesclar todos os arquivos em um único DataFrame / Dataset ou no mesmo arquivo CSV, mantendo as informações da região para cada vídeo. Todos os arquivos de vídeos são arquivos CSV e eles têm as mesmas colunas, conforme declarado anteriormente na seção de contexto . O fluxo de trabalho é semelhante ao último: configuração; classe de caso; Factory ; Transformer ; Adicione o Stage no Pipeline . Desta vez, vamos definir vários objetos de configuração.
Vamos definir vários objetos de configuração em resources/local.conf , um por região. Em cada objeto de configuração, você precisará definir storage, path, inferSchema, delimiter, header, multiLine e dateFormat .
videos<region>Repository .Factory . Crie uma classe de caso chamada Video na pasta entity . Agora examine, nos arquivos de vídeos, os campos que precisaremos. Lembre que o objetivo é calcular a pontuação de popularidade e que a fórmula é number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight . Isso ajudará a selecionar os campos.
Crie outra classe de caso chamada VideoCountry . Ele terá exatamente os mesmos campos que Video , mas com o campo de país/região.
@ColumnName da estrutura. Tente usá-lo, pois pode ser útil em algumas situações de negócios da vida real.java.sql.Date para um campo de tipo de data. Gostaríamos de ter o videoId , title , channel_title , category_id , trending_date , views , likes , dislikes , comment_count , comments_disabled e video_error_or_removed Fields.
O objetivo desta fábrica é mesclar todos os arquivos de vídeos em um único, sem remover as informações da região. Isso significa que vamos usar dois tipos de Transformer .
Delivery de entradas na forma de um SparkRepository[Video] . Defina uma última Delivery como um SparkRepository[VideoCountry] , onde escreveremos a saída da transformação. Defina tantos Dataset[Video] quanto o número de entradas.Factory :read : Pré -processo o SparkRepository filtrando os vídeos que são removidos ou erros . Em seguida, "lance -os" como Dataset[Video] e armazene -os nas variáveis correspondentes.process : aplique o primeiro Transformer para cada uma das entradas e aplique os resultados ao segundo Transformer .write : Escreva o SparkRepository[VideoCountry] .get : Basta retornar o resultado do Transformer final.Connector para ler os arquivos de entrada e um SparkRepository para a saída?SparkRepository para ler as entradas apenas para fornecer uma estrutura para os arquivos de entrada.SparkRepository e muitas variáveis correspondentes, e não acho isso bonito/considero. Não há outra solução?Delivery na forma de um SparkRepository , você pode usar entregas na forma de um Dataset com a opção autoLoad = true . Então, em vez de ter: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
O principal objetivo do primeiro Transformer é adicionar as informações da região/país. Crie um Transformer que leva duas entradas, um Dataset[Video] e uma string. Adicione o country da coluna e devolva um Dataset[VideoCountry] . Você também pode filtrar os vídeos rotulados como removidos ou erros . Obviamente, este último passo pode ser colocado em outro lugar.
O principal objetivo do segundo Transformer é reagrupar todos os vídeos juntos, mantendo as informações da região.
reduce e union . Para verificar o resultado do seu trabalho, acesse App.scala , defina os SparkRepositories , adicione o VideoFactory do palco e execute o código. Ele criará o arquivo de saída no caminho correspondente.
Connector e SparkRepository .Deliveries em um Transformer ou um Connector .Transformers em uma Factory .Como um vídeo pode ser um dos principais tendências por um dia e no dia seguinte, ele terá números diferentes em termos de visualizações, curtidas, desgostos, comentários ... como conseqüência, temos que recuperar as estatísticas mais recentes disponíveis para um único vídeo, para cada região. Ao mesmo tempo, calcularemos o número de dias de tendência para cada vídeo.
Mas como vamos fazer isso? Primeiro de tudo, agruparemos os registros que correspondem ao mesmo vídeo e contamos o número de registros, que é basicamente o número de dias de tendência. Em seguida, vamos classificar esses registros agrupados e pegar o mais recente, para recuperar as estatísticas mais recentes.
O arquivo de configuração para a saída do VideoFactory já está definido na conquista anterior para que possa ser salvo. Você precisará lê -lo e processá -lo para obter as estatísticas mais recentes de vídeos. Não se esqueça de adicionar um arquivo de configuração para a saída desta nova Factory .
Crie uma classe de caso nomeada VideoStats que tenha campos semelhantes para VideoCountry , mas você precisa levar em consideração o número de dias de tendência.
Nesta fábrica, tudo o que você precisa fazer é ler a entrada, passá -la para o Transformer que fará o processamento de dados e gravará a saída. Deve ser bem simples; Você pode tentar imitar as outras Factories .
Deliveries de entrada e saída. Como disse anteriormente, vamos agrupar os vídeos. Para isso, vamos usar org.apache.spark.sql.expressions.Window . Certifique -se de saber o que uma Window faz de antemão.
Window pela qual você particionará para contar o número de dias de tendência para cada vídeo. Para saber por quais campos você vai participar, veja quais campos serão os mesmos para um único vídeo.Window que será usada para classificar os vídeos até a data de tendência. Ao selecionar a data mais recente, podemos recuperar as estatísticas mais recentes de cada vídeo.Windows , agora você pode adicionar novas trendingDays para o número de dias de tendência e rank para a classificação da data de tendência por ordem descendente.rank , pegando apenas os registros com a rank 1.DataFrame para Dataset[VideoStats] .partitionBy e orderBy para a Window ; e a count , os métodos rank de org.apache.spark.sql.functions ao trabalhar com o Dataset . Para verificar o resultado do seu trabalho, acesse App.scala , defina os SparkRepositories , adicione o palco e execute o código. Ele criará o arquivo de saída no caminho correspondente.
Pipeline .Connector e um SparkRepository e como definir Deliveries deles. Agora vamos calcular a pontuação de popularidade de cada vídeo, depois de obter suas últimas estatísticas. Como dito anteriormente, nossa fórmula é muito simples e pode não representar a realidade. Vamos lembrar que a fórmula é views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight . Usando o resultado anterior de VideoStats , simplesmente aplicaremos a fórmula e classificaremos os dados pela pontuação mais alta à menor.
Esta é a última transformação de dados. Defina a configuração para que você possa salvar este último Dataset[VideoStats] . Para adicionar as constantes usadas para a fórmula, você precisará definir Inputs no Pipeline . Antes de adicionar estágios no Pipeline , use setInput[T](<value>, <id>) para definir as constantes. Essas entradas são recuperáveis a qualquer momento em qualquer Factories , uma vez adicionadas ao Pipeline .
Nenhuma entidade será necessária aqui. Simplesmente classificaremos os dados anteriores e soltaremos as colunas usadas para calcular a pontuação, para que ainda possamos usar a entidade VideoStats .
Nesta fábrica, tudo o que você precisa fazer é ler a entrada, passá -la para o Transformer que fará o processamento de dados e gravará a saída. Deve ser bem simples; Você pode tentar imitar as outras Factories .
Deliverable : Connector , SparkRepository e/ou Input .Vamos normalizar o número de curtidas/não gosta do número de visualizações. Para cada registro, divida o número de curtidas pelo número de visualizações e, em seguida, o número de desgostos pelo número de visualizações. Depois disso, obtenha a porcentagem de curtidas "normalizadas".
Agora vamos normalizar o número de comentários. Para cada registro, divida o número de comentários pelo número de visualizações.
Agora podemos calcular a pontuação de popularidade. Lembre que a fórmula é: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
No entanto, existem vídeos em que os comentários são desativados. Nesse caso, a fórmula se torna: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Decidimos arbitrariamente os pesos ser:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05when funciona e otherwise funciona de org.apache.spark.sql.functions . Classifique pela score em ordem decrescente e pegue os 100 primeiros registros. Agora você tem os 100 vídeos mais "populares" das 10 regiões.
Para verificar o resultado do seu trabalho, vá para App.scala , defina as Inputs , se elas ainda não estiverem definidas, defina o SparkRepository de saída, adicione o estágio e execute o código. Ele criará o arquivo de saída no caminho correspondente.
Deliveries : Input , Connector e SparkRepository , com deliveryId .Stage , incluindo a Factory e o Transformer(s) .Se você gostou deste projeto, consulte a estrutura Setl aqui: https://github.com/setl-developers/setl e por que não trazer sua contribuição!