
Содержимое
Petastorm - это библиотека доступа к данным с открытым исходным кодом, разработанная на Uber ATG. Эта библиотека обеспечивает отдельную машину или распределенную подготовку и оценку моделей глубокого обучения непосредственно из наборов данных в формате Parquet Apache. Petastorm поддерживает популярные рамки машинного обучения на основе Python (ML), такие как Tensorflow, Pytorch и Pyspark. Его также можно использовать из кода Pure Python.
Веб -сайт документации: https://petastorm.readthedocs.io
pip install petastorm Есть несколько дополнительных зависимостей, которые определяются пакетом petastorm , которые не установлены автоматически. Дополнительно: tf , tf_gpu , torch , opencv , docs , test .
Например, чтобы запустить установку версии GPU Tensorflow и OpenCV, используйте следующую команду PIP:
pip install petastorm[opencv,tf_gpu]Набор данных, созданный с использованием Petastorm, хранится в формате Apache Parquet. Помимо паркетной схемы, Petastorm также хранит информацию о схеме более высокого уровня, которая превращает многомерные массивы в собственную часть набора данных Petastorm.
Petastorm поддерживает расширяемые кодеки данных. Они позволяют пользователю использовать одну из стандартных сжатий данных (JPEG, PNG) или реализовать ее.
Создание набора данных выполняется с использованием pyspark. Pyspark Nangy поддерживает формат паркета, позволяя легко работать на одной машине или в кластере Spark Compute. Вот минималистичный пример написания таблицы с некоторыми случайными данными.
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema - это случай объекта Unischema . Unischema способен представлять типы своих полей в различные структуры, конкретные форматы, такие как: Spark StructType , Tensorflow tf.DType и Numpy numpy.dtype .type , shape , экземпляр codec и является ли поле для каждого поля Unischema ..master('local[2]') ). Конечно, для более масштабного генерации наборов данных нам понадобится настоящий вычислительный кластер.materialize_dataset Context Manager. Контекст -диспетчер отвечает за настройку размера группы строк в начале и в конце написание метаданных, специфичных, специфичных для петистам.row_generator для этого.dict_to_spark_row преобразует словарь в объект pyspark.Row , обеспечивая при этом схему HelloWorldSchema соответствия (форма, тип и нулевое состояние протестированы).pyspark.DataFrame , мы записываем это на паркетное хранилище. Паркетная схема автоматически получена из HelloWorldSchema . Класс petastorm.reader.Reader является основной точкой входа для кода пользователя, который обращается к данным из структуры ML, такой как Tensorflow или Pytorch. У читателя есть несколько функций, таких как:
Чтение набора данных просто с использованием класса petastorm.reader.Reader , который можно создать с помощью метода petastorm.make_reader Factory:
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://... и file://... поддерживаются протоколы URL.
Как только Reader создается, вы можете использовать его в качестве итератора.
Чтобы связать считывателя с графиком TensorFlow, вы можете использовать функцию tf_tensors :
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors )) В качестве альтернативы, вы можете использовать новый tf.data.Dataset API;
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) Как показано в pytorch_example.py, чтение набора данных Petastorm от Pytorch может быть выполнено с помощью адаптерного класса petastorm.pytorch.DataLoader , который позволяет поставляться на заказ и преобразования.
Убедитесь, что у вас установлена torch и torchvision :
pip install torchvision Минималистский пример ниже предполагает определение Net класса и функций train и test , включенных в pytorch_example :
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader ) Если вы работаете с очень большими размерами партии и не нуждаетесь в поддержке десятичных/струн, мы предоставляем petastorm.pytorch.BatchedDataLoader , который может буферизировать с использованием тензоров горелки ( cpu или cuda ) с значительно более высокой пропускной способностью.
Если размер вашего набора данных может вписаться в системную память, вы можете использовать версию в памяти DataLoader petastorm.pytorch.InMemBatchedDataLoader . Этот DataLoader только один раз считывает набор данных и кэширует данные в памяти, чтобы избежать дополнительного ввода -вывода для нескольких эпох.
API API Spark Converter упрощает преобразование данных от Spark в Tensorflow или Pytorch. Входной Spark DataFrame сначала материализуется в формате паркета, а затем загружается как tf.data.Dataset или torch.utils.data.DataLoader .
Приведенный ниже пример минималистского примера предполагает определение скомпилированной модели tf.keras и Spark DataFrame, содержащей столбец функции, за которым следует столбец метки.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete () Минималистский пример ниже предполагает определение Net класса и функций train и test , включенных в pytorch_example.py, и Spark DataFrame, содержащий столбец функции, за которым следует столбец метки.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()Набор данных Petastorm можно прочитать в Spark DataFrame, используя Pyspark, где вы можете использовать широкий спектр инструментов Spark для анализа и манипулирования набором данных.
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQL можно использовать для запроса набора данных Petastorm:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()Вы можете найти полный образец кода здесь: pyspark_hello_world.py,
Петисторм также можно использовать для чтения данных непосредственно из магазинов Apache Parquet. Чтобы достичь этого, используйте make_batch_reader (а не make_reader ). В следующей таблице приведены различия функций make_batch_reader и make_reader .
make_reader | make_batch_reader |
|---|---|
| Только наборы данных Petastorm (созданные с использованием materialize_dataset) | Любой паркетный магазин (некоторые нативные типы столбцов паркета еще не поддерживаются. |
| Читатель возвращает по одной записи за раз. | Читатель возвращает партии записей. Размер партии не фиксируется и определяется размером группы паркетов. |
Предизаты передаются в make_reader , оцениваются в одну строку. | Предизаты переданы в make_batch_reader оценены на партию. |
Может отфильтровать паркетный файл на основе аргумента filters . | Может отфильтровать файл паркета на основе аргумента filters |
Посмотрите страницу устранения неполадок и отправьте билет, если вы не можете найти ответ.
Мы предпочитаем получать взносы в форме запросов GitHub. Пожалуйста, отправьте запросы на привлечение против репозитория github.com/uber/petastorm .
Чтобы внести свой патч:
Заранее спасибо за ваш вклад!
См. Развитие информации, связанной с развитием.