
コンテンツ
Petastormは、Uber ATGで開発されたオープンソースデータアクセスライブラリです。このライブラリは、Apache Parquet形式のデータセットから直接深い学習モデルの単一のマシンまたは分散トレーニングと評価を可能にします。 Petastormは、Tensorflow、Pytorch、Pysparkなどの人気のあるPythonベースの機械学習(ML)フレームワークをサポートしています。純粋なPythonコードからも使用できます。
ドキュメントWebサイト:https://petastorm.readthedocs.io
pip install petastorm自動的にインストールされていないpetastormパッケージによって定義されるいくつかの追加の依存関係があります。エクストラは、 tf 、 tf_gpu 、 torch 、 opencv 、 docs 、 testです。
たとえば、TensorflowとOpenCVのGPUバージョンのインストールをトリガーするには、次のPIPコマンドを使用します。
pip install petastorm[opencv,tf_gpu]Petastormを使用して作成されたデータセットは、Apache Parquet形式で保存されます。 Parquet Schemaの上に、Petastormは、多次元配列をPetastormデータセットのネイティブ部分にする高レベルのスキーマ情報も保存します。
Petastormは、拡張可能なデータコーデックをサポートしています。これらにより、ユーザーは標準データ圧縮(JPEG、PNG)のいずれかを使用したり、独自のものを実装できます。
データセットの生成は、Pysparkを使用して実行されます。 PysparkはネイティブにParquet形式をサポートしているため、単一のマシンまたはSpark Computeクラスターで簡単に実行できます。これは、いくつかのランダムデータを使用したテーブルを書き出す最小限の例です。
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchemaは、 Unischemaオブジェクトのインスタンスです。 Unischema 、Spark StructType 、Tensorflow tf.DType 、numpy numpy.dtypeなど、そのフィールドの種類をさまざまなフレームワーク固有の形式にレンダリングすることができます。type 、 shape 、 codecインスタンス、およびUnischemaの各フィールドでフィールドがnullageであるかどうかを指定する必要があります。.master('local[2]') )でPysparkを起動します。もちろん、大規模なデータセット生成の場合、実際の計算クラスターが必要です。materialize_dataset Context Managerでラップします。 Context Managerは、最初に行のグループサイズを構成し、最後にPetastorm固有のメタデータを作成する責任があります。row_generator関数を使用します。dict_to_spark_rowスキーマHelloWorldSchemaコンプライアンスを確保しながら、辞書をpyspark.Rowオブジェクトに変換します(形状、タイプ、およびIS-Nullable条件がテストされます)。pyspark.DataFrameができたら、それを寄木細工の保管場所に書き留めます。寄木細工スキーマは、 HelloWorldSchemaから自動的に導出されます。 petastorm.reader.Readerクラスは、TensorflowやPytorchなどのMLフレームワークからのデータにアクセスするユーザーコードのメインエントリポイントです。読者には次のような複数の機能があります。
データセットを読むことは、 petastorm.reader.Readerクラスpetastorm.make_reader使用して簡単です。
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://...およびfile://...はサポートされているURLプロトコルです。
Readerがインスタンス化されたら、それをイテレーターとして使用できます。
読者をTensorflowグラフに接続するには、 tf_tensors関数を使用できます。
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors ))または、新しいtf.data.Dataset APIを使用できます。
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) pytorch_example.pyに示されているように、PytorchからPetastormデータセットを読むことは、アダプタークラスpetastorm.pytorch.DataLoaderを介して実行できます。
torchとtorchvisionがインストールされていることを確認してください。
pip install torchvision以下のミニマリストの例は、 pytorch_exampleに含まれるNetクラスとtrainおよびtest機能の定義を想定しています。
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader )非常に大きなバッチサイズを使用してcudaて、小数/文字列をサポートしていない場合は、 petastorm.pytorch.BatchedDataLoaderを提供しますcpu
データセットのサイズがシステムメモリに適合できる場合、メモリ版のデータローダーpetastorm.pytorch.InMemBatchedDataLoaderを使用できます。このDataloaderはデータセットを1回だけ読み取り、メモリ内のデータをキャッシュして、複数のエポックの追加のI/Oを避けます。
Spark Converter APIは、SparkからTensorflowまたはPytorchへのデータ変換を簡素化します。入力Spark DataFrameは、最初にParquet形式で実現され、次にtf.data.Datasetまたはtorch.utils.data.DataLoaderとしてロードされます。
以下のミニマリストの例は、コンパイルされたtf.kerasモデルの定義と、機能列に続いてラベル列が続くSpark DataFrameを想定しています。
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete ()以下のミニマリストの例は、pytorch_example.pyに含まれるNetクラスとtrainおよびtest機能の定義と、機能列とラベル列が続くSpark Dataframeを想定しています。
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()PetaStormデータセットは、Pysparkを使用してSparkデータフレームに読み込むことができます。ここでは、幅広いスパークツールを使用してデータセットを分析および操作できます。
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQLを使用して、Petastormデータセットを照会できます。
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()完全なコードサンプルはこちらをご覧ください:pyspark_hello_world.py、
Petastormは、Apache Parquet Storesから直接データを読むためにも使用できます。それを達成するには、 make_batch_reader ( make_readerではありません)を使用します。次の表は、違いmake_batch_readerとmake_reader関数をまとめたものです。
make_reader | make_batch_reader |
|---|---|
| PetaStormデータセットのみ(gaterizes_datasetを使用して作成) | 寄木細工店(一部のネイティブの寄木細工列タイプはまだサポートされていません。 |
| 読者は一度に1つのレコードを返します。 | 読者はレコードのバッチを返します。バッチのサイズは固定されておらず、Parquet Row-Groupサイズで定義されています。 |
make_readerに渡された述語は、単一行ごとに評価されます。 | make_batch_readerに渡された述語は、バッチごとに評価されます。 |
filters引数に基づいて寄木細工ファイルをフィルタリングできます。 | filters引数に基づいて寄木細工ファイルをフィルタリングできます |
トラブルシューティングページを参照してください。回答が見つからない場合は、チケットを送信してください。
GitHub Pullリクエストの形での寄付を受けることをお勧めします。 github.com/uber/petastormリポジトリに対してプルリクエストを送信してください。
パッチを提供するには:
よろしくお願いいたします。
開発関連情報の開発を参照してください。