YouTube Setl เป็นโครงการที่มีจุดมุ่งหมายในการจัดหาจุดเริ่มต้นในการฝึก SETL Framework: https://github.com/setl-developers/setl แนวคิดคือการให้โครงการบริบทที่เกี่ยวข้องกับสารสกัดแปลงและโหลด มีความยากลำบากสามระดับสำหรับการออกกำลังกาย: โหมดง่ายโหมดปกติและโหมดฮาร์ด
ข้อมูลที่ใช้มาจาก Kaggle, https://www.kaggle.com/datasnaek/youtube-new
ฉันใช้ Jetbrains Intellij Idea Community Edition สำหรับโครงการนี้กับ Scala และ Apache Spark
ข้อมูลถูกแบ่งออกเป็นหลายภูมิภาค: แคนาดา (CA), เยอรมนี (DE), ฝรั่งเศส (FR), บริเตนใหญ่ (GB), อินเดีย (IN), ญี่ปุ่น (JP), เกาหลีใต้ (KR), เม็กซิโก (MX), รัสเซีย (RU) และสหรัฐอเมริกา (สหรัฐอเมริกา) สำหรับแต่ละภูมิภาคเหล่านี้มีสองไฟล์:

ทุกวัน YouTube มีวิดีโอที่ได้รับความนิยมมากที่สุดประมาณ 200 รายการในแต่ละประเทศ YouTube วัดจำนวนวิดีโอที่ทันสมัยขึ้นอยู่กับการรวมกันของปัจจัยที่ไม่ได้เปิดเผยต่อสาธารณะอย่างเต็มที่ ชุดข้อมูลนี้ประกอบด้วยคอลเลกชันวิดีโอที่ได้รับความนิยมสูงสุดของทุกวัน ด้วยเหตุนี้จึงเป็นไปได้ที่วิดีโอเดียวกันจะปรากฏหลายครั้งซึ่งหมายความว่ามีแนวโน้มเป็นเวลาหลายวัน
โดยทั่วไปองค์ประกอบของฟิลด์ รายการ ช่วยให้เราสามารถแมป category_id ของไฟล์ CSV กับหมวดหมู่ชื่อเต็ม
เราจะวิเคราะห์ชุดข้อมูลนี้และกำหนดวิดีโอ "ยอดนิยม" แต่เราจะกำหนดวิดีโอยอดนิยมได้อย่างไร? เราจะกำหนดความนิยมของวิดีโอตามจำนวนการดูไลค์ไม่ชอบจำนวนความคิดเห็นและจำนวนวันที่มีแนวโน้ม
คำจำกัดความนี้เป็นที่ถกเถียงกันอย่างชัดเจนและเป็นไปตามอำเภอใจและเราไม่ได้มองหาคำจำกัดความที่ดีที่สุดสำหรับความนิยมของวิดีโอ เราจะมุ่งเน้นไปที่วัตถุประสงค์ของโครงการนี้เท่านั้น: ฝึกฝนกับเฟรมเวิร์ก SETL
เป้าหมายของโครงการนี้คือการค้นหาวิดีโอ "ยอดนิยม" 100 รายการและหมวดหมู่วิดีโอยอดนิยม "มากที่สุด แต่เราจะกำหนดความนิยมของวิดีโอได้อย่างไร? สูตรกำลังจะเป็น:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight
เปอร์เซ็นต์ที่ชอบคืออัตราส่วนของการชอบที่ไม่ชอบ อัตราส่วนนี้ถูกทำให้เป็นมาตรฐานมากกว่าจำนวนมุมมอง การทำให้เป็นมาตรฐานเดียวกันนั้นทำด้วยจำนวนความคิดเห็น
ด้านล่างนี้เป็นคำแนะนำสำหรับแต่ละระดับความยากในการตระหนักถึงโครงการ สำหรับแต่ละระดับความยากคุณสามารถโคลน repo กับสาขาเฉพาะเพื่อมีโครงการเริ่มต้น
สำหรับโครงการนี้เราคิดว่าคุณมีความรู้พื้นฐานเกี่ยวกับ Scala และ Apache Spark แล้ว
entity ที่มีคลาสเคสหรือวัตถุ; factory ที่มีหม้อแปลง; และ transformer ที่มีการแปลงข้อมูลFactory Setl หรือ Transformer คุณสามารถใช้ Ctrl+i เพื่อสร้างฟังก์ชั่นที่จำเป็นโดยอัตโนมัติ สิ่งแรกที่เราจะทำคืออ่านอินพุต: ไฟล์ CSV ที่ฉันจะเรียกไฟล์วิดีโอและไฟล์ JSON ไฟล์หมวดหมู่
เริ่มต้นด้วยไฟล์หมวดหมู่กันเถอะ ไฟล์หมวดหมู่ทั้งหมดเป็นไฟล์ JSON สร้างคลาสเคสที่แสดงถึง หมวดหมู่ จากนั้น Factory ที่มี Transformer ที่จะประมวลผลไฟล์หมวดหมู่ลงในคลาสเคส
local.conf วัตถุได้ถูกสร้างขึ้นแล้วเพื่ออ่านไฟล์หมวดหมู่org.apache.spark.sql.functionscoalesce เมื่อบันทึกไฟล์ ตอนนี้เราสามารถทำงานกับไฟล์วิดีโอได้ ในทำนองเดียวกันสร้างคลาสเคสที่แสดง วิดีโอ สำหรับการอ่านอินพุตจากนั้น Factory ที่มี Transformers หนึ่งหรือหลายตัวที่จะทำการประมวลผล เนื่องจากไฟล์วิดีโอถูกแยกออกจากภูมิภาคจึงไม่มีข้อมูลภูมิภาคสำหรับแต่ละระเบียนในชุดข้อมูล ลองเพิ่มข้อมูลนี้โดยใช้ VideoCountry เคสเคสเคสอื่น ๆ ซึ่งคล้ายกับ วิดีโอ มากและผสานบันทึกทั้งหมดในชุดข้อมูล/ชุดข้อมูลเดียว
Transformers จะมีประโยชน์: หนึ่งสำหรับการเพิ่มคอลัมน์ country และอีกหนึ่งสำหรับการรวมวิดีโอทั้งหมดเข้ากับชุดข้อมูลเดียวเนื่องจากวิดีโอสามารถเป็นหนึ่งในหนึ่งในหนึ่งวันและในวันถัดไปจึงเป็นไปได้ที่วิดีโอจะมีหลายแถวที่แต่ละคนมีตัวเลขที่แตกต่างกันในแง่ของมุมมองการชอบไม่ชอบความคิดเห็น ... เป็นผลให้เราต้องดึงสถิติล่าสุดที่มีให้สำหรับวิดีโอเดียวสำหรับแต่ละภูมิภาคเพราะสถิติเหล่านี้เพิ่มขึ้น ในเวลาเดียวกันเราจะคำนวณจำนวนวันที่ได้รับความนิยมสำหรับวิดีโอทุกรายการ
สร้าง videostats คลาสเคสซึ่งคล้ายกับคลาสกรณีก่อนหน้า แต่ด้วยข้อมูลวันที่ได้รับความนิยม
ขั้นแรกให้คำนวณจำนวนวันที่ได้รับความนิยมของแต่ละวิดีโอ
window จาก org.apache.spark.sql.functionsในการดึงสถิติล่าสุดคุณต้องดึงวันที่ได้รับความนิยมล่าสุดของวิดีโอแต่ละรายการ ในความเป็นจริงแล้วสถิติล่าสุดที่มีอยู่
window อื่น คนแรกคือการคำนวณจำนวนวันที่ได้รับความนิยมและครั้งที่สองเพื่อดึงสถิติล่าสุดrankเรียงลำดับผลลัพธ์ตามภูมิภาคจำนวนวันที่ได้รับความนิยมมุมมองไลค์และความคิดเห็น มันจะเตรียมข้อมูลสำหรับความสำเร็จครั้งต่อไป
ตอนนี้เราจะคำนวณคะแนนความนิยมของวิดีโอแต่ละรายการหลังจากได้รับสถิติล่าสุด ดังที่ได้กล่าวไว้ก่อนหน้านี้สูตรของเรานั้นง่ายมากและอาจไม่เป็นตัวแทนของความเป็นจริง
ลองทำให้จำนวนไลค์/ไม่ชอบเป็นปกติมากกว่าจำนวนการดู สำหรับแต่ละระเบียนให้แบ่งจำนวนไลค์ตามจำนวนมุมมองจากนั้นจำนวนไม่ชอบตามจำนวนมุมมอง หลังจากนั้นรับเปอร์เซ็นต์ของการชอบ "ปกติ"
ตอนนี้ขอให้จำนวนความคิดเห็นปกติ สำหรับแต่ละระเบียนแบ่งจำนวนความคิดเห็นด้วยจำนวนมุมมอง
ตอนนี้เราสามารถคำนวณคะแนนความนิยม เตือนว่าสูตรคือ: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight
อย่างไรก็ตามมีวิดีโอที่ความคิดเห็นถูกปิดใช้งาน ในกรณีนี้สูตรจะกลายเป็น: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) เราตัดสินใจว่าน้ำหนักเป็น:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05 ตั้งค่าเป็น Input เพื่อให้สามารถแก้ไขได้ง่าย
when และ otherwise ฟังก์ชั่นจาก org.apache.spark.sql.functions เรียงลำดับตาม score ตามลำดับจากมากไปน้อยและใช้ 100 ระเบียนแรก ตอนนี้คุณมีวิดีโอ "ยอดนิยม" 100 รายการจาก 10 ภูมิภาค
สิ่งแรกที่เราจะทำคือการอ่านอินพุต: ไฟล์ CSV ที่ฉันจะเรียกไฟล์วิดีโอและไฟล์ JSON ไฟล์หมวดหมู่
เริ่มต้นด้วยไฟล์หมวดหมู่กันเถอะ ไฟล์หมวดหมู่ทั้งหมดเป็นไฟล์ JSON นี่คือเวิร์กโฟลว์: เราจะกำหนดไฟล์การกำหนดค่าที่จะระบุไฟล์หมวดหมู่ที่จะอ่าน; สร้างคลาสเคสที่แสดงถึงหมวดหมู่ จากนั้น Factory ที่มี Transformer ที่จะประมวลผลไฟล์หมวดหมู่ลงในคลาสเคส ในที่สุดเราจะเพิ่ม Stage ลงใน Pipeline เพื่อกระตุ้นการเปลี่ยนแปลง
วัตถุการกำหนดค่าได้ถูกสร้างขึ้นแล้วใน resources/local.conf ให้ความสนใจกับตัวเลือก storage และ path ย้ายไฟล์หมวดหมู่ตามนั้น หากหลายไฟล์อยู่ในโฟลเดอร์เดียวกันและใช้โฟลเดอร์เป็นพา ธ SetL จะพิจารณาไฟล์เป็นพาร์ติชันของไฟล์เดียว ถัดไปตรวจสอบ App.scala คุณจะเห็นว่าเราใช้วิธีการ setConnector() และ setSparkRepository() ทุกครั้งที่คุณต้องการใช้ที่เก็บข้อมูลคุณจะต้องเพิ่มการกำหนดค่าในการกำหนดค่าและลงทะเบียนในวัตถุ setl
สร้างคลาสเคสชื่อ Category ในโฟลเดอร์ entity ตอนนี้ตรวจสอบในไฟล์หมวดหมู่ฟิลด์ที่เราต้องการ
เราจะต้องใช้ id และ title ของหมวดหมู่ ตรวจสอบให้แน่ใจว่าได้ตรวจสอบไฟล์และใช้การสะกดคำเดียวกันเพื่อสร้างคลาสเคส Category
โครงกระดูกของ Factory ได้รับการจัดเตรียมแล้ว ตรวจสอบให้แน่ใจว่าคุณเข้าใจโครงสร้างเชิงตรรกะ
Delivery ในรูปแบบของ Connector ช่วยให้เราสามารถดึงอินพุตได้ Delivery อื่นจะทำหน้าที่เป็น SparkRepository ที่เราจะเขียนผลลัพธ์ของการเปลี่ยนแปลง ตรวจสอบ id ของ Delivery แต่ละครั้งและ deliveryId ใน App.scala พวกเขาจะใช้ดังนั้นจึงไม่มีความคลุมเครือเมื่อ Setl ดึงข้อมูลที่เก็บ เพื่อให้สามารถอ่านการส่งมอบก่อนหน้านี้สองครั้งเราจะใช้ตัวแปรอื่นอีกสองตัว: DataFrame สำหรับการอ่าน Connector และ Dataset สำหรับการจัดเก็บเอาท์พุท SparkRepository ความแตกต่างระหว่างพวกเขาคือการพิมพ์ SparkRepository ดังนั้น DatasetFactory Setl :read : แนวคิดคือการใช้ Connector หรืออินพุต SparkRepository Delivery ให้ประมวลผลล่วงหน้าหากจำเป็นและจัดเก็บไว้ในตัวแปรเพื่อใช้ในฟังก์ชั่นถัดไปprocess : นี่คือที่ที่การแปลงข้อมูลทั้งหมดจะทำ สร้างอินสแตนซ์ของ Transformer ที่คุณใช้เรียกใช้วิธี transform() ใช้ getter transformed และเก็บผลลัพธ์ไว้ในตัวแปรwrite : ตามชื่อของมันมันถูกใช้เพื่อบันทึกเอาต์พุตของการแปลงหลังจากที่พวกเขาได้ทำไปแล้ว Connector ใช้วิธี write() เพื่อบันทึก DataFrame และ SparkRepository ใช้วิธี save() เพื่อบันทึก Datasetget : ฟังก์ชั่นนี้ใช้เพื่อส่งเอาต์พุตไปยัง Stage ต่อไปของ Pipeline เพียงส่งคืน Datasetprocess อาจมี Transformer หลายตัว เราจะพยายามติดตามโครงสร้างนี้ตลอดส่วนที่เหลือของโครงการFactory จะถูกถ่ายโอนไปยัง Stage ต่อไปโดยอัตโนมัติผ่านฟังก์ชั่น get อย่างไรก็ตามการเขียนผลลัพธ์ของ Factory ทุกแห่งจะง่ายขึ้นสำหรับการสร้างภาพและการดีบัก อีกครั้งโครงกระดูกของ Transformer ได้รับการจัดเตรียมแล้ว อย่างไรก็ตามคุณจะเป็นคนที่จะเขียนการแปลงข้อมูล
Transformer ของเรามีข้อโต้แย้ง โดยปกติแล้วมันคือ DataFrame หรือ Dataset ที่เราต้องการประมวลผล ขึ้นอยู่กับแอปพลิเคชันของคุณคุณอาจเพิ่มอาร์กิวเมนต์อื่น ๆtransformedData เป็นตัวแปรที่จะเก็บผลลัพธ์ของการแปลงข้อมูลtransformed เป็น Getter ที่จะถูกเรียกโดย Factory เพื่อดึงผลลัพธ์ของการแปลงข้อมูลtransform() เป็นวิธีการที่จะทำการแปลงข้อมูลitems หากคุณตรวจสอบไฟล์หมวดหมู่ข้อมูลที่เราต้องการอยู่ในฟิลด์นี้items เป็นอาร์เรย์ เราต้องการระเบิดอาร์เรย์นี้และใช้เฉพาะฟิลด์ id และฟิลด์ title จากฟิลด์ snippet ในการทำเช่นนั้นให้ใช้ฟังก์ชั่น explode จาก org.apache.spark.sql.functions จากนั้นเพื่อให้ได้ฟิลด์เฉพาะให้ใช้วิธี withColumn และวิธี getField() บน id, snippet และ title อย่าลืมที่จะโยนประเภทตามคลาสเคสที่คุณสร้างขึ้นid และคอลัมน์ title จากนั้นโยน dataframe ลงในชุดข้อมูลด้วย as[T]Transformer เสร็จแล้ว หากต้องการดูว่ามันทำอะไรคุณสามารถเรียกใช้ไฟล์ App.scala ที่สร้างขึ้นแล้ว มันเพียงแค่เรียกใช้ Factory ที่มี Transformer ที่คุณเพิ่งเขียนและมันจะส่งออกผลลัพธ์ไปยังเส้นทางของไฟล์การกำหนดค่า โปรดทราบว่ามีการเพิ่ม Factory ที่เกี่ยวข้องผ่าน addStage() ที่ทำให้ Pipeline ส่งข้อมูลConnector โดยใช้คำอธิบายประกอบ @Delivery พร้อม deliveryIdTransformer ในวิธี process ของ Factorywrite ของ Factory ตอนนี้มาประมวลผลไฟล์วิดีโอกันเถอะ เราต้องการรวมไฟล์ทั้งหมดใน DataFrame / Dataset เดียวหรือในไฟล์ CSV เดียวกันในขณะที่เก็บข้อมูลของภูมิภาคสำหรับแต่ละวิดีโอ ไฟล์วิดีโอทั้งหมดเป็นไฟล์ CSV และมีคอลัมน์เดียวกันตามที่ระบุไว้ก่อนหน้านี้ในส่วน บริบท เวิร์กโฟลว์คล้ายกับอันสุดท้าย: การกำหนดค่า; คลาสกรณี; Factory ; Transformer ; เพิ่ม Stage ลงใน Pipeline เวลานี้เราจะตั้งค่าวัตถุการกำหนดค่าหลายรายการ
เราจะตั้งค่าวัตถุการกำหนดค่าหลายรายการใน resources/local.conf หนึ่งต่อภูมิภาค ในแต่ละวัตถุการกำหนดค่าคุณจะต้องตั้งค่า storage, path, inferSchema, delimiter, header, multiLine และ dateFormat
videos<region>RepositoryFactory สร้างคลาสเคสชื่อ Video ในโฟลเดอร์ entity ตอนนี้ตรวจสอบในไฟล์วิดีโอฟิลด์ที่เราต้องการ เตือนว่าวัตถุประสงค์คือการคำนวณคะแนนความนิยมและสูตรคือ number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight มันจะช่วยในการเลือกฟิลด์
สร้างคลาสเคสอื่นที่ชื่อว่า VideoCountry มันจะมีฟิลด์เดียวกันกับ Video แต่กับเขตประเทศ/ภูมิภาคนอกจากนี้
@ColumnName ของเฟรมเวิร์ก ลองใช้มันเพราะมันจะเป็นประโยชน์ในสถานการณ์ธุรกิจในชีวิตจริงjava.sql.Date สำหรับฟิลด์ประเภทวันที่ เราต้องการมี videoId , title , channel_title , category_id , trending_date , views , likes , dislikes , comment_count , comments_disabled และ video_error_or_removed ฟิลด์
เป้าหมายของโรงงานนี้คือการรวมไฟล์วิดีโอทั้งหมดเข้าด้วยกันเป็นไฟล์เดียวโดยไม่ต้องลบข้อมูลภูมิภาค นั่นหมายความว่าเราจะใช้ Transformer สองชนิด
Delivery ทั้งหมดในรูปแบบของ SparkRepository[Video] ตั้งค่า Delivery ครั้งสุดท้ายเป็น SparkRepository[VideoCountry] ซึ่งเราจะเขียนผลลัพธ์ของการแปลง ตั้ง Dataset[Video] เป็นจำนวนอินพุตFactory :read : ประมวลผล preprocess SparkRepository โดยการกรองวิดีโอที่ ถูกลบ หรือ ข้อผิดพลาด จากนั้น "หล่อ" พวกเขาเป็น Dataset[Video] และเก็บไว้ในตัวแปรที่เกี่ยวข้องprocess : ใช้ Transformer แรกสำหรับแต่ละอินพุตและใช้ผลลัพธ์กับ Transformer ที่สองwrite : เขียนเอาท์พุท SparkRepository[VideoCountry]get : เพียงส่งคืนผลลัพธ์ของ Transformer สุดท้ายConnector เพื่ออ่านไฟล์อินพุตและ SparkRepository สำหรับเอาต์พุต?SparkRepository เพื่ออ่านอินพุตเพียงเพื่อให้โครงสร้างสำหรับไฟล์อินพุตSparkRepository มากมายและตัวแปรที่สอดคล้องกันจำนวนมากและฉันไม่พบสิ่งนี้/เป็นที่ถกเถียงกัน ไม่มีทางออกอื่นหรือ?Delivery ในรูปแบบของ SparkRepository คุณสามารถใช้การส่งมอบในรูปแบบของ Dataset ที่มีตัวเลือก autoLoad = true ดังนั้นแทนที่จะมี: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
เป้าหมายหลักของ Transformer แรกคือการเพิ่มข้อมูลภูมิภาค/ประเทศ สร้าง Transformer ที่ใช้สองอินพุต Dataset[Video] และสตริง เพิ่ม country คอลัมน์และส่งคืน Dataset[VideoCountry] นอกจากนี้คุณยังสามารถกรองวิดีโอที่มีป้ายกำกับว่า ถูกลบหรือผิดพลาด แน่นอนว่าขั้นตอนสุดท้ายนี้สามารถวางไว้ที่อื่นได้
เป้าหมายหลักของ Transformer ที่สองคือการจัดกลุ่มวิดีโอทั้งหมดเข้าด้วยกันใหม่ในขณะที่เก็บข้อมูลภูมิภาค
reduce และ union หากต้องการตรวจสอบผลลัพธ์ของงานของคุณให้ไปที่ App.scala ตั้งค่า SparkRepositories เพิ่ม VideoFactory เวทีและเรียกใช้รหัส มันจะสร้างไฟล์เอาต์พุตในเส้นทางที่สอดคล้องกัน
Connector และ SparkRepositoryDeliveries หลายรายการลงใน Transformer หรือ ConnectorTransformers หลายตัวใน Factoryเนื่องจากวิดีโอสามารถเป็นหนึ่งในหนึ่งในหนึ่งวันและในวันถัดไปมันจะมีตัวเลขที่แตกต่างกันในแง่ของการดูการชอบไม่ชอบความคิดเห็น ... เป็นผลให้เราต้องดึงสถิติล่าสุดที่มีให้สำหรับวิดีโอเดียวสำหรับแต่ละภูมิภาค ในเวลาเดียวกันเราจะคำนวณจำนวนวันที่ได้รับความนิยมสำหรับวิดีโอทุกรายการ
แต่เราจะทำอย่างไร? ก่อนอื่นเราจะจัดกลุ่มระเบียนที่สอดคล้องกับวิดีโอเดียวกันและนับจำนวนระเบียนซึ่งโดยทั่วไปเป็นจำนวนวันที่ได้รับความนิยม จากนั้นเราจะจัดอันดับระเบียนที่จัดกลุ่มเหล่านี้และนำไฟล์ล่าสุดเพื่อดึงสถิติล่าสุด
ไฟล์การกำหนดค่าสำหรับเอาต์พุตของ VideoFactory ถูกตั้งค่าไว้แล้วในความสำเร็จก่อนหน้านี้เพื่อให้สามารถบันทึกได้ คุณจะต้องอ่านและประมวลผลเพื่อรับสถิติวิดีโอล่าสุด อย่าลืมเพิ่มไฟล์การกำหนดค่าสำหรับผลลัพธ์ของ Factory ใหม่นี้
สร้างคลาสเคสชื่อ VideoStats ซึ่งมีฟิลด์ที่คล้ายกันกับ VideoCountry แต่คุณต้องคำนึงถึงจำนวนวันที่ได้รับความนิยม
ในโรงงานนี้สิ่งที่คุณต้องทำคืออ่านอินพุตส่งผ่านไปยัง Transformer ที่จะทำการประมวลผลข้อมูลและเขียนเอาต์พุต มันควรจะค่อนข้างง่าย คุณสามารถลองเลียนแบบ Factories อื่น ๆ
Deliveries อย่างที่กล่าวไว้ก่อนหน้านี้เราจะจัดกลุ่มวิดีโอด้วยกัน สำหรับสิ่งนั้นเราจะใช้ org.apache.spark.sql.expressions.Window ตรวจสอบให้แน่ใจว่าคุณรู้ว่า Window ทำอะไรล่วงหน้า
Window แรกที่คุณจะพาร์ติชันโดยการนับจำนวนวันที่ได้รับความนิยมสำหรับแต่ละวิดีโอ หากต้องการทราบว่าคุณกำลังจะพาร์ติชันฟิลด์ใดให้ดูว่าฟิลด์ใดจะเหมือนกันสำหรับวิดีโอเดียวWindow ที่สองที่จะใช้สำหรับการจัดอันดับวิดีโอตามวันที่ได้รับความนิยม โดยการเลือกวันที่ล่าสุดเราสามารถดึงสถิติล่าสุดของแต่ละวิดีโอได้Windows นี้คุณสามารถเพิ่มคอลัมน์ใหม่ trendingDays สำหรับจำนวนวันที่ได้รับความนิยมและ rank สำหรับการจัดอันดับของวันที่แนวโน้มโดยการสั่งซื้อลดลงrank ของพวกเขาโดยใช้เฉพาะบันทึกที่ rank 1DataFrame ไปยัง Dataset[VideoStats]partitionBy และวิธี orderBy สำหรับ Window และ count วิธี rank จาก org.apache.spark.sql.functions เมื่อทำงานกับ Dataset หากต้องการตรวจสอบผลลัพธ์ของงานของคุณให้ไปที่ App.scala ตั้งค่า SparkRepositories เพิ่มเวทีและเรียกใช้รหัส มันจะสร้างไฟล์เอาต์พุตในเส้นทางที่สอดคล้องกัน
PipelineConnector และ SparkRepository คืออะไรและวิธีการตั้งค่า Deliveries ของพวกเขา ตอนนี้เราจะคำนวณคะแนนความนิยมของวิดีโอแต่ละรายการหลังจากได้รับสถิติล่าสุด ดังที่ได้กล่าวไว้ก่อนหน้านี้สูตรของเรานั้นง่ายมากและอาจไม่เป็นตัวแทนของความเป็นจริง มาเตือนว่าสูตรนี้เป็น views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight การใช้ผลลัพธ์ก่อนหน้าของ VideoStats เราจะใช้สูตรและเรียงลำดับข้อมูลด้วยคะแนนสูงสุดให้ต่ำที่สุด
นี่คือการแปลงข้อมูลครั้งสุดท้าย ตั้งค่าการกำหนดค่าเพื่อให้คุณสามารถบันทึก Dataset[VideoStats] ในการเพิ่มค่าคงที่ที่ใช้สำหรับสูตรคุณจะต้องตั้ง Inputs ใน Pipeline ก่อนที่จะเพิ่มขั้นตอนใน Pipeline ให้ใช้ setInput[T](<value>, <id>) เพื่อตั้งค่าค่าคงที่ อินพุตเหล่านี้สามารถเรียกคืนได้ตลอดเวลาใน Factories ใด ๆ เมื่อเพิ่มลงใน Pipeline
ไม่จำเป็นต้องมีเอนทิตีที่นี่ เราจะเรียงลำดับข้อมูลก่อนหน้าและวางคอลัมน์ที่ใช้สำหรับการคำนวณคะแนนเพื่อให้เรายังสามารถใช้เอนทิตี VideoStats ได้
ในโรงงานนี้สิ่งที่คุณต้องทำคืออ่านอินพุตส่งผ่านไปยัง Transformer ที่จะทำการประมวลผลข้อมูลและเขียนเอาต์พุต มันควรจะค่อนข้างง่าย คุณสามารถลองเลียนแบบ Factories อื่น ๆ
Deliverable : Connector , SparkRepository และ/หรือ Inputลองทำให้จำนวนไลค์/ไม่ชอบเป็นปกติมากกว่าจำนวนการดู สำหรับแต่ละระเบียนให้แบ่งจำนวนไลค์ตามจำนวนมุมมองจากนั้นจำนวนไม่ชอบตามจำนวนมุมมอง หลังจากนั้นรับเปอร์เซ็นต์ของการชอบ "ปกติ"
ตอนนี้ขอให้จำนวนความคิดเห็นปกติ สำหรับแต่ละระเบียนแบ่งจำนวนความคิดเห็นด้วยจำนวนมุมมอง
ตอนนี้เราสามารถคำนวณคะแนนความนิยม เตือนว่าสูตรคือ: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight
อย่างไรก็ตามมีวิดีโอที่ความคิดเห็นถูกปิดใช้งาน ในกรณีนี้สูตรจะกลายเป็น: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) เราตัดสินใจว่าน้ำหนักเป็น:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05when และ otherwise ฟังก์ชั่นจาก org.apache.spark.sql.functions เรียงลำดับตาม score ตามลำดับจากมากไปน้อยและใช้ 100 ระเบียนแรก ตอนนี้คุณมีวิดีโอ "ยอดนิยม" 100 รายการจาก 10 ภูมิภาค
หากต้องการตรวจสอบผลลัพธ์ของงานของคุณให้ไปที่ App.scala ตั้ง Inputs หากยังไม่ได้ตั้งค่าให้ตั้งค่า SparkRepository เอาต์พุตเพิ่มเวทีและเรียกใช้รหัส มันจะสร้างไฟล์เอาต์พุตในเส้นทางที่สอดคล้องกัน
Deliveries สามประเภท: Input , Connector และ SparkRepository พร้อม deliveryIdStage รวมถึง Factory และ Transformer(s)หากคุณชอบโครงการนี้โปรดตรวจสอบ Setl Framework ที่นี่: https://github.com/setl-developers/setl และทำไมไม่นำการบริจาคของคุณ!