YouTube Setl-это проект, который направлен на предоставление отправной точки для практики Framework SETL: https://github.com/setl-developers/setl. Идея состоит в том, чтобы дать контекстный проект, включающий операции извлечения, преобразования и нагрузки. Для упражнений существует три уровня сложности: легкий режим, нормальный режим и жесткий режим.
Используемые данные от Kaggle, https://www.kaggle.com/datasnaek/youtube-new.
Я использовал Jetbrains Intellij Idea Community Edition для этого проекта, со Scala и Apache Spark.
Данные делятся в мультипликационных регионах: Канада (CA), Германия (DE), Франция (FR), Великобритания (GB), Индия (IN), Япония (JP), Южная Корея (KR), Мексика (MX), Россия (RU) и Соединенные Штаты (США). Для каждого из этих регионов есть два файла:

Каждый день YouTube предоставляет около 200 самых популярных видео в каждой стране. YouTube измеряет, сколько видео является модным, основываясь на комбинации факторов, которые не становятся полностью публичными. Этот набор данных состоит в коллекции лучших видеороликов повседневной жизни. Как следствие, одно и то же видео появляется несколько раз, что означает, что оно является трендом в течение нескольких дней.
По сути, элементы полей элементов позволяют нам сопоставить category_id файла CSV в категорию полного имени.
Мы собираемся проанализировать этот набор данных и определить «популярные» видео. Но как мы определяем популярное видео? Мы собираемся определить популярность видео, основанного на его количестве просмотров, лайков, антипатии, количества комментариев и количества трендовых дней.
Это определение явно спорно и произвольно, и мы не стремимся выяснить лучшее определение популярности видео. Мы сосредоточимся только на цели этого проекта: практиковать с помощью платформы SETL.
Цель этого проекта - найти 100 самых «популярных» видео и самые «популярные» категории видео. Но как мы определили популярность видео? Формула будет:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight .
Процент лайков - это соотношение лайков, а не антипатия. Это соотношение нормализуется по количеству видов. Та же нормализация выполняется с количеством комментариев.
Ниже приведены инструкции для каждого уровня сложности, чтобы реализовать проект. Для каждого уровня сложности вы можете клонировать репо с конкретной ветвью, чтобы иметь начальный проект.
Для этого проекта мы предполагаем, что у вас уже есть базовые знания Scala и Apache Spark.
entity , которая содержит классы дела или объекты; factory , которая содержит трансформаторы; и transformer , который содержит преобразования данных.Factory SETL или Transformer , вы можете использовать Ctrl+i для автоматического создания необходимых функций. Первое, что мы собираемся сделать, это, конечно, прочитать входные данные: файлы CSV, которые я позвоню в файлы видео, а файлы JSON - файлы категорий.
Давайте начнем с файлов категорий. Все файлы категорий являются файлами JSON . Создайте класс случаев, который представляет категорию , а затем Factory с Transformer , который будет обрабатывать файлы категорий в класс болезни.
local.conf . Объект уже был создан для чтения файлов категорий.org.apache.spark.sql.functions .coalesce при сохранении файла. Теперь мы можем работать с файлами видео. Точно так же создайте класс случаев, который представляет видео для чтения входов, затем Factory с одним или несколькими Transformers , которые будут выполнять обработку. Поскольку файлы видео отделены от регионов, в наборе данных нет информации о регионе для каждой записи. Попробуйте добавить эту информацию, используя еще одну видеокаунтри , которая очень похожа на видео , и объедините все записи в одном DataFrame/DataSet.
Transformers : один для добавления столбца country , а один для объединения всех видео в один набор данных.Поскольку видео может быть лучшим трендом на один день и на следующий день, видео может иметь несколько строк, где у каждого есть разные цифры с точки зрения представлений, лайков, не любителей, комментариев ... Как следствие, мы должны получить новейшие статистические данные, доступные для одного видео, для каждого региона, поскольку эти статистические данные являются постепенными. В то же время мы собираемся вычислить количество дней в тренде для каждого видео.
Создайте видеостаты класса корпуса, которые очень похожи на предыдущие классы дела, но с информацией о днях в тренде.
Во -первых, вычислите количество тенденций дни каждого видео.
window от org.apache.spark.sql.functions .Чтобы получить новейшую статистику, вы должны получить последний трендовый день каждого видео. На самом деле это последняя доступная статистика.
window . Первый был для вычисления количества дней в тренде, а второй, чтобы получить новейшую статистику.rank .Сортируйте результаты по региону, количество дний, просмотров, лайков, а затем комментариев. Он подготовит данные для следующего достижения.
Сейчас мы собираемся вычислить оценку популярности каждого видео после получения их последней статистики. Как говорилось ранее, наша формула очень проста и может не представлять реальность.
Давайте нормализуем количество лайков/не любит количество просмотров. Для каждой записи разделите количество лайков по количеству просмотров, а затем количество нежеланий на количество просмотров. После этого получите процент «нормализованных» лайков.
Давайте теперь нормализуем количество комментариев. Для каждой записи разделите количество комментариев по количеству просмотров.
Теперь мы можем вычислить оценку популярности. Напомните, что формула: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Тем не менее, есть видео, где комментарии отключены. В этом случае формула становится: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Мы произвольно решили, что веса будут:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05 Установите их как Input , чтобы их можно было легко изменить.
when и otherwise функционирует из org.apache.spark.sql.functions . Сортировать по score в порядке убывания и возьмите 100 первых записей. Теперь у вас есть 100 самых «популярных» видео из 10 регионов.
Первое, что мы собираемся сделать, это, конечно, читать входные данные: файлы CSV, которые я позвоню в файлы видео, а файлы JSON - файлы категорий.
Давайте начнем с файлов категорий. Все файлы категорий являются файлами JSON. Вот рабочий процесс: мы собираемся определить файл конфигурации, который будет указывать файлы категорий для чтения; создать класс случаев, который представляет категорию; Затем Factory с Transformer , который будет обрабатывать файлы категорий в класс дела. Наконец, мы собираемся добавить Stage в Pipeline , чтобы вызвать преобразования.
Объект конфигурации уже был создан в resources/local.conf . Обратите внимание на варианты storage и path . Переместите файлы категорий соответственно. Если в одной папке есть несколько файлов, а папка используется в качестве пути, SETL рассматривает файлы как разделы одного файла. Далее, проверьте App.scala . Вы можете видеть, что мы использовали методы setConnector() и setSparkRepository() . Каждый раз, когда вы хотите использовать репозиторий, вам необходимо добавить конфигурацию в конфигурацию и зарегистрировать ее в объекте setl .
Создайте Category Case Class в папке entity . Теперь изучите в файлах категорий поля, которые нам понадобятся.
Нам понадобится id и title категории. Обязательно проверьте файлы и используйте одно и то же написание, чтобы создать класс Category .
Скелет Factory уже был предоставлен. Убедитесь, что вы понимаете логическую структуру.
Delivery в виде Connector позволяет нам извлечь входные данные. Другая Delivery будет действовать как SparkRepository , где мы напишем вывод преобразования. Проверьте id каждой Delivery и deliveryId в App.scala . Они используются, поэтому нет никакой двусмысленности, когда SETL складывает репозитории. Чтобы иметь возможность прочитать две предыдущие поставки, мы собираемся использовать две другие переменные: DataFrame для чтения Connector и Dataset для хранения выходного SparkRepository . Разница между ними состоит в том, что напечатана SparkRepository , отсюда и Dataset .Factory SETL :read : Идея состоит в том, чтобы взять входы поставки Connector или SparkRepository Delivery , при необходимости предварительно обработать их и хранить в переменных для использования в следующей функции.process : Вот где все преобразования данных будут выполнены. Создайте экземпляр используемого вами Transformer , вызовите метод transform() , используйте transformed Getter и сохраните результат в переменной.write : Как следует из названия, он используется для сохранения вывода преобразований после того, как они были сделаны. Connector использует метод write() для сохранения данных DataFrame , а SparkRepository использует метод save() для сохранения Dataset .get : эта функция используется для передачи вывода в следующую Stage Pipeline . Просто верните Dataset .process может быть несколько Transformer . Мы попытаемся следовать этой структуре на протяжении всей остальной части проекта.Factory будет автоматически передаваться на следующий Stage через функцию get . Тем не менее, написание вывода на каждой Factory будет проще для визуализации и отладки. Опять же, скелет Transformer уже был предоставлен. Тем не менее, вы будете тем, кто напишет преобразование данных.
Transformer берет на себя спор. Обычно это DataFrame или Dataset , который мы хотим обработать. В зависимости от вашего приложения вы можете добавить другие аргументы.transformedData - это переменная, которая будет хранить результат преобразования данных.transformed - это Getter, который будет вызван Factory для извлечения результата преобразования данных.transform() - это метод, который будет выполнять преобразование данных.items . Если вы посмотрите файлы категорий, информация, которая нам нужна, находится в этом поле.items является массивом. Мы хотим взорвать этот массив и взять только поле id и поле title из поля snippet . Для этого используйте функцию explode от org.apache.spark.sql.functions . Затем, чтобы получить конкретные поля, используйте метод withColumn и метод getField() на id, snippet и title . Не забудьте соответственно разыграть типы на класс, который вы создали.id и столбцы title . Затем раздайте DataFrame в набор данных с as[T] .Transformer . Чтобы увидеть, что он делает, вы можете запустить файл App.scala , который уже был создан. Он просто запускает Factory , которая содержит только что написанный вами Transformer , и он выведет результат в путь файла конфигурации. Обратите внимание, что соответствующая Factory была добавлена через addStage() , что заставляет Pipeline запустить его.Connector , используя аннотацию @Delivery , с помощью deliveryId .Transformer в методе process Factory .write Factory . Давайте теперь обработаем файлы видео. Мы хотели бы объединить все файлы в одном DataFrame / Dataset или в одном файле CSV, сохраняя при этом информацию региона для каждого видео. Все файлы видео представляют собой файлы CSV, и они имеют одинаковые столбцы, как ранее указывалось в разделе контекста . Рабочий процесс похож на последнюю: конфигурацию; Класс Кейс; Factory ; Transformer ; Добавьте Stage в Pipeline . На этот раз мы собираемся установить несколько объектов конфигурации.
Мы собираемся установить несколько объектов конфигурации в resources/local.conf , один на регион. В каждом объекте конфигурации вам придется установить storage, path, inferSchema, delimiter, header, multiLine и dateFormat .
videos<region>Repository .Factory . Создайте класс Case Class с именем Video в папке entity . Теперь изучите в файлах видео, поля, которые нам понадобятся. Напомните, что цель состоит в том, чтобы вычислить оценку популярности, и что формула представляет собой number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight . Это поможет выбрать поля.
Создайте еще один класс CASE с именем VideoCountry . Он будет иметь точно такие же поля, что и Video , но с полем страны/региона.
@ColumnName фонда. Постарайтесь использовать его, поскольку это может быть полезно в некоторых реальных бизнес-ситуациях.java.sql.Date для поля типа даты. Мы хотели бы, чтобы videoId , title , channel_title , category_id , trending_date , views , likes , dislikes , comment_count , comments_disabled и video_error_or_removed Поля.
Цель этой фабрики состоит в том, чтобы объединить все файлы видео в одну, не удаляя информацию о регионе. Это означает, что мы собираемся использовать два вида Transformer .
Delivery в форме SparkRepository[Video] . Установите последнюю Delivery в качестве SparkRepository[VideoCountry] , где мы напишем вывод преобразования. Установите Dataset[Video] , как и количество входов.Factory :read : предварительно обрабатывать SparkRepository , фильтраруя видео, которые удаляются или ошибочны . Затем «бросите» их в качестве Dataset[Video] и храните их в соответствующих переменных.process : примените первый Transformer для каждого из входов и примените результаты ко второму Transformer .write : Напишите вывод SparkRepository[VideoCountry] .get : просто верните результат окончательного Transformer .Connector для чтения входных файлов и SparkRepository для вывода?SparkRepository для чтения входов, чтобы предоставить структуру для входных файлов.SparkRepository и много соответствующих переменных, и я не нахожу это красивым/последовательным. Нет другого решения?Delivery в форме SparkRepository вы можете использовать доставки в форме Dataset с опцией autoLoad = true . Итак, вместо того, чтобы иметь: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
Основная цель первого Transformer - добавить информацию о регионе/стране. Создайте Transformer , который принимает два входа, Dataset[Video] и строку. Добавьте country столбца и верните Dataset[VideoCountry] . Вы также можете отфильтровать видео, которые помечены как удаленные или ошибку . Конечно, этот последний шаг может быть размещен в другом месте.
Основная цель второго Transformer - перегруппировать все видео вместе, сохраняя при этом информацию о регионе.
reduce и union . Чтобы проверить результат вашей работы, перейдите на App.scala , установите SparkRepositories , добавьте сцену VideoFactory и запустите код. Он создаст выходной файл в соответствующем пути.
Connector , так и SparkRepository .Deliveries в Transformer или Connector .Transformers на Factory .Поскольку видео может быть главным трендом на один день и на следующий день, оно будет иметь разные цифры с точки зрения представлений, лайков, антипатий, комментариев ... Как следствие, мы должны получить новейшую статистику, доступную для одного видео, для каждого региона. В то же время мы собираемся вычислить количество дней в тренде для каждого видео.
Но как мы собираемся это сделать? Прежде всего, мы собираемся сгруппировать записи, которые соответствуют одному и тому же видео, и подсчитывать количество записей, которые в основном представляют собой количество дней в тренде. Затем мы собираемся ранжировать эти сгруппированные записи и взять последние, чтобы получить последнюю статистику.
Файл конфигурации для вывода VideoFactory уже установлен в предыдущем достижении, поэтому его можно сохранить. Вам нужно будет прочитать его и обработать его, чтобы получить последнюю статистику видео. Не забудьте добавить файл конфигурации для вывода этой новой Factory .
Создайте класс CASE с именем VideoStats , которые имеют аналогичные поля для VideoCountry , но вам необходимо учитывать количество дней в тренде.
На этой фабрике все, что вам нужно сделать, это прочитать ввод, передать его Transformer , который будет выполнять обработку данных, и написать выход. Это должно быть довольно просто; Вы можете попытаться подражать другим Factories .
Deliveries . Как говорилось ранее, мы собираемся сгруппировать видео вместе. Для этого мы собираемся использовать org.apache.spark.sql.expressions.Window . Убедитесь, что вы знаете, что делает Window заранее.
Window , которое вы разделите, чтобы подсчитать количество дней в тренде для каждого видео. Чтобы узнать, какие поля вы собираетесь разбить, посмотрите, какие поля будут такими же для одного видео.Window , которое будет использоваться для ранжирования видео до даты их трендов. Выбирая самую последнюю дату, мы можем получить последнюю статистику каждого видео.Windows , теперь вы можете добавить новые столбцы trendingDays для количества дний и rank для ранжирования даты тренда путем убывающего порядка.rank , взяв только записи с rank 1.DataFrame на Dataset[VideoStats] .partitionBy и orderBy для Window ; и count , методы rank от org.apache.spark.sql.functions при работе с Dataset . Чтобы проверить результат вашей работы, перейдите на App.scala , установите SparkRepositories , добавьте сцену и запустите код. Он создаст выходной файл в соответствующем пути.
Pipeline .Connector и SparkRepository , и как установить их Deliveries . Сейчас мы собираемся вычислить оценку популярности каждого видео после получения их последней статистики. Как говорилось ранее, наша формула очень проста и может не представлять реальность. Давайте напомним, что формула - это views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight . Используя предыдущий результат VideoStats , мы просто собираемся применить формулу и сортировать данные с самым высоким баллом до самого низкого.
Это последнее преобразование данных. Установите конфигурацию, чтобы вы могли сохранить этот последний Dataset[VideoStats] . Чтобы добавить константы, используемые для формулы, вам нужно будет установить Inputs в Pipeline . Перед добавлением этапов в Pipeline используйте setInput[T](<value>, <id>) , чтобы установить константы. Эти входные данные можно найти в любое время на любых Factories , которые были добавлены в Pipeline .
Здесь не понадобится. Мы просто сортируем предыдущие данные и отбросим столбцы, используемые для вычисления оценки, чтобы мы все еще могли использовать объект VideoStats .
На этой фабрике все, что вам нужно сделать, это прочитать ввод, передать его Transformer , который будет выполнять обработку данных, и написать выход. Это должно быть довольно просто; Вы можете попытаться подражать другим Factories .
Deliverable : Connector , SparkRepository и/или Input .Давайте нормализуем количество лайков/не любит количество просмотров. Для каждой записи разделите количество лайков по количеству просмотров, а затем количество нежеланий на количество просмотров. После этого получите процент «нормализованных» лайков.
Давайте теперь нормализуем количество комментариев. Для каждой записи разделите количество комментариев по количеству просмотров.
Теперь мы можем вычислить оценку популярности. Напомните, что формула: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Тем не менее, есть видео, где комментарии отключены. В этом случае формула становится: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Мы произвольно решили, что веса будут:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05when и otherwise функционирует из org.apache.spark.sql.functions . Сортировать по score в порядке убывания и возьмите 100 первых записей. Теперь у вас есть 100 самых «популярных» видео из 10 регионов.
Чтобы проверить результат вашей работы, перейдите на App.scala , установите Inputs , если они еще не установлены, установите выходной SparkRepository , добавьте стадию и запустите код. Он создаст выходной файл в соответствующем пути.
Deliveries : Input , Connector и SparkRepository , с deliveryId .Stage , включая Factory и Transformer(s) .Если вам понравился этот проект, пожалуйста, ознакомьтесь с Setl Framework здесь: https://github.com/setl-developers/setl, и почему бы не внести свой вклад!