
Contenu
Petastorm est une bibliothèque d'accès aux données open source développée à Uber ATG. Cette bibliothèque permet une formation et une évaluation distribuées à machine unique ou distribuées des modèles d'apprentissage en profondeur directement à partir des ensembles de données au format parquet Apache. Petastorm prend en charge les cadres populaires d'apprentissage automatique (ML) populaires tels que TensorFlow, Pytorch et Pyspark. Il peut également être utilisé à partir du code Python pur.
Documentation Site Web: https://petastorm.readthedocs.io
pip install petastorm Il existe plusieurs dépendances supplémentaires définies par le package petastorm qui ne sont pas installées automatiquement. Les extras sont: tf , tf_gpu , torch , opencv , docs , test .
Par exemple, pour déclencher l'installation de la version GPU de TensorFlow et OpenCV, utilisez la commande PIP suivante:
pip install petastorm[opencv,tf_gpu]Un ensemble de données créé à l'aide de PetaStorm est stocké au format parquet Apache. En plus d'un schéma de parquet, Petastorm stocke également des informations de schéma de niveau supérieur qui font des tableaux multidimensionnels en une partie native d'un ensemble de données Petastorm.
Petastorm prend en charge les codecs de données extensibles. Ceux-ci permettent à un utilisateur d'utiliser l'une des compressions de données standard (JPEG, PNG) ou d'implémenter la sienne.
La génération d'un ensemble de données se fait à l'aide de Pyspark. Pyspark prend en charge le format Parquet, ce qui facilite l'exécution sur une seule machine ou sur un cluster Spark Compute. Voici un exemple minimaliste écrivant un tableau avec quelques données aléatoires.
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema est une instance d'un objet Unischema . Unischema est capable de rendre des types de ses champs dans différents formats spécifiques à un cadre, tels que: Spark StructType , TensorFlow tf.DType et Numpy numpy.dtype .type , shape , une instance codec et si le champ est nullable pour chaque champ de l' Unischema ..master('local[2]') ). Bien sûr, pour une génération d'ensembles de données à plus grande échelle, nous aurions besoin d'un véritable cluster de calcul.materialize_dataset . Le gestionnaire de contexte est chargé de configurer la taille du groupe de lignes au début et d'écrire des métadonnées spécifiques à Petastorm à la fin.row_generator pour cela.dict_to_spark_row convertit le dictionnaire en un objet pyspark.Row tout en garantissant la conformité Schema HelloWorldSchema (la forme, le type et la condition est nulle sont testées).pyspark.DataFrame nous l'écrivons à un stockage de parquet. Le schéma du parquet est automatiquement dérivé de HelloWorldSchema . La classe petastorm.reader.Reader est le principal point de saisie du code utilisateur qui accède aux données à partir d'un cadre ML tel que TensorFlow ou Pytorch. Le lecteur a plusieurs fonctionnalités telles que:
La lecture d'un ensemble de données est simple à l'aide de la classe petastorm.reader.Reader qui peut être créée à l'aide de la méthode d'usine petastorm.make_reader :
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://... et file://... sont des protocoles URL pris en charge.
Une fois qu'un Reader est instancié, vous pouvez l'utiliser comme itérateur.
Pour connecter le lecteur dans un graphique TensorFlow, vous pouvez utiliser la fonction tf_tensors :
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors )) Alternativement, vous pouvez utiliser une nouvelle API tf.data.Dataset ;
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) Comme illustré dans pytorch_example.py, la lecture d'un ensemble de données Petastorm à partir de Pytorch peut être effectuée via la classe adaptateur petastorm.pytorch.DataLoader , qui permet à la fonction de collation de pytorch personnalisée et des transformations à fournir.
Assurez-vous que torch et torchvision sont installées:
pip install torchvision L'exemple minimaliste ci-dessous suppose la définition d'une classe Net et des fonctions train et test , incluses dans pytorch_example :
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader ) Si vous travaillez avec de très grandes tailles de lots et que vous n'avez pas besoin de support pour décimales / chaînes, nous fournissons un petastorm.pytorch.BatchedDataLoader qui peut tamponner à l'aide de tenseurs de torche ( cpu ou cuda ) avec un débit significativement plus élevé.
Si la taille de votre ensemble de données peut s'intégrer dans la mémoire du système, vous pouvez utiliser une version en mémoire de DatalOader petastorm.pytorch.InMemBatchedDataLoader . Ce dataloader ne lit une seule fois que l'ensemble de données et met en cache des données en mémoire pour éviter des E / S supplémentaires pour plusieurs époques.
L'API du convertisseur Spark simplifie la conversion de données de Spark en TensorFlow ou Pytorch. La fracture des données de l'entrée est d'abord matérialisée au format Parquet, puis chargée en tant que tf.data.Dataset ou torch.utils.data.DataLoader .
L'exemple minimaliste ci-dessous suppose la définition d'un modèle tf.keras compilé et d'une colonne de données Spark contenant une colonne de fonctionnalité suivie d'une colonne d'étiquette.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete () L'exemple minimaliste ci-dessous suppose la définition d'une classe Net et des fonctions train et test , incluses dans pytorch_example.py, et un spark dataframe contenant une colonne de fonctionnalité suivie d'une colonne d'étiquette.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()Un ensemble de données PetaStorm peut être lu dans un Spark DataFrame à l'aide de Pyspark, où vous pouvez utiliser une large gamme d'outils Spark pour analyser et manipuler l'ensemble de données.
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQL peut être utilisé pour interroger un ensemble de données Petastorm:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()Vous pouvez trouver un exemple de code complet ici: pyspark_hello_world.py,
Petastorm peut également être utilisé pour lire les données directement à partir des magasins Apache Parquet. Pour y parvenir, utilisez make_batch_reader (et non make_reader ). Le tableau suivant résume les différences que les fonctions make_batch_reader et make_reader .
make_reader | make_batch_reader |
|---|---|
| Seuls les ensembles de données PetaStorm (créés à l'aide de matérializes_dataset) | Tout magasin Parquet (certains types de colonnes de parquet natifs ne sont pas encore pris en charge. |
| Le lecteur renvoie un enregistrement à la fois. | Le lecteur renvoie des lots d'enregistrements. La taille du lot n'est pas fixe et définie par la taille du groupe de lignes parquet. |
Les prédicats transmis à make_reader sont évalués par ligne unique. | Les prédicats transmis à make_batch_reader sont évalués par lot. |
Peut filtrer le fichier de parquet en fonction de l'argument filters . | Peut filtrer le fichier de parquet basé sur l'argument filters |
Voir la page de dépannage et veuillez soumettre un billet si vous ne trouvez pas de réponse.
Nous préférons recevoir des contributions sous forme de demandes de traction GitHub. Veuillez envoyer des demandes de traction contre le référentiel github.com/uber/petastorm .
Pour contribuer un patch:
Merci d'avance pour vos contributions!
Voir le développement des informations liées au développement.