
Contenido
Petastorm es una biblioteca de acceso a datos de código abierto desarrollada en Uber ATG. Esta biblioteca permite capacitación y evaluación de una sola máquina o distribuida de modelos de aprendizaje profundo directamente desde conjuntos de datos en formato de parquet Apache. Petastorm admite marcos populares de aprendizaje automático basado en Python (ML) como TensorFlow, Pytorch y Pyspark. También se puede usar a partir del código de Python puro.
Sitio web de documentación: https://petastorm.readthedocs.io
pip install petastorm Hay varias dependencias adicionales definidas por el paquete petastorm que no se instalan automáticamente. Los extras son: tf , tf_gpu , torch , opencv , docs , test .
Por ejemplo, para activar la instalación de la versión GPU de TensorFlow y OpenCV, use el siguiente comando PIP:
pip install petastorm[opencv,tf_gpu]Un conjunto de datos creado usando Petastorm se almacena en formato Apache Parquet. Además de un esquema de parquet, Petastorm también almacena información de esquemas de nivel superior que convierte matrices multidimensionales en una parte nativa de un conjunto de datos de tormenta.
Petastorm admite códecs de datos extensibles. Estos permiten a un usuario usar una de las compresiones de datos estándar (JPEG, PNG) o implementar la suya propia.
Generación de un conjunto de datos se realiza utilizando Pyspark. Pyspark es compatible con el formato de parquet, lo que facilita la ejecución en una sola máquina o en un clúster de cómputo de chispa. Aquí hay un ejemplo minimalista que escribe una tabla con algunos datos aleatorios.
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema es una instancia de un objeto Unischema . Unischema es capaz de hacer que los tipos de sus campos en diferentes formatos específicos del marco, como: Spark StructType , Tensorflow tf.DType y Numpy numpy.dtype .type , shape , una instancia codec y si el campo es anulable para cada campo de Unischema ..master('local[2]') ). Por supuesto, para una generación de conjuntos de datos a mayor escala, necesitaríamos un clúster de cómputo real.materialize_dataset . El administrador de contexto es responsable de configurar el tamaño del grupo de filas al principio y escribir metadatos específicos de Petastorm al final.row_generator para eso.dict_to_spark_row convierte el diccionario en un objeto pyspark.Row al tiempo que garantiza el cumplimiento del esquema HelloWorldSchema (se prueba la forma, el tipo y la condición anulable).pyspark.DataFrame , lo escribimos en un almacenamiento de parquet. El esquema de Parquet se deriva automáticamente de HelloWorldSchema . La clase petastorm.reader.Reader es el principal punto de entrada para el código de usuario que accede a los datos desde un marco ML como TensorFlow o PyTorch. El lector tiene múltiples características como:
Leer un conjunto de datos es simple usando la clase petastorm.reader.Reader que se puede crear utilizando el método de fábrica petastorm.make_reader :
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://... y file://... son protocolos URL compatibles.
Una vez que un Reader es instanciado, puede usarlo como iterador.
Para conectar al lector en un gráfico TensorFlow, puede usar la función tf_tensors :
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors )) Alternativamente, puede usar una nueva API tf.data.Dataset ;
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) Como se ilustra en pytorch_example.py, la lectura de un conjunto de datos Petastorm de Pytorch se puede hacer a través de la clase de adaptador petastorm.pytorch.DataLoader , que permite que se suministren la función de recopilación de Pytorch personalizada y se transformarán.
Asegúrese de tener instalado torch y torchvision :
pip install torchvision El ejemplo minimalista a continuación supone la definición de una clase Net y funciones de train y test , incluidas en pytorch_example :
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader ) Si está trabajando con tamaños de lotes muy grandes y no necesita soporte para decimales/cadenas, proporcionamos una petastorm.pytorch.BatchedDataLoader que puede amortiguar el uso de tensores de antorcha ( cpu o cuda ) con un rendimiento significativamente mayor.
Si el tamaño de su conjunto de datos puede caber en la memoria del sistema, puede usar una versión en memoria dataLoader petastorm.pytorch.InMemBatchedDataLoader . Este dataloader solo lee el conjunto de datos una vez, y almacena datos en la memoria para evitar E/S adicionales para múltiples épocas.
La API del convertidor de chispa simplifica la conversión de datos de Spark a TensorFlow o Pytorch. El marcado de datos Spark de entrada se materializa primero en el formato Parquet y luego se carga como un tf.data.Dataset o torch.utils.data.DataLoader .
El ejemplo minimalista a continuación supone la definición de un modelo tf.keras compilado y un marcado de datos de chispa que contiene una columna de características seguida de una columna de etiqueta.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete () El siguiente ejemplo minimalista supone la definición de una clase Net y las funciones train y test , incluidas en pytorch_example.py, y un marcador de datos de chispa que contiene una columna de características seguida de una columna de etiqueta.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()Se puede leer un conjunto de datos de Petastorm en un marco de datos Spark utilizando Pyspark, donde puede usar una amplia gama de herramientas de chispa para analizar y manipular el conjunto de datos.
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQL se puede utilizar para consultar un conjunto de datos Petastorm:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()Puede encontrar una muestra de código completa aquí: pyspark_hello_world.py,
Petastorm también se puede usar para leer datos directamente de las tiendas Apache Parquet. Para lograr eso, use make_batch_reader (y no make_reader ). La siguiente tabla resume las diferencias make_batch_reader y las funciones make_reader .
make_reader | make_batch_reader |
|---|---|
| Solo los conjuntos de datos de Petastorm (creados usando Materialize_DataSet) | Cualquier tienda Parquet (algunos tipos de columnas de parquet nativos aún no son compatibles. |
| El lector devuelve un registro a la vez. | El lector devuelve lotes de registros. El tamaño del lote no es fijo y definido por el tamaño del grupo de hileras Parquet. |
Los predicados pasados a make_reader se evalúan por fila única. | Los predicados pasados make_batch_reader se evalúan por lote. |
Puede filtrar el archivo parquet basado en el argumento filters . | Puede filtrar el archivo parquet basado en el argumento filters |
Vea la página de solución de problemas y envíe un boleto si no puede encontrar una respuesta.
Preferimos recibir contribuciones en forma de solicitudes de extracción de GitHub. Envíe solicitudes de extracción contra el repositorio github.com/uber/petastorm .
Para contribuir con un parche:
¡Gracias de antemano por sus contribuciones!
Consulte el desarrollo para la información relacionada con el desarrollo.