
Inhalt
Petastorm ist eine Open -Source -Datenzugriffsbibliothek, die bei Uber ATG entwickelt wurde. Diese Bibliothek ermöglicht einzelne Maschine oder verteilte Schulungen und Bewertung von Deep -Learning -Modellen direkt aus Datensätzen im Apache -Parquetformat. Petastorm unterstützt beliebte Python-basierte maschinelle Lernrahmen (ML) wie Tensorflow, Pytorch und PySpark. Es kann auch aus reinem Python -Code verwendet werden.
Dokumentationswebsite: https://petastorm.readthedocs.io
pip install petastorm Es gibt mehrere zusätzliche Abhängigkeiten, die durch das petastorm -Paket definiert werden, die nicht automatisch installiert sind. Die Extras sind: tf , tf_gpu , torch , opencv , docs , test .
Verwenden Sie beispielsweise den folgenden PIP -Befehl, um die Installation der GPU -Version von TensorFlow und OpenCV auszulösen:
pip install petastorm[opencv,tf_gpu]Ein mit Petastorm erstellter Datensatz wird im Apache -Parquetformat gespeichert. Zusätzlich zu einem Parkettschema speichert Petastorm auch Schema-Informationen über höhere Ebene, die mehrdimensionale Arrays zu einem nativen Teil eines Petastorm-Datensatzes machen.
Petastorm unterstützt erweiterbare Datencodecs. Diese ermöglichen es einem Benutzer, eine der Standarddatenkompressionen (JPEG, PNG) zu verwenden oder ihre eigenen zu implementieren.
Das Generieren eines Datensatzes erfolgt mit PYSPARK. PYSPARK unterstützt das Parquetformat nativ und erleichtert es einfach, auf einer einzelnen Maschine oder auf einem Spark Compute -Cluster auszuführen. Hier ist ein minimalistisches Beispiel, das eine Tabelle mit einigen zufälligen Daten ausstellt.
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema ist eine Instanz eines Unischema . Unischema ist in der Lage tf.DType Arten seiner Felder in verschiedene rahmenspezifische Formate zu verwandeln StructType wie z numpy.dtypetype , shape , eine codec -Instanz angeben und ob das Feld für jedes Feld des Unischema nullbar ist..master('local[2]') ). Natürlich würden wir für eine größere Datensatzgenerierung einen echten Rechencluster benötigen.materialize_dataset . Der Kontextmanager ist für die Konfiguration der Zeilengruppengröße am Anfang verantwortlich und schreibt am Ende petastormspezifische Metadaten auf.row_generator .dict_to_spark_row konvertiert das Wörterbuch in ein pyspark.Row -Objekt und stellt sicher, dass Schema HelloWorldSchema Konformität (Form, Typ und Nullable-Bedingung getestet werden).pyspark.DataFrame haben, schreiben wir ihn in einen Parquetspeicher aus. Das Parkettschema wird automatisch von HelloWorldSchema abgeleitet. Die Klasse petastorm.reader.Reader ist der Haupteinstiegspunkt für den Benutzercode, der auf die Daten aus einem ML -Framework wie TensorFlow oder Pytorch zugreift. Der Leser verfügt über mehrere Funktionen wie:
Das Lesen eines Datensatzes ist einfach mit der Klasse petastorm.reader.Reader , die mit der Fabrikmethode petastorm.make_reader erstellt werden kann:
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://... und file://... werden unterstützte URL -Protokolle.
Sobald ein Reader instanziiert ist, können Sie ihn als Iterator verwenden.
Um den Leser in ein TensorFlow -Diagramm zu verbinden, können Sie die Funktion tf_tensors verwenden:
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors )) Alternativ können Sie eine neue tf.data.Dataset -API verwenden.
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) Wie in pytorch_example.py dargestellt, kann das Lesen eines Petastorm -Datensatzes von Pytorch über die Adapter -Klasse petastorm.pytorch.DataLoader erfolgen, wodurch die kundenspezifische Pytorch -Sammlungsfunktion und die Transformationen geliefert werden können.
Stellen Sie sicher, dass Sie torch und torchvision installiert haben:
pip install torchvision Das nachstehende minimalistische Beispiel wird von der Definition einer Net sowie train und test angenommen, die in pytorch_example enthalten sind:
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader ) Wenn Sie mit sehr großen Chargengrößen arbeiten und keine Unterstützung für Dezimal-/Zeichenfolgen benötigen, bieten wir einen petastorm.pytorch.BatchedDataLoader , der mit Tensoren ( cpu oder cuda ) mit einem signifikant höheren Durchsatz pufferen kann.
Wenn die Größe Ihres Datensatzes in den Systemspeicher passen kann, können Sie eine In-Memory-Version Dataloader petastorm.pytorch.InMemBatchedDataLoader verwenden. Dieser Dataloader liest den Datensatz nur einmal und richtete Daten im Speicher ein, um zusätzliche E/A für mehrere Epochen zu vermeiden.
Die Spark Converter API vereinfacht die Datenumwandlung von Spark zu Tensorflow oder Pytorch. Der Eingangs -Spark DataFrame wird zuerst im Parquetformat materialisiert und dann als tf.data.Dataset oder torch.utils.data.DataLoader geladen.
Das nachstehende minimalistische Beispiel geht aus der Definition eines kompilierten tf.keras -Modells und eines Spark -Datenfrequers mit einer Feature -Spalte, gefolgt von einer Beschriftungsspalte.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete () Das nachstehende minimalistische Beispiel wird von der Definition einer Net sowie train und test angenommen, die in pytorch_example.py enthalten sind, und eines Spark -Datenframe, der eine Feature -Spalte enthält, gefolgt von einer Beschriftungsspalte.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()Ein Petastorm -Datensatz kann mit PYSPARK in einen Spark -Datenframe gelesen werden, in dem Sie eine Vielzahl von Spark -Tools verwenden können, um den Datensatz zu analysieren und zu manipulieren.
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQL kann verwendet werden, um einen Petastorm -Datensatz abzufragen:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()Hier finden Sie ein vollständiges Code -Beispiel: pyspark_hello_world.py,
Petastorm kann auch verwendet werden, um Daten direkt aus Apache -Parquetspeichern zu lesen. Um dies zu erreichen, verwenden Sie make_batch_reader (und nicht make_reader ). Die folgende Tabelle fasst die Unterschiede make_batch_reader und make_reader functions zusammen.
make_reader | make_batch_reader |
|---|---|
| Nur Petastorm -Datensätze (erstellt mit materializes_dataset) | Jeder Parkettgeschäft (einige native Parquet -Säulentypen werden noch nicht unterstützt. |
| Der Leser gibt jeweils einen Datensatz zurück. | Der Leser gibt Rekordstapel zurück. Die Größe der Charge wird nicht durch die Größe der Parquetzapfengruppe festgelegt und definiert. |
Prädikate, die an make_reader übergeben wurden, werden pro einzelner Zeile ausgewertet. | Prädikate, die an make_batch_reader übergeben wurden, werden pro Charge ausgewertet. |
Kann die Parquetdatei basierend auf dem filters filtern. | Kann die Parquetdatei basierend auf dem filters filtern |
Sehen Sie sich die Seite Fehlerbehebung an und senden Sie bitte ein Ticket ein, wenn Sie keine Antwort finden können.
Wir bevorzugen Beiträge in Form von Github -Pull -Anfragen. Bitte senden Sie Pull -Anfragen gegen das github.com/uber/petastorm Repository.
Einen Patch beitragen:
Vielen Dank im Voraus für Ihre Beiträge!
Siehe die Entwicklung für Entwicklungsbezogene Informationen.