
Isi
Petastorm adalah pustaka akses data sumber terbuka yang dikembangkan di Uber ATG. Perpustakaan ini memungkinkan mesin tunggal atau pelatihan terdistribusi dan evaluasi model pembelajaran mendalam langsung dari dataset dalam format Parquet Apache. Petastorm mendukung kerangka pembelajaran mesin (ML) berbasis Python yang populer seperti TensorFlow, Pytorch, dan Pyspark. Ini juga dapat digunakan dari kode Python murni.
Situs Web Dokumentasi: https://petastorm.readthedocs.io
pip install petastorm Ada beberapa dependensi tambahan yang ditentukan oleh paket petastorm yang tidak diinstal secara otomatis. Ekstra adalah: tf , tf_gpu , torch , opencv , docs , test .
Misalnya untuk memicu pemasangan versi GPU dari TensorFlow dan OpenCV, gunakan perintah PIP berikut:
pip install petastorm[opencv,tf_gpu]Dataset yang dibuat menggunakan Petastorm disimpan dalam format Parquet Apache. Di atas skema parket, Petastorm juga menyimpan informasi skema tingkat lebih tinggi yang membuat susunan multidimensi menjadi bagian asli dari dataset Petastorm.
Petastorm mendukung codec data yang dapat diperluas. Ini memungkinkan pengguna untuk menggunakan salah satu kompresi data standar (JPEG, PNG) atau mengimplementasikannya sendiri.
Menghasilkan dataset dilakukan dengan menggunakan pyspark. Pyspark secara asli mendukung format parket, membuatnya mudah dijalankan pada satu mesin atau pada cluster komputasi percikan. Berikut adalah contoh minimalis menulis tabel dengan beberapa data acak.
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema adalah contoh dari objek Unischema . Unischema mampu membuat jenis bidangnya ke dalam format spesifik kerangka kerja yang berbeda, seperti: Spark StructType , TensorFlow tf.DType dan numpy numpy.dtype .type , shape , instance codec dan apakah bidang tersebut dapat dibatalkan untuk setiap bidang Unischema ..master('local[2]') ). Tentu saja untuk generasi dataset skala yang lebih besar, kami akan membutuhkan kluster komputasi nyata.materialize_dataset . Manajer Konteks bertanggung jawab untuk mengonfigurasi ukuran grup baris di awal dan menulis metadata spesifik Petastorm di akhir.row_generator untuk itu.dict_to_spark_row Mengubah kamus menjadi objek pyspark.Row sambil memastikan skema HelloWorldSchema kepatuhan (bentuk, jenis dan kondisi yang dapat diuji).pyspark.DataFrame kami menulisnya ke penyimpanan parket. Skema parket secara otomatis berasal dari HelloWorldSchema . Kelas petastorm.reader.Reader adalah titik masuk utama untuk kode pengguna yang mengakses data dari kerangka ML seperti TensorFlow atau Pytorch. Pembaca memiliki banyak fitur seperti:
Membaca dataset sederhana menggunakan kelas petastorm.reader.Reader yang dapat dibuat menggunakan metode pabrik petastorm.make_reader :
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://... dan file://... didukung protokol URL.
Setelah Reader dipakai, Anda dapat menggunakannya sebagai iterator.
Untuk menghubungkan pembaca ke dalam grafik TensorFlow, Anda dapat menggunakan fungsi tf_tensors :
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors )) Atau, Anda dapat menggunakan tf.data.Dataset API baru;
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) Seperti yang diilustrasikan dalam pytorch_example.py, membaca dataset petastorm dari pytorch dapat dilakukan melalui kelas adaptor petastorm.pytorch.DataLoader , yang memungkinkan fungsi kumpulkan pytorch khusus dan mengubah untuk disediakan.
Pastikan Anda memiliki torch dan torchvision yang diinstal:
pip install torchvision Contoh minimalis di bawah ini mengasumsikan definisi kelas Net dan fungsi train dan test , termasuk dalam pytorch_example :
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader ) Jika Anda bekerja dengan ukuran batch yang sangat besar dan tidak memerlukan dukungan untuk desimal/string, kami menyediakan petastorm.pytorch.BatchedDataLoader yang dapat buffer menggunakan tensor obor ( cpu atau cuda ) dengan throughput yang lebih tinggi.
Jika ukuran dataset Anda dapat masuk ke dalam memori sistem, Anda dapat menggunakan versi in-memory Dataloader petastorm.pytorch.InMemBatchedDataLoader . Dataloader ini hanya membaca dataset sekali, dan menyimpan data dalam memori untuk menghindari I/O tambahan untuk beberapa zaman.
Spark Converter API menyederhanakan konversi data dari Spark ke TensorFlow atau Pytorch. Input Spark DataFrame pertama kali terwujud dalam format parket dan kemudian dimuat sebagai tf.data.Dataset atau torch.utils.data.DataLoader .
Contoh minimalis di bawah ini mengasumsikan definisi model tf.keras yang dikompilasi dan bintik -bintik Spark yang berisi kolom fitur diikuti oleh kolom label.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete () Contoh minimalis di bawah ini mengasumsikan definisi kelas Net dan fungsi train dan test , termasuk dalam pytorch_example.py, dan bekas data yang berisi kolom fitur yang diikuti oleh kolom label.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()Dataset Petastorm dapat dibaca ke dalam DataFrame Spark menggunakan Pyspark, di mana Anda dapat menggunakan berbagai alat percikan untuk menganalisis dan memanipulasi dataset.
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQL dapat digunakan untuk menanyakan dataset Petastorm:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()Anda dapat menemukan sampel kode lengkap di sini: pyspark_hello_world.py,
Petastorm juga dapat digunakan untuk membaca data langsung dari toko Parquet Apache. Untuk mencapai itu, gunakan make_batch_reader (dan bukan make_reader ). Tabel berikut merangkum perbedaan fungsi make_batch_reader dan make_reader .
make_reader | make_batch_reader |
|---|---|
| Hanya dataset Petastorm (dibuat menggunakan materizes_dataset) | Setiap toko parket (beberapa jenis kolom parket asli belum didukung. |
| Pembaca mengembalikan satu rekor pada satu waktu. | Pembaca mengembalikan kumpulan catatan. Ukuran batch tidak diperbaiki dan ditentukan oleh ukuran parket row-grup. |
Predikat yang diteruskan ke make_reader dievaluasi per baris tunggal. | Predikat yang diteruskan ke make_batch_reader dievaluasi per batch. |
Dapat memfilter file parket berdasarkan argumen filters . | Dapat memfilter file parket berdasarkan argumen filters |
Lihat halaman pemecahan masalah dan silakan kirim tiket jika Anda tidak dapat menemukan jawaban.
Kami lebih suka menerima kontribusi dalam bentuk permintaan tarik GitHub. Silakan kirim permintaan tarik terhadap repositori github.com/uber/petastorm .
Untuk menyumbangkan tambalan:
Terima kasih sebelumnya atas kontribusi Anda!
Lihat pengembangan untuk informasi terkait pengembangan.