YouTube Setl هو مشروع يهدف إلى توفير نقطة انطلاق لممارسة إطار SETL: https://github.com/setl-developers/setl. تتمثل الفكرة في إعطاء مشروع سياق يتضمن عمليات الاستخراج والتحويل والتحميل. هناك ثلاثة مستويات من الصعوبة في التمرين: الوضع السهل والوضع العادي والوضع الصلب.
البيانات المستخدمة هي من Kaggle ، https://www.kaggle.com/datasnaek/youtube-new.
لقد استخدمت JetBrains Intellij Idea Community Edition لهذا المشروع ، مع Scala و Apache Spark.
تنقسم البيانات إلى مناطق المضاعفات: كندا (كاليفورنيا) وألمانيا (DE) وفرنسا (FR) وبريطانيا العظمى (GB) والهند (في) واليابان (JP) وكوريا الجنوبية (KR) والمكسيك (MX) وروسيا (RU) والولايات المتحدة (الولايات المتحدة). لكل من هذه المناطق ، هناك ملفان:

كل يوم ، يوفر YouTube حوالي 200 من أكثر مقاطع الفيديو التي تتجه في كل بلد. يقيس YouTube مقدار الفيديو العصري استنادًا إلى مجموعة من العوامل التي لم يتم نشرها بشكل كامل. تتكون مجموعة البيانات هذه في مجموعة من أهم مقاطع الفيديو اليومية. نتيجة لذلك ، من الممكن أن يظهر الفيديو نفسه عدة مرات ، مما يعني أنه يتجه لعدة أيام.
في الأساس ، تسمح لنا عناصر حقول العناصر برسم خريطة category_id من ملف CSV إلى فئة الاسم الكامل.
سنقوم بتحليل مجموعة البيانات هذه وتحديد مقاطع الفيديو "الشعبية". ولكن ، كيف نحدد فيديو شهير؟ سنحدد شعبية مقطع فيديو استنادًا إلى عدد طرق العرض ، والإعجابات ، والكره ، وعدد التعليقات ، وعدد أيام الاتجاه.
من الواضح أن هذا التعريف قابل للنقاش والتعسفي ، ونحن لا نتطلع إلى معرفة أفضل تعريف لشعبية الفيديو. سوف نركز فقط على الغرض من هذا المشروع: التدريب مع إطار SETL.
الهدف من هذا المشروع هو العثور على 100 مقاطع فيديو "شائعة" ، وفئات الفيديو الأكثر شعبية. ولكن كيف حددنا شعبية الفيديو؟ الصيغة ستكون:
number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight .
النسبة المئوية الإعجابات هي نسبة الإعجابات المفردات. يتم تطبيع هذه النسبة على عدد المشاهدات. يتم التطبيع نفسه مع عدد التعليقات.
فيما يلي تعليمات لكل مستوى صعوبة لتحقيق المشروع. لكل مستوى صعوبة ، يمكنك استنساخ الريبو مع الفرع المحدد للحصول على مشروع بدء.
لهذا المشروع ، نفترض أن لديك بالفعل معرفة أساسية من Scala و Apache Spark.
entity الذي يحتوي على فئات الحالة أو الكائنات ؛ factory الذي يحتوي على محولات ؛ transformer الذي يحتوي على تحويلات البيانات.Factory Setl أو Transformer ، يمكنك استخدام Ctrl+i لإنشاء الوظائف المطلوبة تلقائيًا. أول شيء سنفعله هو ، بالطبع ، قراءة المدخلات: ملفات CSV ، التي سأتصل بها ملفات مقاطع الفيديو ، وملفات JSON ، ملفات الفئات.
لنبدأ بملفات الفئات. جميع ملفات الفئات هي ملفات JSON . قم بإنشاء فئة حالة تمثل فئة ، ثم Factory مع Transformer يعالج ملفات الفئات في فئة العلبة.
local.conf . تم إنشاء كائن بالفعل من أجل قراءة ملفات الفئات.org.apache.spark.sql.functions .coalesce عند حفظ ملف. يمكننا الآن العمل مع ملفات مقاطع الفيديو. وبالمثل ، قم بإنشاء فئة حالة تمثل مقطع فيديو لقراءة المدخلات ، ثم Factory مع واحد أو عدة Transformers ستقوم بالمعالجة. نظرًا لأن ملفات مقاطع الفيديو يتم فصلها عن المناطق ، فلا توجد معلومات المنطقة لكل سجل في مجموعة البيانات. حاول إضافة هذه المعلومات عن طريق استخدام Bideocountry في فئة حالة أخرى تشبه إلى حد كبير الفيديو ، ودمج جميع السجلات في مجموعة بيانات/بيانات واحدة.
Transformers مفيدًا: أحدهما لإضافة عمود country ، وواحد لدمج جميع مقاطع الفيديو في مجموعة بيانات واحدة.نظرًا لأن مقطع الفيديو يمكن أن يكون مقطعًا كبيرًا لليوم وفي اليوم التالي ، فمن الممكن أن يكون مقطع الفيديو له صفوف متعددة ، حيث يكون لكل منها أرقام مختلفة من حيث المشاهدات ، الإعجابات ، لا يكره ، تعليقات ... نتيجة لذلك ، يتعين علينا استرداد أحدث الإحصائيات المتاحة لمقطع فيديو واحد ، لكل منطقة ، لأن هذه الإحصائيات تزداد. في الوقت نفسه ، سنقوم بحساب عدد أيام الاتجاه لكل مقطع فيديو.
قم بإنشاء مقاطع فيديو فئة Case ، والتي تشبه إلى حد كبير فئات الحالات السابقة ، ولكن مع معلومات Days Training.
أولاً ، قم بحساب عدد أيام التوجه لكل مقطع فيديو.
window من org.apache.spark.sql.functions .لاسترداد أحدث الإحصائيات ، يجب عليك استرداد آخر يوم توجه لكل مقطع فيديو. إنها في الواقع أحدث الإحصاءات المتاحة.
window أخرى. كان الأول هو حساب عدد أيام الاتجاه ، والثاني لاسترداد أحدث الإحصاءات.rank .فرز النتائج حسب المنطقة ، وعدد أيام الاتجاه ، ووجهات النظر ، والإعجابات ، ثم التعليقات. وسوف يقوم بإعداد البيانات للإنجاز التالي.
سنقوم الآن بحساب درجة شعبية كل مقطع فيديو ، بعد الحصول على أحدث إحصائيات. كما قيل سابقًا ، فإن صيغتنا بسيطة للغاية وقد لا تمثل الواقع.
دعنا نطبيع عدد الإعجابات/التكره على عدد المشاهدات. لكل سجل ، قسّم عدد الإعجابات بعدد المشاهدات ، ثم عدد التهم بعدد المشاهدات. بعد ذلك ، احصل على نسبة الإعجابات "الطبيعية".
دعونا الآن نطبيع عدد التعليقات. لكل سجل ، قسّم عدد التعليقات على عدد المشاهدات.
يمكننا الآن حساب درجة الشعبية. ذكّر أن الصيغة هي: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
ومع ذلك ، هناك مقاطع فيديو حيث يتم تعطيل التعليقات. في هذه الحالة ، تصبح الصيغة: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . قررنا بشكل تعسفي أن تكون الأوزان:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05 قم بإعدادها Input حتى يمكن تعديلها بسهولة.
when تعمل otherwise من org.apache.spark.sql.functions . فرز score بالترتيب التنازلي ، واتخذ 100 سجل أولي. لديك الآن 100 مقاطع فيديو "شعبية" من المناطق العشر.
أول شيء سنفعله هو ، بالطبع ، قراءة المدخلات: ملفات CSV ، التي سأتصل بها ملفات مقاطع الفيديو ، وملفات JSON ، ملفات الفئات.
لنبدأ بملفات الفئات. جميع ملفات الفئات هي ملفات JSON. فيما يلي سير العمل: سنقوم بتحديد ملف التكوين الذي يشير إلى ملفات الفئات التي يجب قراءتها ؛ إنشاء فئة حالة تمثل فئة ؛ ثم Factory مع Transformer يعالج ملفات الفئات في فئة الحالة. أخيرًا ، سنضيف Stage إلى Pipeline لتحفيز التحولات.
تم إنشاء كائن التكوين بالفعل في resources/local.conf . انتبه في خيارات storage path . نقل ملفات الفئات وفقًا لذلك. إذا كانت الملفات المتعددة موجودة في نفس المجلد ويتم استخدام المجلد كمسار ، فسوف تنظر SETL في الملفات كأقسام لملف واحد. بعد ذلك ، تحقق من App.scala . يمكنك أن ترى أننا استخدمنا طرق setConnector() و setSparkRepository() . في كل مرة تريد فيها استخدام مستودع ، ستحتاج إلى إضافة تكوين في التكوين وتسجيله في كائن setl .
إنشاء فئة حالة باسم Category في مجلد entity . الآن فحص ، في ملفات الفئات ، الحقول التي سنحتاجها.
سنحتاج إلى id title الفئة. تأكد من التحقق من الملفات واستخدام نفس الإملاء لإنشاء فئة حالة Category .
وقد تم بالفعل توفير الهيكل العظمي Factory . تأكد من فهم الهيكل المنطقي.
Delivery في شكل Connector استرداد المدخلات. سيكون Delivery الآخر بمثابة SparkRepository ، حيث سنكتب إخراج التحول. تحقق من id كل Delivery deliveryId في App.scala . يتم استخدامها لذلك لا يوجد غموض عندما تجلب Setl المستودعات. لكي نكون قادرين على قراءة التسليمتين السابقتين ، سنستخدم متغيرين آخرين: A DataFrame لقراءة Connector ، ومجموعة Dataset لتخزين SparkRepository الإخراج. الفرق بينهما هو أن يتم كتابة SparkRepository ، وبالتالي Dataset .Factory setl :read : الفكرة هي أخذ مدخلات توصيل Connector أو SparkRepository Delivery ، والمعالجة المسبقة لها إذا لزم الأمر ، وتخزينها في متغيرات لاستخدامها في الوظيفة التالية.process : هنا سيتم إجراء جميع تحويلات البيانات. قم بإنشاء مثيل Transformer الذي تستخدمه ، واتصل طريقة transform() ، واستخدم getter transformed وتخزين النتيجة في متغير.write : كما يوحي اسمها ، يتم استخدامه لحفظ إخراج التحولات بعد الانتهاء منها. يستخدم Connector طريقة write() لحفظ DataFrame ، ويستخدم SparkRepository طريقة save() لحفظ Dataset .get : يتم استخدام هذه الوظيفة لتمرير الإخراج إلى Stage التالية من Pipeline . مجرد إرجاع Dataset .process ، يمكن أن يكون هناك Transformer متعدد. سنحاول اتباع هذا الهيكل طوال فترة المشروع.Factory تلقائيًا إلى Stage التالية من خلال وظيفة get . ومع ذلك ، فإن كتابة ناتج كل Factory ستكون أسهل في التصور والتصحيح. مرة أخرى ، تم بالفعل توفير الهيكل العظمي Transformer . ومع ذلك ، ستكون الشخص الذي سيكتب تحويل البيانات.
Transformer يأخذ حجة. عادةً ما يكون DataFrame أو Dataset التي نريد معالجتها. اعتمادًا على التطبيق الخاص بك ، يمكنك إضافة وسائط أخرى.transformedData هو المتغير الذي سيخزن نتيجة تحويل البيانات.transformed هو getter الذي سيطلق عليه Factory لاسترداد نتيجة تحويل البيانات.transform() هي الطريقة التي ستقوم بتحويلات البيانات.items . إذا قمت بفحص ملفات الفئات ، فإن المعلومات التي نحتاجها في هذا الحقل.items هو صفيف. نريد أن ننفجر هذه المجموعة ونأخذ حقل id وحقل title فقط من حقل snippet . للقيام بذلك ، استخدم وظيفة explode من org.apache.spark.sql.functions . بعد ذلك ، للحصول على حقول محددة ، استخدم طريقة withColumn وطريقة getField() على id, snippet title . لا تنس أن تثير الأنواع وفقًا لفئة الحالة التي قمت بإنشائها.id وأعمدة title . ثم ، قم بإلقاء مجموعة البيانات في مجموعة بيانات مع as[T] .Transformer . لمعرفة ما يفعله ، يمكنك تشغيل ملف App.scala الذي تم إنشاؤه بالفعل. إنه ببساطة يدير Factory الذي يحتوي على Transformer الذي كتبته للتو ، وسيقوم بإخراج النتيجة إلى مسار ملف التكوين. لاحظ أنه تمت إضافة Factory المقابل عبر addStage() الذي يجعل Pipeline يعمل.Connector ، باستخدام التعليق التوضيحي @Delivery ، مع deliveryId .Transformer في طريقة process Factory .write Factory . دعنا الآن معالجة ملفات مقاطع الفيديو. نود دمج جميع الملفات في DataFrame / Dataset واحدة أو في نفس ملف CSV ، مع الاحتفاظ بمعلومات المنطقة لكل مقطع فيديو. جميع ملفات مقاطع الفيديو هي ملفات CSV ولديها نفس الأعمدة ، كما ذكر سابقًا في قسم السياق . سير العمل مشابه للآخر: التكوين ؛ فئة القضية ؛ Factory ؛ Transformer أضف Stage إلى Pipeline . هذه المرة ، سنقوم بتعيين كائنات تكوين متعددة.
سنقوم بتعيين كائنات تكوين متعددة في resources/local.conf ، واحدة لكل منطقة. في كل كائن تكوين ، سيتعين عليك تعيين storage, path, inferSchema, delimiter, header, multiLine و dateFormat .
videos<region>Repository .Factory . قم بإنشاء فئة حالة تسمى Video في مجلد entity . الآن فحص ، في ملفات مقاطع الفيديو ، الحقول التي سنحتاجها. ذكّر أن الهدف هو حساب درجة الشعبية ، وأن الصيغة هي number of views * views weight + number of trending days * trending days weight + normalized likes percentage * likes weight + normalized comments * comments weight . سوف يساعد على اختيار الحقول.
قم بإنشاء فئة أخرى تسمى VideoCountry . سيكون لها نفس الحقول تمامًا مثل Video ، ولكن مع حقل البلد/المنطقة بالإضافة إلى ذلك.
@ColumnName التعليق التوضيحي للإطار. حاول استخدامه لأنه يمكن أن يكون مفيدًا في بعض مواقف الأعمال الواقعية.java.sql.Date لحقل نوع التاريخ. نود أن يكون لدينا videoId ، title ، channel_title ، category_id ، trending_date ، views ، likes ، dislikes ، comment_count ، comments_disabled و video_error_or_removed .
الهدف من هذا المصنع هو دمج جميع ملفات مقاطع الفيديو في واحدة ، دون إزالة معلومات المنطقة. هذا يعني أننا سنستخدم نوعين من Transformer .
Delivery في شكل SparkRepository[Video] . قم بتعيين Delivery أخير باعتباره SparkRepository[VideoCountry] ، حيث سنكتب إخراج التحول. قم بتعيين أكبر عدد من المتغيرات Dataset[Video] مع عدد المدخلات.Factory :read : معالجة SparkRepository عن طريق تصفية مقاطع الفيديو التي تمت إزالتها أو الخطأ . ثم ، "يلقي" لهم Dataset[Video] وقم بتخزينها في المتغيرات المقابلة.process : قم بتطبيق Transformer الأول لكل من المدخلات ، وتطبيق النتائج على Transformer الثاني.write : اكتب الإخراج SparkRepository[VideoCountry] .get : مجرد إعادة نتيجة Transformer النهائي.Connector لقراءة ملفات الإدخال و SparkRepository للإخراج؟SparkRepository لقراءة المدخلات فقط لتوفير بنية لملفات الإدخال.SparkRepository والكثير من المتغيرات المقابلة ، ولا أجد هذا جميلًا/متاحًا. أليس هناك حل آخر؟Delivery في شكل SparkRepository ، يمكنك استخدام عمليات التسليم في شكل Dataset مع خيار autoLoad = true . لذا ، بدلاً من وجود: @Delivery(id = "id")
var videosRegionRepo: SparkRepository[Video] = _
var videosRegion: Dataset[Video]
@Delivery(id = "id", autoLoad = true)
var videosRegion: Dataset[Video]
الهدف الرئيسي Transformer الأول هو إضافة معلومات المنطقة/البلد. قم بإنشاء Transformer يأخذ اثنين من المدخلات ، ومجموعة Dataset[Video] وسلسلة. أضف country العمود وإرجاع Dataset[VideoCountry] . يمكنك أيضًا تصفية مقاطع الفيديو التي يتم تصنيفها على أنها إزالتها أو خطأ . بالطبع ، يمكن وضع هذه الخطوة الأخيرة في مكان آخر.
الهدف الرئيسي من Transformer الثاني هو إعادة تجميع جميع مقاطع الفيديو معًا ، مع الحفاظ على معلومات المنطقة.
reduce union . للتحقق من نتيجة عملك ، انتقل إلى App.scala ، وضبط SparkRepositories ، وإضافة المرحلة VideoFactory ، وقم بتشغيل الرمز. سيقوم بإنشاء ملف الإخراج في المسار المقابل.
Connector و SparkRepository .Deliveries المتعددة في Transformer أو Connector .Transformers متعددة في Factory .نظرًا لأن مقطع الفيديو يمكن أن يكون مقطعًا كبيرًا لليوم وفي اليوم التالي ، سيكون له أرقام مختلفة من حيث المشاهدات ، الإعجابات ، لا يكره ، تعليقات ... نتيجة لذلك ، يتعين علينا استرداد أحدث الإحصائيات المتاحة لمقطع فيديو واحد ، لكل منطقة. في الوقت نفسه ، سنقوم بحساب عدد أيام الاتجاه لكل مقطع فيديو.
لكن كيف سنفعل ذلك؟ بادئ ذي بدء ، سنقوم بتجميع السجلات التي تتوافق مع نفس الفيديو ، وحساب عدد السجلات ، وهو في الأساس عدد أيام الاتجاه. بعد ذلك ، سنقوم بتصنيف هذه السجلات المجمعة ونأخذ آخرها ، لاسترداد أحدث الإحصاءات.
تم تعيين ملف التكوين لإخراج VideoFactory بالفعل في الإنجاز السابق حتى يمكن حفظه. ستحتاج إلى قراءتها ومعالجتها للحصول على أحدث إحصائيات مقاطع الفيديو. لا تنس إضافة ملف تكوين لإخراج هذا Factory الجديد.
قم بإنشاء فئة حالة تسمى VideoStats التي لها حقول مماثلة VideoCountry ، ولكن عليك أن تأخذ في الاعتبار عدد أيام الاتجاه.
في هذا المصنع ، كل ما عليك فعله هو قراءة الإدخال ، ونقلها إلى Transformer الذي سيقوم بمعالجة البيانات ، وكتابة الإخراج. يجب أن تكون بسيطة جدا. يمكنك محاولة تقليد Factories الأخرى.
Deliveries والمخرجات. كما قيل سابقًا ، سنقوم بتجميع مقاطع الفيديو معًا. لذلك ، سوف نستخدم org.apache.spark.sql.expressions.Window . تأكد من أنك تعرف ما الذي تفعله Window مسبقًا.
Window أولى ستقوم بتقسيمها لحساب عدد أيام التوجه لكل مقطع فيديو. لمعرفة الحقول التي ستقسمها ، انظر إلى الحقول التي ستكون هي نفسها بالنسبة لمقطع فيديو واحد.Window ثانية سيتم استخدامها لترتيب مقاطع الفيديو حسب تاريخ التوجه. من خلال اختيار أحدث تاريخ ، يمكننا استرداد أحدث الإحصائيات لكل مقطع فيديو.Windows ، يمكنك الآن إضافة أعمدة جديدة trendingDays لعدد أيام الاتجاه rank ترتيب تاريخ التوجه عن طريق الترتيب.rank ، مع أخذ السجلات فقط مع rank 1.DataFrame إلى Dataset[VideoStats] .partitionBy و orderBy Window ؛ count ، طرق rank من org.apache.spark.sql.functions عند العمل مع Dataset . للتحقق من نتيجة عملك ، انتقل إلى App.scala ، وضبط SparkRepositories ، وإضافة المرحلة ، وقم بتشغيل الرمز. سيقوم بإنشاء ملف الإخراج في المسار المقابل.
Pipeline .Connector و SparkRepository ، وكيفية تعيين Deliveries منها. سنقوم الآن بحساب درجة شعبية كل مقطع فيديو ، بعد الحصول على أحدث إحصائيات. كما قيل سابقًا ، فإن صيغتنا بسيطة للغاية وقد لا تمثل الواقع. دعونا نذكر أن الصيغة هي views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight . باستخدام النتيجة السابقة VideoStats ، سنقوم ببساطة بتطبيق الصيغة ، وفرز البيانات بأعلى درجة إلى أدنى درجة.
هذا هو آخر تحول للبيانات. اضبط التكوين بحيث يمكنك حفظ Dataset[VideoStats] . لإضافة الثوابت المستخدمة للصيغة ، ستحتاج إلى تعيين Inputs في Pipeline . قبل إضافة مراحل في Pipeline ، استخدم setInput[T](<value>, <id>) لتعيين الثوابت. يمكن استرداد هذه المدخلات في أي وقت في أي Factories بمجرد إضافة Pipeline .
لن تكون هناك حاجة إلى كيان هنا. سنقوم ببساطة بفرز البيانات السابقة وإسقاط الأعمدة المستخدمة لحساب النتيجة بحيث لا يزال بإمكاننا استخدام كيان VideoStats .
في هذا المصنع ، كل ما عليك فعله هو قراءة الإدخال ، ونقلها إلى Transformer الذي سيقوم بمعالجة البيانات ، وكتابة الإخراج. يجب أن تكون بسيطة جدا. يمكنك محاولة تقليد Factories الأخرى.
Deliverable : Connector ، SparkRepository و/أو Input .دعنا نطبيع عدد الإعجابات/التكره على عدد المشاهدات. لكل سجل ، قسّم عدد الإعجابات بعدد المشاهدات ، ثم عدد التهم بعدد المشاهدات. بعد ذلك ، احصل على نسبة الإعجابات "الطبيعية".
دعونا الآن نطبيع عدد التعليقات. لكل سجل ، قسّم عدد التعليقات على عدد المشاهدات.
يمكننا الآن حساب درجة الشعبية. ذكّر أن الصيغة هي: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * likesWeight + normalizedComments * commentsWeight .
ومع ذلك ، هناك مقاطع فيديو حيث يتم تعطيل التعليقات. في هذه الحالة ، تصبح الصيغة: views * viewsWeight + trendingDays * trendingDaysWeight + normalizedLikesPercentage * (likesWeight + commentsWeight) . قررنا بشكل تعسفي أن تكون الأوزان:
viewsWeight = 0.4trendingDaysWeight = 0.35likesWeight = 0.2commentsWeight = 0.05when تعمل otherwise من org.apache.spark.sql.functions . فرز score بالترتيب التنازلي ، واتخذ 100 سجل أولي. لديك الآن 100 مقاطع فيديو "شعبية" من المناطق العشر.
للتحقق من نتيجة عملك ، انتقل إلى App.scala ، وقم بتعيين Inputs إذا لم يتم تعيينها بالفعل ، وقم بتعيين الإخراج SparkRepository ، وإضافة المرحلة ، وقم بتشغيل الرمز. سيقوم بإنشاء ملف الإخراج في المسار المقابل.
Deliveries : Input ، Connector و SparkRepository ، مع deliveryId .Stage ، بما في ذلك Factory Transformer(s) .إذا كنت تحب هذا المشروع ، فيرجى مراجعة SETL Framework هنا: https://github.com/setl-developers/setl ، ولماذا لا تجلب مساهمتك!