
Conteúdo
A Petastorm é uma biblioteca de acesso a dados de código aberto desenvolvido no Uber ATG. Esta biblioteca permite uma única máquina ou treinamento distribuído e avaliação de modelos de aprendizado profundo diretamente dos conjuntos de dados no formato Apache Parquet. A Petastorm suporta estruturas populares de aprendizado de máquina baseadas em Python (ML), como Tensorflow, Pytorch e Pyspark. Também pode ser usado a partir do código python puro.
Site de documentação: https://petastorm.readthedocs.io
pip install petastorm Existem várias dependências extras que são definidas pelo pacote petastorm que não são instaladas automaticamente. Os extras são: tf , tf_gpu , torch , opencv , docs , test .
Por exemplo, para acionar a instalação da versão GPU do TensorFlow e OpenCV, use o seguinte comando pip:
pip install petastorm[opencv,tf_gpu]Um conjunto de dados criado usando o Petastorm é armazenado no formato Apache Parquet. No topo de um esquema parquet, o Petastorm também armazena informações de esquema de nível superior que transformam matrizes multidimensionais em uma parte nativa de um conjunto de dados Petastorm.
A Petastorm suporta codecs de dados extensíveis. Isso permite que um usuário use uma das compressões de dados padrão (JPEG, PNG) ou implemente a sua.
A geração de um conjunto de dados é feita usando Pyspark. O Pyspark suporta nativamente o formato Parquet, facilitando a execução em uma única máquina ou em um cluster de computação de faíscas. Aqui está um exemplo minimalista, escrevendo uma tabela com alguns dados aleatórios.
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema é uma instância de um objeto Unischema . Unischema é capaz de renderizar tipos de seus campos em diferentes formatos específicos da estrutura, como: Spark StructType , Tensorflow tf.DType e numpy numpy.dtype .type , shape , uma instância codec e se o campo é anulado para cada campo do Unischema ..master('local[2]') ). Obviamente, para uma geração de dados de maior escala, precisaríamos de um cluster de computação real.materialize_dataset . O gerenciador de contexto é responsável por configurar o tamanho do grupo de linhas no início e escrever metadados específicos do Petastorm no final.row_generator para isso.dict_to_spark_row converte o dicionário em um objeto pyspark.Row , garantindo que o esquema HelloWorldSchema Compliance (Shape, Type e Is Nullable Condition sejam testados).pyspark.DataFrame , escrevemos para um armazenamento de parquet. O esquema parquet é automaticamente derivado do HelloWorldSchema . A classe petastorm.reader.Reader é o principal ponto de entrada para o código de usuário que acessa os dados de uma estrutura ML, como Tensorflow ou Pytorch. O leitor possui vários recursos, como:
A leitura de um conjunto de dados é simples usando a classe petastorm.reader.Reader , que pode ser criada usando o método petastorm.make_reader Factory:
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://... e file://... são protocolos de URL suportados.
Depois que um Reader é instanciado, você pode usá -lo como um iterador.
Para conectar o leitor em um gráfico de tensorflow, você pode usar a função tf_tensors :
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors )) Como alternativa, você pode usar a nova API tf.data.Dataset ;
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) Conforme ilustrado em pytorch_example.py, a leitura de um conjunto de dados Petastorm da Pytorch pode ser feita através da classe adaptadora petastorm.pytorch.DataLoader , que permite que a função de coleta de pytorch personalizada e as transformações sejam fornecidas.
Certifique -se de ter torch e torchvision instalada:
pip install torchvision O exemplo minimalista abaixo assume a definição de uma classe Net e funções train e test , incluídas em pytorch_example :
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader ) Se você estiver trabalhando com tamanhos de lote muito grandes e não precisar de suporte para decimais/strings, fornecemos um petastorm.pytorch.BatchedDataLoader que pode buffer usando tensores da tocha ( cpu ou cuda ) com um rendimento significativamente maior.
Se o tamanho do seu conjunto de dados puder se encaixar na memória do sistema, você poderá usar uma versão em memória dataloader petastorm.pytorch.InMemBatchedDataLoader . Esse Dataloader apenas lê o conjunto de dados uma vez e armazena em cache dados na memória para evitar E/S adicionais para várias épocas.
A API do Spark Converter simplifica a conversão de dados de Spark para Tensorflow ou Pytorch. O quadro de dados de faísca de entrada é materializado pela primeira vez no formato do parquet e depois carregado como tf.data.Dataset ou torch.utils.data.DataLoader .
O exemplo minimalista abaixo assume a definição de um modelo tf.keras compilado e um quadro de dados de faísca contendo uma coluna de recurso seguida por uma coluna de etiqueta.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete () O exemplo minimalista abaixo assume a definição de uma classe Net e funções train e test , incluídos em pytorch_example.py, e um quadro de dados de faísca que contém uma coluna de recurso seguida por uma coluna de etiqueta.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()Um conjunto de dados Petastorm pode ser lido em um quadro de dados de faísca usando Pyspark, onde você pode usar uma ampla gama de ferramentas de faísca para analisar e manipular o conjunto de dados.
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()O SQL pode ser usado para consultar um conjunto de dados Petastorm:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()Você pode encontrar uma amostra de código completa aqui: pyspark_hello_world.py,
O Petastorm também pode ser usado para ler dados diretamente das lojas Apache Parquet. Para conseguir isso, use make_batch_reader (e não make_reader ). A tabela a seguir resume as diferenças make_batch_reader e make_reader Funções.
make_reader | make_batch_reader |
|---|---|
| Somente conjuntos de dados Petastorm (criados usando materializes_dataset) | Qualquer loja de parquet (alguns tipos de colunas parquet nativas ainda não são suportados. |
| O leitor retorna um recorde de cada vez. | O leitor retorna lotes de registros. O tamanho do lote não é fixo e definido pelo tamanho do grupo de linha do parquet. |
Os predicados passados para make_reader são avaliados por linha única. | Os predicados passados para make_batch_reader são avaliados por lote. |
Pode filtrar o arquivo parquet com base no argumento filters . | Pode filtrar o arquivo parquet com base no argumento dos filters |
Consulte a página Solução de problemas e envie um ticket se não conseguir encontrar uma resposta.
Preferimos receber contribuições na forma de solicitações de tração do GitHub. Envie solicitações de puxar contra o repositório github.com/uber/petastorm .
Para contribuir com um patch:
Agradeço antecipadamente por suas contribuições!
Consulte o desenvolvimento para informações relacionadas ao desenvolvimento.