
محتويات
Petastorm هي مكتبة الوصول إلى البيانات مفتوحة المصدر تم تطويرها في Uber ATG. تتيح هذه المكتبة الجهاز الفردي أو التدريب الموزعة وتقييم نماذج التعلم العميق مباشرة من مجموعات البيانات بتنسيق Apache Parquet. يدعم Petastorm أطر عمل التعلم الآلي القائم على Python (ML) مثل TensorFlow و Pytorch و Pyspark. يمكن أيضًا استخدامه من رمز Python النقي.
موقع الويب الوثائق: https://petastorm.readthedocs.io
pip install petastorm هناك العديد من التبعيات الإضافية التي يتم تعريفها بواسطة حزمة petastorm التي لم يتم تثبيتها تلقائيًا. الإضافات هي: tf ، tf_gpu ، torch ، opencv ، docs ، test .
على سبيل المثال لتركيب نسخة GPU من TensorFlow و OpenCV ، استخدم أمر PIP التالي:
pip install petastorm[opencv,tf_gpu]يتم تخزين مجموعة بيانات تم إنشاؤها باستخدام Petastorm بتنسيق Apache Parquet. علاوة على مخطط الباركيه ، يقوم Petastorm أيضًا بتخزين معلومات مخطط المستوى الأعلى التي تجعل المصفوفات متعددة الأبعاد في جزء أصلي من مجموعة بيانات Petastorm.
Petastorm يدعم برامج ترميز البيانات القابلة للتمديد. هذه تمكن المستخدم من استخدام أحد ضغط البيانات القياسي (JPEG ، PNG) أو تنفيذها.
يتم إنشاء مجموعة بيانات باستخدام pyspark. يدعم Pyspark تنسيق Parquet ، مما يجعل من السهل تشغيله على جهاز واحد أو على مجموعة حساب الشرارة. فيما يلي مثال أضيق الحدود كتابة جدول مع بعض البيانات العشوائية.
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema هو مثال على كائن Unischema . Unischema قادرة على تقديم أنواع من حقولها في تنسيقات مختلفة إطار محددة ، مثل: Spark StructType و Tensorflow tf.DType و numpy numpy.dtype .type ، shape ، مثيل codec وما إذا كان الحقل غير قابلة لكل حقل من Unischema ..master('local[2]') ). بالطبع لتوليد مجموعة البيانات على نطاق واسع ، سنحتاج إلى مجموعة حسابية حقيقية.materialize_dataset Context Manager. مدير السياق مسؤول عن تكوين حجم مجموعة الصف في البداية وكتابة بيانات التعريف الخاصة بـ Petastorm في النهاية.row_generator لذلك.dict_to_spark_row يحول القاموس إلى كائن pyspark.Row مع ضمان امتثال SCHEMA HelloWorldSchema (يتم اختبار الشكل والنوع والشرط غير المألوف).pyspark.DataFrame ، نكتبها إلى تخزين الباركيه. يتم اشتقاق مخطط الباركيه تلقائيًا من HelloWorldSchema . فئة petastorm.reader.Reader هي نقطة الدخول الرئيسية لرمز المستخدم الذي يصل إلى البيانات من إطار ML مثل TensorFlow أو Pytorch. يحتوي القارئ على ميزات متعددة مثل:
قراءة مجموعة البيانات بسيطة باستخدام فئة petastorm.reader.Reader التي يمكن إنشاؤها باستخدام طريقة مصنع petastorm.make_reader :
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://... وملف file://... هي بروتوكولات عنوان URL.
بمجرد إنشاء Reader ، يمكنك استخدامه كمؤلف.
لربط القارئ في رسم بياني TensorFlow ، يمكنك استخدام وظيفة tf_tensors :
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors )) بدلاً من ذلك ، يمكنك استخدام tf.data.Dataset API ؛
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) كما هو موضح في pytorch_example.py ، يمكن قراءة مجموعة بيانات petastorm من Pytorch عبر فئة المحول petastorm.pytorch.DataLoader ، والتي تسمح بتوفير وظيفة pytorch المخصصة وتحويلها.
تأكد من تثبيت torch و torchvision :
pip install torchvision يفترض المثال الحد الأدنى أدناه تعريف وظائف الفئة Net train test ، المدرجة في pytorch_example :
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader ) إذا كنت تعمل بأحجام دفع كبيرة جدًا ولا تحتاج إلى دعم للملاحظة العشرية/الأوتار ، فنحن نقدم petastorm.pytorch.BatchedDataLoader التي يمكن أن تتجنب باستخدام موترات الشعلة ( cpu أو cuda ) مع إنتاجية أعلى.
إذا كان حجم مجموعة البيانات الخاصة بك يمكن أن يتناسب مع ذاكرة النظام ، فيمكنك استخدام إصدار Dataloader petastorm.pytorch.InMemBatchedDataLoader . يقرأ Dataloader هذا فقط مجموعة البيانات مرة واحدة ، واختزال بيانات في الذاكرة لتجنب الإدخال/الإخراج الإضافي لعصر متعددة.
يقوم API Converter Spark بتبسيط تحويل البيانات من Spark إلى Tensorflow أو Pytorch. يتم تجسيد إدخال DataFrame لأول مرة في تنسيق parquet ثم يتم تحميله على أنه tf.data.Dataset أو torch.utils.data.DataLoader .
يفترض المثال الحد الأدنى أدناه تعريف نموذج tf.keras المترجم و SPARK DataFrame يحتوي على عمود ميزة متبوعًا بعمود تسمية.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete () يفترض المثال الحد الأدنى أدناه تعريف وظائف Net واختبار train test ، المدرجة في pytorch_example.py ، و spark dataframe تحتوي على عمود ميزة متبوعًا بعمود تسمية.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()يمكن قراءة مجموعة بيانات Petastorm في إطار بيانات Spark باستخدام Pyspark ، حيث يمكنك استخدام مجموعة واسعة من أدوات Spark لتحليل مجموعة البيانات ومعالجتها.
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()يمكن استخدام SQL للاستعلام عن مجموعة بيانات Petastorm:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()يمكنك العثور على نموذج رمز كامل هنا: pyspark_hello_world.py ،
يمكن أيضًا استخدام Petastorm لقراءة البيانات مباشرة من متاجر Apache Parquet. لتحقيق ذلك ، استخدم make_batch_reader (وليس make_reader ). يلخص الجدول التالي الاختلافات make_batch_reader و make_reader .
make_reader | make_batch_reader |
|---|---|
| فقط مجموعات بيانات Petastorm (تم إنشاؤها باستخدام steachizes_dataset) | أي متجر Parquet (بعض أنواع الأعمدة الأصلية لم يتم دعمها بعد. |
| يعيد القارئ سجلًا واحدًا في وقت واحد. | القارئ يعيد دفعات من السجلات. لا يتم إصلاح حجم الدُفعة ويحدده حجم مجموعة مجموعة صف Parquet. |
يتم تقييم المتوقعات التي تم تمريرها إلى make_reader لكل صف واحد. | يتم تقييم المتوقعات التي تم تمريرها إلى make_batch_reader لكل دفعة. |
يمكن تصفية ملف parquet بناءً على وسيطة filters . | يمكن تصفية ملف parquet بناءً على وسيطة filters |
راجع صفحة استكشاف الأخطاء وإصلاحها ويرجى إرسال تذكرة إذا لم تتمكن من العثور على إجابة.
نفضل تلقي المساهمات في شكل طلبات سحب Github. يرجى إرسال طلبات السحب مقابل مستودع github.com/uber/petastorm .
للمساهمة في التصحيح:
شكرا لك مقدمًا على مساهماتك!
انظر تطوير المعلومات المتعلقة بالتنمية.