YouTube Setl adalah proyek yang bertujuan memberikan titik awal untuk mempraktikkan kerangka setl: https://github.com/setl-developers/setl. Idenya adalah untuk memberikan proyek konteks yang melibatkan ekstrak, transformasi, dan memuat operasi. Ada tiga tingkat kesulitan untuk latihan: mode mudah, mode normal dan mode keras.
Data yang digunakan adalah dari Kaggle, https://www.kaggle.com/datasnaek/youtube-new.
Saya menggunakan JetBrains Intellij Idea Community Edition untuk proyek ini, dengan Scala dan Apache Spark.
Data ini dibagi di daerah berlipat ganda: Kanada (CA), Jerman (DE), Prancis (FR), Inggris Raya (GB), India (IN), Jepang (JP), Korea Selatan (KR), Meksiko (MX), Rusia (RU) dan Amerika Serikat (AS). Untuk masing -masing wilayah ini, ada dua file:

Setiap hari, YouTube menyediakan sekitar 200 video yang paling tren di setiap negara. YouTube mengukur seberapa besar video yang trendi berdasarkan kombinasi faktor yang tidak sepenuhnya dipublikasikan. Dataset ini terdiri dari kumpulan video tren terbaik sehari -hari. Sebagai akibatnya, adalah mungkin bagi video yang sama muncul beberapa kali, yang berarti tren selama beberapa hari.
Pada dasarnya, elemen -elemen bidang item memungkinkan kita untuk memetakan file category_id dari file CSV ke kategori nama lengkap.
Kami akan menganalisis dataset ini dan menentukan video "populer". Tapi, bagaimana kita mendefinisikan video populer? Kami akan mendefinisikan popularitas video berdasarkan jumlah pandangan, suka, tidak suka, jumlah komentar, dan jumlah hari yang sedang tren.
Definisi ini jelas diperdebatkan dan sewenang -wenang, dan kami tidak ingin mengetahui definisi terbaik untuk popularitas video. Kami hanya akan fokus pada tujuan proyek ini: berlatih dengan kerangka setl.
Tujuan dari proyek ini adalah untuk menemukan 100 video paling "populer", dan kategori video "populer" yang paling populer. Tapi bagaimana kita mendefinisikan popularitas video? Formulanya akan menjadi:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight .
Persentase suka adalah rasio suka karena tidak suka. Rasio ini dinormalisasi dalam jumlah tampilan. Normalisasi yang sama dilakukan dengan jumlah komentar.
Di bawah ini adalah instruksi untuk setiap tingkat kesulitan untuk mewujudkan proyek. Untuk setiap tingkat kesulitan, Anda dapat mengkloning repo dengan cabang spesifik untuk memiliki proyek awal.
Untuk proyek ini, kami berasumsi bahwa Anda sudah memiliki pengetahuan dasar tentang Scala dan Apache Spark.
entity yang berisi kelas case atau objek; factory yang berisi transformator; dan transformer yang berisi transformasi data.Factory setl atau Transformer , Anda dapat menggunakan Ctrl+i untuk secara otomatis membuat fungsi yang diperlukan. Hal pertama yang akan kami lakukan adalah, tentu saja, membaca input: file CSV, yang akan saya sebut file video, dan file JSON, file kategori.
Mari kita mulai dengan file kategori. Semua file kategori adalah file JSON . Buat kelas case yang mewakili kategori , kemudian Factory dengan Transformer yang akan memproses file kategori ke dalam kelas case.
local.conf . Suatu objek telah dibuat untuk membaca file kategori.org.apache.spark.sql.functions .coalesce saat menyimpan file. Kami sekarang dapat bekerja dengan file video. Demikian pula, buat kelas case yang mewakili video untuk membaca input, kemudian Factory dengan satu atau beberapa Transformers yang akan melakukan pemrosesan. Karena file video dipisahkan dari daerah, tidak ada informasi wilayah untuk setiap catatan dalam dataset. Cobalah untuk menambahkan informasi ini dengan menggunakan Videocountry kelas kasus lain yang sangat mirip dengan video , dan gabungkan semua catatan dalam satu data dan dataset.
Transformers akan berguna: satu untuk menambahkan kolom country , dan satu untuk menggabungkan semua video menjadi satu dataset.Karena video dapat menjadi tren teratas untuk hari dan hari berikutnya, dimungkinkan untuk video memiliki beberapa baris, di mana masing -masing memiliki angka yang berbeda dalam hal pandangan, suka, tidak suka, komentar ... sebagai akibatnya, kita harus mengambil statistik terbaru yang tersedia untuk satu video, untuk setiap wilayah, karena statistik ini tidak dapat diselesaikan. Pada saat yang sama, kami akan menghitung jumlah hari tren untuk setiap video.
Buat case class videostats , yang sangat mirip dengan kelas kasus sebelumnya, tetapi dengan informasi hari yang sedang tren.
Pertama, hitung jumlah hari tren dari setiap video.
window dari org.apache.spark.sql.functions .Untuk mengambil statistik terbaru, Anda harus mengambil hari tren terbaru dari setiap video. Ini sebenarnya statistik terbaru yang tersedia.
window lain. Yang pertama adalah untuk menghitung jumlah hari tren, dan yang kedua untuk mengambil statistik terbaru.rank .Urutkan hasil berdasarkan wilayah, jumlah hari tren, tampilan, suka dan kemudian komentar. Ini akan menyiapkan data untuk pencapaian berikutnya.
Kami sekarang akan menghitung skor popularitas setiap video, setelah mendapatkan statistik terbaru mereka. Seperti yang dikatakan sebelumnya, formula kami sangat sederhana dan mungkin tidak mewakili kenyataan.
Mari kita normalkan jumlah suka/tidak suka atas jumlah tampilan. Untuk setiap catatan, bagi jumlah suka dengan jumlah tampilan, dan kemudian jumlah ketidaksukaan dengan jumlah tampilan. Setelah itu, dapatkan persentase suka "dinormalisasi".
Sekarang mari kita normalisasi jumlah komentar. Untuk setiap catatan, bagi jumlah komentar dengan jumlah tampilan.
Kami sekarang dapat menghitung skor popularitas. Ingatkan bahwa rumusnya adalah: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Namun, ada video di mana komentar dinonaktifkan. Dalam hal ini, formula menjadi: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Kami secara sewenang -wenang memutuskan bobotnya:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05 Aturnya sebagai Input sehingga dapat dengan mudah dimodifikasi.
when dan otherwise berfungsi dari org.apache.spark.sql.functions . Urutkan berdasarkan score dalam urutan menurun, dan ambil 100 catatan pertama. Anda sekarang memiliki 100 video paling "populer" dari 10 wilayah.
Hal pertama yang akan kami lakukan adalah, tentu saja, membaca input: file CSV, yang akan saya sebut file video, dan file JSON, file kategori.
Mari kita mulai dengan file kategori. Semua file kategori adalah file JSON. Berikut adalah alur kerja: Kami akan menentukan file konfigurasi yang akan menunjukkan file kategori untuk dibaca; Buat kelas case yang mewakili kategori; Kemudian Factory dengan Transformer yang akan memproses file kategori ke dalam kelas case. Akhirnya, kita akan menambahkan Stage ke dalam Pipeline untuk memicu transformasi.
Objek konfigurasi telah dibuat di resources/local.conf . Perhatikan opsi storage dan path . Pindahkan file kategori sesuai. Jika beberapa file berada di folder yang sama dan folder digunakan sebagai jalur, Setl akan menganggap file sebagai partisi dari satu file. Selanjutnya, lihat App.scala . Anda dapat melihat bahwa kami menggunakan metode setConnector() dan setSparkRepository() . Setiap kali Anda ingin menggunakan repositori, Anda harus menambahkan konfigurasi dalam konfigurasi dan mendaftarkannya di objek setl .
Buat kelas case bernama Category di folder entity . Sekarang periksa, dalam file kategori, bidang yang akan kita butuhkan.
Kami akan membutuhkan id dan title kategori. Pastikan untuk memeriksa file dan menggunakan ejaan yang sama untuk membuat kelas kasus Category .
Kerangka Factory telah disediakan. Pastikan Anda memahami struktur logis.
Delivery dalam bentuk Connector memungkinkan kita untuk mengambil input. Delivery lain akan bertindak sebagai SparkRepository , di mana kita akan menulis output dari transformasi. Lihatlah id dari setiap Delivery dan deliveryId di App.scala . Mereka digunakan sehingga tidak ada ambiguitas saat setl mengambil repositori. Untuk dapat membaca dua pengiriman sebelumnya, kami akan menggunakan dua variabel lain: DataFrame untuk membaca Connector , dan Dataset untuk menyimpan output SparkRepository . Perbedaan di antara mereka adalah bahwa SparkRepository diketik, oleh karena itu Dataset .Factory setl :read : Idenya adalah untuk mengambil input pengiriman Connector atau SparkRepository Delivery , preprocess jika diperlukan, dan menyimpannya ke dalam variabel untuk menggunakannya di fungsi berikutnya.process : Di sinilah semua transformasi data akan dilakukan. Buat instance dari Transformer yang Anda gunakan, panggil metode transform() , gunakan pengambil transformed dan simpan hasilnya menjadi variabel.write : Seperti namanya, ini digunakan untuk menyimpan output transformasi setelah selesai. Connector menggunakan metode write() untuk menyimpan DataFrame , dan SparkRepository menggunakan metode save() untuk menyimpan Dataset .get : Fungsi ini digunakan untuk meneruskan output ke Stage Pipeline berikutnya. Kembalikan saja Dataset .process , mungkin ada beberapa Transformer . Kami akan mencoba mengikuti struktur ini di seluruh proyek lainnya.Factory akan secara otomatis ditransfer ke Stage berikutnya melalui fungsi get . Namun, menulis output dari setiap Factory akan lebih mudah untuk visualisasi dan debugging. Sekali lagi, kerangka Transformer telah disediakan. Namun, Anda akan menjadi orang yang akan menulis transformasi data.
Transformer kami bertengkar. Biasanya, itu adalah DataFrame atau Dataset yang ingin kami proses. Bergantung pada aplikasi Anda, Anda dapat menambahkan argumen lain.transformedData adalah variabel yang akan menyimpan hasil transformasi data.transformed adalah pengambil yang akan dipanggil oleh Factory untuk mengambil hasil transformasi data.transform() adalah metode yang akan melakukan transformasi data.items . Jika Anda memeriksa file kategori, informasi yang kami butuhkan ada di bidang ini.items adalah array. Kami ingin meledak array ini dan hanya mengambil bidang id dan bidang title dari bidang snippet . Untuk melakukan itu, gunakan fungsi explode dari org.apache.spark.sql.functions . Kemudian, untuk mendapatkan bidang tertentu, gunakan metode withColumn dan metode getField() pada id, snippet dan title . Jangan lupa untuk melemparkan tipe yang sesuai dengan kelas kasing yang Anda buat.id dan kolom title . Kemudian, masukkan DataFrame ke dalam dataset dengan as[T] .Transformer . Untuk melihat apa fungsinya, Anda dapat menjalankan file App.scala yang telah dibuat. Ini hanya menjalankan Factory yang berisi Transformer yang baru saja Anda tulis, dan itu akan menghasilkan hasilnya ke jalur file konfigurasi. Perhatikan bahwa Factory yang sesuai telah ditambahkan melalui addStage() yang membuat Pipeline menjalankannya.Connector , Menggunakan Anotasi @Delivery , dengan deliveryId .Transformer dalam metode process sebuah Factory .write sebuah Factory . Sekarang mari kita proses file video. Kami ingin menggabungkan semua file dalam satu DataFrame / Dataset atau dalam file CSV yang sama, sambil menyimpan informasi wilayah tersebut untuk setiap video. Semua file video adalah file CSV dan mereka memiliki kolom yang sama, seperti yang dinyatakan sebelumnya di bagian konteks . Alur kerja mirip dengan yang terakhir: konfigurasi; kelas kasus; Factory ; Transformer ; Tambahkan Stage ke dalam Pipeline . Kali ini, kami akan mengatur beberapa objek konfigurasi.
Kami akan mengatur beberapa objek konfigurasi di resources/local.conf , satu per wilayah. Di setiap objek konfigurasi, Anda harus mengatur storage, path, inferSchema, delimiter, header, multiLine dan dateFormat .
videos<region>Repository .Factory . Buat kelas case bernama Video di folder entity . Sekarang periksa, di file video, bidang yang kita perlukan. Ingatkan bahwa tujuannya adalah untuk menghitung skor popularitas, dan bahwa formulanya adalah number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight . Ini akan membantu memilih bidang.
Buat kelas kasus lain bernama VideoCountry . Ini akan memiliki bidang yang persis sama dengan Video , tetapi dengan bidang negara/wilayah sebagai tambahan.
@ColumnName . Cobalah untuk menggunakannya karena dapat berguna dalam beberapa situasi bisnis kehidupan nyata.java.sql.Date untuk bidang jenis tanggal. Kami ingin memiliki videoId , title , channel_title , category_id , trending_date , views , likes , dislikes , comment_count , comments_disabled dan bidang video_error_or_removed .
Tujuan dari pabrik ini adalah untuk menggabungkan semua file video menjadi satu, tanpa menghapus informasi wilayah. Itu berarti bahwa kita akan menggunakan dua jenis Transformer .
Delivery input dalam bentuk SparkRepository[Video] . Tetapkan Delivery terakhir sebagai SparkRepository[VideoCountry] , di mana kami akan menulis output dari transformasi. Tetapkan sebanyak mungkin variabel Dataset[Video] sebagai jumlah input.Factory :read : Preprocess SparkRepository dengan memfilter video yang dihapus atau kesalahan . Kemudian, "cast" mereka sebagai Dataset[Video] dan menyimpannya ke variabel yang sesuai.process : Terapkan Transformer pertama untuk masing -masing input, dan terapkan hasilnya ke Transformer kedua.write : Tulis output SparkRepository[VideoCountry] .get : Kembalikan cukup hasil Transformer akhir.Connector untuk membaca file input dan SparkRepository untuk output?SparkRepository untuk membaca input hanya untuk menyediakan struktur untuk file input.SparkRepository dan banyak variabel yang sesuai, dan saya tidak menemukan ini cantik/centise. Bukankah ada solusi lain?Delivery dalam bentuk SparkRepository , Anda dapat menggunakan pengiriman dalam bentuk Dataset dengan autoLoad = true . Jadi, alih -alih memiliki: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
Tujuan utama dari Transformer pertama adalah menambahkan informasi wilayah/negara. Bangun Transformer yang mengambil dua input, Dataset[Video] dan string. Tambahkan country kolom dan kembalikan Dataset[VideoCountry] . Anda juga dapat memfilter video yang diberi label sebagai dihapus atau kesalahan . Tentu saja, langkah terakhir ini dapat ditempatkan di tempat lain.
Tujuan utama dari Transformer kedua adalah untuk menyusun kembali semua video bersama -sama, sambil menjaga informasi wilayah.
reduce dan union . Untuk memeriksa hasil pekerjaan Anda, buka App.scala , atur SparkRepositories , tambahkan VideoFactory panggung, dan jalankan kode. Ini akan membuat file output di jalur yang sesuai.
Connector dan SparkRepository .Deliveries ke dalam Transformer atau Connector .Transformers di sebuah Factory .Karena video dapat menjadi tren teratas untuk satu hari dan hari berikutnya, itu akan memiliki angka yang berbeda dalam hal pandangan, suka, tidak suka, komentar ... sebagai akibatnya, kita harus mengambil statistik terbaru yang tersedia untuk satu video, untuk setiap wilayah. Pada saat yang sama, kami akan menghitung jumlah hari tren untuk setiap video.
Tapi bagaimana kita akan melakukan itu? Pertama -tama, kita akan mengelompokkan catatan yang sesuai dengan video yang sama, dan menghitung jumlah catatan, yang pada dasarnya adalah jumlah hari yang sedang tren. Kemudian, kami akan memberi peringkat catatan yang dikelompokkan ini dan mengambil yang terbaru, untuk mengambil statistik terbaru.
File konfigurasi untuk output VideoFactory sudah diatur dalam pencapaian sebelumnya sehingga dapat disimpan. Anda perlu membacanya dan memprosesnya untuk mendapatkan statistik video terbaru. Jangan lupa menambahkan file konfigurasi untuk output Factory baru ini.
Buat kelas case bernama VideoStats yang memiliki bidang yang sama dengan VideoCountry , tetapi Anda perlu mempertimbangkan jumlah hari yang sedang tren.
Di pabrik ini, yang perlu Anda lakukan adalah membaca input, meneruskannya ke Transformer yang akan melakukan pemrosesan data, dan menulis output. Seharusnya sangat sederhana; Anda dapat mencoba meniru Factories lain.
Deliveries . Seperti yang dikatakan sebelumnya, kami akan mengelompokkan video bersama. Untuk itu, kita akan menggunakan org.apache.spark.sql.expressions.Window . Pastikan Anda tahu apa yang dilakukan Window sebelumnya.
Window pertama yang akan Anda partisi dengan menghitung jumlah hari yang sedang tren untuk setiap video. Untuk mengetahui bidang mana yang akan Anda selesaikan, lihat bidang apa yang akan sama untuk satu video.Window kedua yang akan digunakan untuk memberi peringkat video berdasarkan tanggal tren mereka. Dengan memilih tanggal terbaru, kami dapat mengambil statistik terbaru dari setiap video.Windows , Anda sekarang dapat menambahkan kolom baru trendingDays untuk jumlah hari yang sedang tren dan rank untuk peringkat tanggal tren dengan pesanan menurun.rank mereka, hanya mengambil catatan dengan rank 1.DataFrame ke Dataset[VideoStats] .partitionBy dan orderBy untuk Window ; dan count , rank metode dari org.apache.spark.sql.functions saat bekerja dengan Dataset . Untuk memeriksa hasil pekerjaan Anda, buka App.scala , atur SparkRepositories , tambahkan panggung, dan jalankan kode. Ini akan membuat file output di jalur yang sesuai.
Pipeline .Connector dan SparkRepository , dan cara mengatur Deliveries . Kami sekarang akan menghitung skor popularitas setiap video, setelah mendapatkan statistik terbaru mereka. Seperti yang dikatakan sebelumnya, formula kami sangat sederhana dan mungkin tidak mewakili kenyataan. Mari kita ingatkan bahwa formulanya adalah views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight . Menggunakan hasil sebelumnya dari VideoStats , kami hanya akan menerapkan formula, dan mengurutkan data berdasarkan skor tertinggi ke yang terendah.
Ini adalah transformasi data terakhir. Atur konfigurasi sehingga Anda dapat menyimpan Dataset[VideoStats] . Untuk menambahkan konstanta yang digunakan untuk rumus, Anda harus mengatur Inputs di dalam Pipeline . Sebelum menambahkan tahapan dalam Pipeline , gunakan setInput[T](<value>, <id>) untuk mengatur konstanta. Input ini dapat diambil kapan saja di Factories mana pun yang pernah ditambahkan ke Pipeline .
Tidak diperlukan entitas di sini. Kami hanya akan mengurutkan data sebelumnya dan menjatuhkan kolom yang digunakan untuk menghitung skor sehingga kami masih dapat menggunakan entitas VideoStats .
Di pabrik ini, yang perlu Anda lakukan adalah membaca input, meneruskannya ke Transformer yang akan melakukan pemrosesan data, dan menulis output. Seharusnya sangat sederhana; Anda dapat mencoba meniru Factories lain.
Deliverable : Connector , SparkRepository dan/atau Input .Mari kita normalkan jumlah suka/tidak suka atas jumlah tampilan. Untuk setiap catatan, bagi jumlah suka dengan jumlah tampilan, dan kemudian jumlah ketidaksukaan dengan jumlah tampilan. Setelah itu, dapatkan persentase suka "dinormalisasi".
Sekarang mari kita normalisasi jumlah komentar. Untuk setiap catatan, bagi jumlah komentar dengan jumlah tampilan.
Kami sekarang dapat menghitung skor popularitas. Ingatkan bahwa rumusnya adalah: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
Namun, ada video di mana komentar dinonaktifkan. Dalam hal ini, formula menjadi: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . Kami secara sewenang -wenang memutuskan bobotnya:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05when dan otherwise berfungsi dari org.apache.spark.sql.functions . Urutkan berdasarkan score dalam urutan menurun, dan ambil 100 catatan pertama. Anda sekarang memiliki 100 video paling "populer" dari 10 wilayah.
Untuk memeriksa hasil pekerjaan Anda, buka App.scala , atur Inputs jika belum diatur, atur output SparkRepository , tambahkan panggung, dan jalankan kode. Ini akan membuat file output di jalur yang sesuai.
Deliveries : Input , Connector , dan SparkRepository , dengan deliveryId .Stage , termasuk Factory dan Transformer(s) .Jika Anda menyukai proyek ini, silakan periksa Setl Framework di sini: https://github.com/setl-developers/setl, dan mengapa tidak membawa kontribusi Anda!