YouTube Setl은 SETL 프레임 워크를 연습하기위한 시작점을 제공하는 것을 목표로하는 프로젝트입니다 : https://github.com/setl-developers/setl. 아이디어는 추출, 변환 및로드 작업과 관련된 컨텍스트 프로젝트를 제공하는 것입니다. 운동에는 쉬운 모드, 일반 모드 및 하드 모드의 세 가지 수준이 있습니다.
사용되는 데이터는 Kaggle, https://www.kaggle.com/datasnaek/youtube-new에서 나온 것입니다.
나는이 프로젝트에 Scala와 Apache Spark와 함께 JetBrains Intellij Idea Community Edition을 사용했습니다.
이 데이터는 다중 지역 : 캐나다 (CA), 독일 (DE), 프랑스 (FR), 영국 (GB), 인도 (IN), 일본 (JP), 한국 (KR), 멕시코 (MX), 러시아 (RU) 및 미국 (미국)으로 나뉩니다. 이 지역마다 두 개의 파일이 있습니다.

매일 YouTube는 각 국가에서 가장 유행하는 비디오 중 약 200 개를 제공합니다. YouTube는 완전히 공개되지 않은 요소의 조합을 기반으로 비디오가 얼마나 트렌디한지 측정합니다. 이 데이터 세트는 Everyday 's Top Trending 비디오 모음으로 구성됩니다. 결과적으로 동일한 비디오가 여러 번 나타날 수 있으므로 며칠 동안 유행이 발생합니다.
기본적으로 항목 필드의 요소를 사용하면 CSV 파일의 category_id 전체 이름 범주에 매핑 할 수 있습니다.
우리는이 데이터 세트를 분석하고 "인기있는"비디오를 결정할 것입니다. 그러나 인기있는 비디오를 어떻게 정의합니까? 우리는 동영상의 인기를 횟수, 좋아요, 싫어함, 댓글 수 및 추세 일수를 기반으로 비디오의 인기를 정의 할 것입니다.
이 정의는 분명히 논쟁의 여지가 있고 임의적이며 비디오의 인기에 대한 최상의 정의를 찾고자하지 않습니다. 우리는이 프로젝트의 목적에만 초점을 맞출 것입니다 : SETL 프레임 워크를 사용하여 연습하십시오.
이 프로젝트의 목표는 100 개의 가장 "인기있는"비디오와 가장 "인기있는"비디오 카테고리를 찾는 것입니다. 그러나 비디오의 인기를 어떻게 정의 했습니까? 공식은 다음과 같습니다.
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight .
좋아하는 백분율은 싫어하는 것보다 좋아하는 비율입니다. 이 비율은 뷰 수에 대해 정규화됩니다. 동일한 정규화가 주석 수로 수행됩니다.
다음은 프로젝트를 실현하기위한 각 난이도에 대한 지침입니다. 각 난이도에 대해 리포지트를 특정 지점으로 복제하여 시작 프로젝트를 수행 할 수 있습니다.
이 프로젝트에서는 이미 Scala와 Apache Spark에 대한 기본 지식이 있다고 가정합니다.
entity ; 트랜스포머를 포함하는 factory ; 및 데이터 변환을 포함하는 transformer .Factory 또는 Transformer 만들 때 Ctrl+i 사용하여 필요한 기능을 자동으로 생성 할 수 있습니다. 우리가해야 할 첫 번째 일은 물론 입력을 읽는 것입니다.
카테고리 파일부터 시작하겠습니다. 모든 카테고리 파일은 JSON 파일입니다. 카테고리를 나타내는 케이스 클래스를 만듭니다. 그런 다음 카테고리 파일을 케이스 클래스로 처리하는 Transformer 있는 Factory 만듭니다.
local.conf 파일을 살펴보십시오. 카테고리 파일을 읽기 위해 객체가 이미 작성되었습니다.org.apache.spark.sql.functions 의 폭발 기능을 살펴보십시오.coalesce 사용하는 것을 잊지 마십시오. 이제 비디오 파일로 작업 할 수 있습니다. 마찬가지로 입력을 읽기위한 비디오를 나타내는 케이스 클래스를 만듭니다. 그런 다음 처리를 수행 할 하나 또는 여러 개의 Transformers 있는 Factory 입니다. 비디오 파일은 지역과 분리되어 있으므로 데이터 세트의 각 레코드에 대한 지역 정보가 없습니다. 비디오 와 매우 유사한 다른 Case 클래스 비디오 홀로 사용 하여이 정보를 추가하고 모든 레코드를 단일 데이터 프레임/데이터 세트로 병합하십시오.
Transformers 유용합니다 : 하나는 country 열을 추가하고, 하나는 모든 비디오를 단일 데이터 세트로 병합하기위한 것입니다.비디오는 하루와 다음날 비디오가 가장 큰 트렌드 일 수 있기 때문에 비디오가 여러 행을 가질 수 있습니다. 여기서 각각은 각각의 숫자가 다른 숫자가 다른 숫자를 가지고 있으며, 결과적으로, 우리는 각 지역에 대해 단일 비디오에 사용할 수있는 최신 통계를 검색해야합니다. 왜냐하면 이러한 통계는 인상적이기 때문입니다. 동시에, 우리는 모든 비디오에 대해 추세 일수를 계산할 것입니다.
이전 사례 클래스와 매우 유사하지만 트렌드 데이 정보가있는 사례 클래스 Videostats를 만듭니다.
먼저 각 비디오의 추세 일수를 계산하십시오.
org.apache.spark.sql.functions 의 window 함수를보십시오.최신 통계를 검색하려면 각 비디오의 최신 트렌드 데이를 검색해야합니다. 실제로 최신 통계입니다.
window 만들어야합니다. 첫 번째는 추세의 날을 계산하는 것이었고 두 번째는 최신 통계를 검색하는 것이 었습니다.rank 기능을 사용하는 것입니다.결과를 지역, 유행의 날,보기, 좋아요 및 주석별별로 정렬하십시오. 다음 성과를 위해 데이터를 준비합니다.
우리는 이제 최신 통계를 얻은 후 각 비디오의 인기 점수를 계산할 것입니다. 앞에서 말했듯이, 우리의 공식은 매우 간단하며 현실을 나타내지 않을 수 있습니다.
뷰 수보다 좋아요/싫어하는 수를 정상화합시다. 각 레코드마다 좋아하는 수를 조회수로 나눈 다음 값의 수를 조회수로 나눕니다. 그 후 "정규화 된"좋아요의 백분율을 얻으십시오.
이제 댓글 수를 정상화합시다. 각 레코드에 대해 의견 수를 조회수로 나눕니다.
이제 인기 점수를 계산할 수 있습니다. 공식은 다음과 같습니다. views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
그러나 댓글이 비활성화되는 비디오가 있습니다. 이 경우 공식은 다음과 같습니다. views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . 우리는 중량을 임의로 결정했습니다.
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05 쉽게 수정할 수 있도록 Input 으로 설정하십시오.
when 에서 otherwise org.apache.spark.sql.functions 확인하십시오. score 내림차순으로 정렬하고 100 개의 첫 번째 레코드를 가져갑니다. 이제 10 개 지역에서 가장 "인기있는"비디오가 있습니다.
우리가해야 할 첫 번째 일은 물론 입력을 읽는 것입니다.
카테고리 파일부터 시작하겠습니다. 모든 카테고리 파일은 JSON 파일입니다. 워크 플로는 다음과 같습니다. 우리는 읽을 카테고리 파일을 나타내는 구성 파일을 정의 할 것입니다. 범주를 나타내는 사례 클래스를 만듭니다. 그런 다음 카테고리 파일을 케이스 클래스로 처리하는 Transformer 가있는 Factory . 마지막으로, 우리는 변환을 트리거하기 위해 Pipeline 에 Stage 추가 할 것입니다.
구성 객체는 이미 resources/local.conf 에서 작성되었습니다. storage 및 path 옵션에주의하십시오. 그에 따라 카테고리 파일을 이동하십시오. 여러 파일이 동일한 폴더에 있고 폴더가 경로로 사용되는 경우 SETL은 파일을 단일 파일의 파티션으로 고려합니다. 다음으로 App.scala 확인하십시오. setConnector() 및 setSparkRepository() 메소드를 사용했음을 알 수 있습니다. 저장소를 사용할 때마다 구성에 구성을 추가하고 setl 객체에 등록해야합니다.
entity 폴더에서 Category 라는 케이스 클래스를 만듭니다. 이제 카테고리 파일에서 필요한 필드를 검사하십시오.
id 과 카테고리 title 필요합니다. 파일을 확인하고 동일한 철자를 사용하여 Category 케이스 클래스를 만듭니다.
Factory 의 골격이 이미 제공되었습니다. 논리적 구조를 이해해야합니다.
Connector 형태의 Delivery 통해 입력을 검색 할 수 있습니다. 또 다른 Delivery SparkRepository 역할을하며, 여기서 우리는 변환의 출력을 쓸 것입니다. App.scala 에서 각 Delivery 의 id 와 deliveryId 확인하십시오. SETL이 리포지토리를 가져올 때 모호함이 없으므로 사용됩니다. 이전 두 가지 배송을 읽을 수 있으려면 Connector 를 읽기위한 DataFrame 과 출력 SparkRepository 저장하기위한 Dataset 두 가지 변수를 사용합니다. 그들 사이의 차이점은 SparkRepository 가 입력되어 Dataset 입력된다는 것입니다.Factory 에는 4 가지 기능이 필요합니다.read : 아이디어는 Connector 또는 SparkRepository Delivery 입력을 가져 와서 필요한 경우 전처리하고 변수에 저장하여 다음 기능에서 사용하는 것입니다.process : 모든 데이터 변환이 수행되는 곳이 있습니다. 사용중인 Transformer 의 인스턴스를 작성하고 transform() 메소드를 호출하고 transformed getter를 사용하고 결과를 변수로 저장하십시오.write : 이름에서 알 수 있듯이 변환이 완료된 후에 변환의 출력을 저장하는 데 사용됩니다. Connector write() 메소드를 사용하여 DataFrame 저장하고 SparkRepository save() 메소드를 사용하여 Dataset 저장합니다.get :이 기능은 출력을 Pipeline 의 다음 Stage 로 전달하는 데 사용됩니다. Dataset 반환합니다.process 기능에는 다중 Transformer 있을 수 있습니다. 우리는 프로젝트의 나머지 부분 에서이 구조를 따라하려고 노력할 것입니다.Factory 의 결과는 get 기능을 통해 다음 Stage 로 자동 전송됩니다. 그러나 모든 Factory 의 출력을 작성하면 시각화 및 디버깅이 더 쉬워집니다. 다시, Transformer 의 골격이 이미 제공되었습니다. 그러나 데이터 변환을 작성하는 사람이 될 것입니다.
Transformer 논쟁을 취합니다. 일반적으로 처리하려는 DataFrame 또는 Dataset 입니다. 응용 프로그램에 따라 다른 인수를 추가 할 수 있습니다.transformedData 데이터 변환 결과를 저장하는 변수입니다.transformed 것은 데이터 변환의 결과를 검색하기 위해 Factory 에서 호출 할 getter입니다.transform() 데이터 변환을 수행하는 메소드입니다.items 필드를 선택할 것입니다. 카테고리 파일을 확인하면 필요한 정보 가이 필드에 있습니다.items 필드는 배열입니다. 우리는이 배열을 폭발시키고 snippet 필드에서 id 필드와 title 필드 만 가져 가려고합니다. 이렇게하려면 org.apache.spark.sql.functions 에서 explode 함수를 사용하십시오. 그런 다음 특정 필드를 얻으려면 id, snippet 및 title 에서 withColumn 메소드와 getField() 메소드를 사용하십시오. 당신이 만든 사례 클래스에 따라 유형을 캐스팅하는 것을 잊지 마십시오.id 와 title 열을 선택하십시오. 그런 다음 데이터 프레임을 as[T] 데이터 세트에 캐스트하십시오.Transformer 작성을 마쳤습니다. 그것이 무엇을하는지 보려면 이미 생성 된 App.scala 파일을 실행할 수 있습니다. 방금 작성한 Transformer 포함 된 Factory 실행하면 결과를 구성 파일의 경로에 출력합니다. 해당 Factory addStage() 통해 추가되어 Pipeline 을 실행하게합니다.Connector 설정, @Delivery 주석을 사용하여 deliveryId 사용합니다.Factory 의 process 방법에서 Transformer 사용.Factory 의 write 방법으로. 이제 비디오 파일을 처리합시다. 모든 파일을 단일 DataFrame / Dataset 또는 동일한 CSV 파일로 병합하고 각 비디오에 대한 영역의 정보를 유지하려고합니다. 모든 비디오 파일은 CSV 파일이며 컨텍스트 섹션에서 언급 한 것과 동일한 열이 있습니다. 워크 플로는 마지막 것과 유사합니다 : 구성; 사례 클래스; Factory ; Transformer ; Stage Pipeline 에 추가하십시오. 이번에는 여러 구성 객체를 설정할 것입니다.
우리는 지역 당 하나의 resources/local.conf 에서 여러 구성 객체를 설정할 것입니다. 각 구성 객체에서는 storage, path, inferSchema, delimiter, header, multiLine 및 dateFormat 설정해야합니다.
videos<region>Repository 와 같은 일반 이름을 알려주십시오.Factory 의 출력을 작성하기위한 구성 객체를 설정하는 것을 잊지 마십시오. entity 폴더에서 Video 라는 케이스 클래스를 만듭니다. 이제 비디오 파일에서 필요한 필드를 검사하십시오. 목표는 인기 점수를 계산하는 것이며, 공식은 number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight 상기시킵니다. 필드를 선택하는 데 도움이됩니다.
VideoCountry 라는 다른 사례 클래스를 만듭니다. Video 와 정확히 동일한 필드가 있지만 국가/지역 필드도 있습니다.
@ColumnName 주석을 볼 수 있습니다. 실제 비즈니스 상황에서 유용 할 수 있으므로 사용하십시오.java.sql.Date 사용하십시오. videoId , title , channel_title , category_id , trending_date , views , likes , dislikes , comment_count , comments_disabled 및 video_error_or_removed fields를 갖고 싶습니다.
이 공장의 목표는 모든 비디오 파일을 지역 정보를 제거하지 않고 단일 비디오로 병합하는 것입니다. 그것은 우리가 두 종류의 Transformer 사용한다는 것을 의미합니다.
Delivery SparkRepository[Video] 형태로 설정하십시오. 마지막 Delivery SparkRepository[VideoCountry] 로 설정하여 변환의 출력을 작성합니다. 많은 변수 Dataset[Video] 입력 수로 설정합니다.Factory 의 네 가지 기능을 설명해 봅시다.read : 제거 된 비디오를 필터링하거나 오류를 필터링하여 SparkRepository 전제로 처리하십시오. 그런 다음 Dataset[Video] 로 "캐스트"하여 해당 변수에 저장하십시오.process : 각 입력에 대한 첫 번째 Transformer 적용하고 결과를 두 번째 Transformer 에 적용하십시오.write : 출력 SparkRepository[VideoCountry] 작성하십시오.get : 최종 Transformer 의 결과를 반환하십시오.Connector 사용하여 입력 파일과 출력에 대한 SparkRepository 읽지 않은 이유는 무엇입니까?SparkRepository 사용하여 입력 파일의 구조를 제공하기 위해 입력을 읽었습니다.SparkRepository 와 해당 변수가 많이있는 것 같아서이 예쁜/consise를 찾지 못합니다. 다른 해결책이 없습니까?SparkRepository 형태의 Delivery 사용하는 대신 autoLoad = true 옵션이있는 Dataset 형태의 배송을 사용할 수 있습니다. 따라서, 대신 : @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
첫 번째 Transformer 의 주요 목표는 지역/국가 정보를 추가하는 것입니다. Dataset[Video] 와 문자열의 두 입력을 취하는 Transformer 만듭니다. 열 country 추가하고 Dataset[VideoCountry] 를 반환하십시오. 제거 또는 오류 로 표시된 비디오를 필터링 할 수도 있습니다. 물론,이 마지막 단계는 다른 곳에 배치 할 수 있습니다.
두 번째 Transformer 의 주요 목표는 지역 정보를 유지하면서 모든 비디오를 함께 그룹화하는 것입니다.
reduce 및 union 기능을 사용하십시오. 작업 결과를 확인하려면 App.scala 로 이동하여 SparkRepositories 설정하고 단계 VideoFactory 추가하고 코드를 실행하십시오. 해당 경로에서 출력 파일을 만듭니다.
Connector 와 SparkRepository 모두 사용하십시오.Transformer 또는 Connector 로 여러 Deliveries 읽으십시오.Factory 에서 여러 Transformers 사용하십시오.비디오는 하루 동안 가장 큰 트렌드 일 수 있고 다음 날에는 최고의 트렌드가 될 수 있으므로보기, 좋아요, 싫어함, 댓글 측면에서 다른 숫자를 갖습니다. 결과적으로 각 지역마다 단일 비디오에 사용할 수있는 최신 통계를 검색해야합니다. 동시에, 우리는 모든 비디오에 대해 추세 일수를 계산할 것입니다.
그러나 우리는 어떻게 그렇게 할 것인가? 우선, 우리는 동일한 비디오에 해당하는 레코드를 그룹화하고 기본적으로 트렌드 일의 수인 레코드 수를 계산할 것입니다. 그런 다음 최신 통계를 검색하기 위해이 그룹화 된 레코드를 순위에 올릴 것입니다.
VideoFactory 의 출력에 대한 구성 파일은 이미 이전 성과에서 설정되어있어 저장할 수 있습니다. 최신 비디오 통계를 얻으려면 읽고 처리해야합니다. 이 새로운 Factory 의 출력에 대한 구성 파일을 추가하는 것을 잊지 마십시오.
VideoStats 라는 사례 클래스를 만들고 VideoCountry 과 비슷한 필드를 가지고 있지만 추세의 수를 고려해야합니다.
이 공장에서는 입력을 읽고 데이터 처리를 수행하는 Transformer 로 전달하고 출력을 작성하는 것입니다. 매우 간단해야합니다. 다른 Factories 모방하려고 할 수 있습니다.
Deliveries 설정하는 것을 잊지 마십시오. 앞에서 말했듯이, 우리는 비디오를 함께 그룹화 할 것입니다. 이를 위해 org.apache.spark.sql.expressions.Window 사용할 것입니다. Window 미리 무엇을하는지 알아야합니다.
Window 만듭니다. 어떤 필드가 분할 할 필드를 알기 위해, 단일 비디오의 경우 어떤 필드가 동일 할 것인지 살펴보십시오.Window 만듭니다. 가장 최근 날짜를 선택하면 각 비디오의 최신 통계를 검색 할 수 있습니다.Windows 만든 후 이제 트렌드 일의 수에 대한 새로운 열 trendingDays 추가하고 순서를 내림으로써 추세 날짜의 순위에 rank 할 수 있습니다.rank 별로 필터링하고 rank 1의 레코드 만 가져 오십시오.DataFrame 을 Dataset[VideoStats] 에 캐스트하십시오.Window 에 partitionBy 및 orderBy 방법을 사용해야합니다. Dataset 를 작업 할 때 org.apache.spark.sql.functions 의 count , rank 메소드. 작업 결과를 확인하려면 App.scala 로 이동하여 SparkRepositories 설정하고 단계를 추가 한 다음 코드를 실행하십시오. 해당 경로에서 출력 파일을 만듭니다.
Pipeline 실행하는 방법.Connector 와 SparkRepository 가 무엇인지, 그리고 Deliveries 설정하는 방법을 이해하십시오. 우리는 이제 최신 통계를 얻은 후 각 비디오의 인기 점수를 계산할 것입니다. 앞에서 말했듯이, 우리의 공식은 매우 간단하며 현실을 나타내지 않을 수 있습니다. 공식이 views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight 상기합시다. VideoStats 의 이전 결과를 사용하여 공식을 적용하고 데이터를 가장 높은 점수로 가장 낮게 정렬합니다.
이것이 마지막 데이터 변환입니다. 이 마지막 Dataset[VideoStats] 저장할 수 있도록 구성을 설정하십시오. 공식에 사용 된 상수를 추가하려면 Pipeline 에서 Inputs 설정해야합니다. Pipeline 에 스테이지를 추가하기 전에 setInput[T](<value>, <id>) 사용하여 상수를 설정하십시오. 이러한 입력은 Pipeline 에 한 번 추가 한 Factories 에서 언제든지 검색 할 수 있습니다.
여기에는 존재가 필요하지 않습니다. 우리는 이전 데이터를 단순히 정렬하고 스코어를 계산하는 데 사용되는 열을 삭제하여 여전히 VideoStats 엔티티를 사용할 수 있습니다.
이 공장에서는 입력을 읽고 데이터 처리를 수행하는 Transformer 로 전달하고 출력을 작성하는 것입니다. 매우 간단해야합니다. 다른 Factories 모방하려고 할 수 있습니다.
Connector , SparkRepository 및/또는 Input Deliverable 입력 및 출력을 설정하는 것을 잊지 마십시오.뷰 수보다 좋아요/싫어하는 수를 정상화합시다. 각 레코드마다 좋아하는 수를 조회수로 나눈 다음 값의 수를 조회수로 나눕니다. 그 후 "정규화 된"좋아요의 백분율을 얻으십시오.
이제 댓글 수를 정상화합시다. 각 레코드에 대해 의견 수를 조회수로 나눕니다.
이제 인기 점수를 계산할 수 있습니다. 공식은 다음과 같습니다. views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
그러나 댓글이 비활성화되는 비디오가 있습니다. 이 경우 공식은 다음과 같습니다. views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . 우리는 중량을 임의로 결정했습니다.
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05when 에서 otherwise org.apache.spark.sql.functions 확인하십시오. score 내림차순으로 정렬하고 100 개의 첫 번째 레코드를 가져갑니다. 이제 10 개 지역에서 가장 "인기있는"비디오가 있습니다.
작업 결과를 확인하려면 App.scala 로 이동하여 아직 설정되지 않은 경우 Inputs 설정하고 출력 SparkRepository 설정하고 단계를 추가 한 후 코드를 실행하십시오. 해당 경로에서 출력 파일을 만듭니다.
Input , Connector 및 SparkRepository , deliveryId 의 세 가지 유형의 Deliveries 사용하십시오.Factory 과 Transformer(s) 포함한 Stage 작성하십시오.이 프로젝트가 마음에 들었다면 여기에서 setl 프레임 워크를 확인하십시오 : https://github.com/setl-developers/setl.