
内容
Petastorm是Uber ATG开发的开源数据访问库。该库可以直接从Apache Parquet格式的数据集中直接从数据集中对单个机器或分布式培训进行评估。 Petastorm支持流行的基于Python的机器学习(ML)框架,例如Tensorflow,Pytorch和Pyspark。它也可以从纯Python代码中使用。
文档网站:https://petastorm.readthedocs.io
pip install petastorm petastorm软件包定义了几个额外的依赖性,这些依赖性未自动安装。附加功能是: tf , tf_gpu , torch , opencv , docs , test 。
例如,要触发tensorflow和openCV的GPU版本的安装,请使用以下pip命令:
pip install petastorm[opencv,tf_gpu]使用Petastorm创建的数据集以Apache Parquet格式存储。除了镶木架模式外,Patastorm还存储了将多维阵列的高级模式信息存储到Petastorm数据集的本地部分。
Patastorm支持可扩展的数据编解码器。这些使用户能够使用标准数据压缩之一(JPEG,PNG)或实现自己的。
生成数据集是使用Pyspark完成的。 Pyspark固有地支持镶木格式,使在单台计算机或火花计算集群上易于运行。这是一个简约的示例,写了一个带有一些随机数据的表。
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema是一个Unischema对象的实例。 Unischema能够将其字段的类型渲染为不同框架的格式,例如:Spark StructType ,Tensorflow tf.DType和numpy numpy.dtype 。type , shape , codec实例以及该字段是否可用于Unischema的每个字段。.master('local[2]') )上启动Pyspark。当然,对于大型数据集生成,我们需要一个真正的计算集群。materialize_dataset上下文管理器包装。上下文管理器负责在开始时配置行组大小,并在最后写出Petastorm特定的元数据。row_generator函数。dict_to_spark_row将字典转换为pyspark.Row对象,同时确保schema HelloWorldSchema complience(形状,类型和可否则的条件)。pyspark.DataFrame我们就会将其写入镶木库存储。镶木架模式自动衍生自HelloWorldSchema 。 petastorm.reader.Reader类是用户代码的主要入口处,该输入点从ML框架(例如Tensorflow或Pytorch)访问数据。读者具有多个功能,例如:
使用petastorm.reader.Reader类读取数据集很简单,可以使用petastorm.make_reader Factory方法创建数据集:
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://...和file://...受支持的URL协议。
Reader实例化后,您可以将其用作迭代器。
要将读取器连接到TensorFlow图中,您可以使用tf_tensors函数:
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors ))另外,您可以使用新的tf.data.Dataset api;
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id )如pytorch_example.py所示,可以通过适配器类petastorm.pytorch.DataLoader读取Patastorm数据集,该数据允许自定义的Pytorch整理函数并转换为提供。
确保已安装了torch和torchvision :
pip install torchvision下面的极简主义示例假定了pytorch_example中包含的Net类, train和test功能的定义:
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader )如果您使用的批量非常大,并且不需要对十进制/字符串的支持,我们会提供petastorm.pytorch.BatchedDataLoader ,可以使用火炬张量( cpu或cuda )缓冲,并具有更高的吞吐量。
如果数据集的大小可以适合系统内存,则可以使用内存版本dataLoader petastorm.pytorch.InMemBatchedDataLoader 。该数据加载器仅读取数据集,并在内存中缓存数据,以避免多个时期的I/O。
Spark Converter API简化了从Spark到TensorFlow或Pytorch的数据转换。输入火花数据框首先以镶木格式实现,然后加载为tf.data.Dataset或torch.utils.data.DataLoader 。
下面的极简主义示例假定了编译的tf.keras模型的定义和包含特征列的Spark DataFrame,然后是标签列。
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete ()下面的极简主义示例假定了Net类, train和test功能的定义,包括pytorch_example.py,以及包含功能列的Spark DataFrame,然后是标签列。
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()可以使用Pyspark将Petastorm数据集读取到Spark DataFrame中,您可以在其中使用广泛的Spark工具来分析和操纵数据集。
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQL可用于查询Patastorm数据集:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()您可以在此处找到一个完整的代码示例:pyspark_hello_world.py,
Petastorm也可用于直接从Apache Parquet商店读取数据。为此,请使用make_batch_reader (而不是make_reader )。下表总结了make_batch_reader和make_reader函数的差异。
make_reader | make_batch_reader |
|---|---|
| 只有Petastorm数据集(使用Moletializes_DataSet创建) | 任何镶木店(某些本机镶木列类型尚未支持)。 |
| 读者一次返回一个记录。 | 读者返回记录批次。批处理的大小未固定,并由镶木式行组大小定义。 |
每行评估传递给make_reader谓词。 | 每个批次评估传递给make_batch_reader谓词。 |
可以根据filters参数过滤木准文件。 | 可以根据filters参数过滤镶木quet文件 |
请参阅故障排除页面,如果找不到答案,请提交票。
我们更喜欢以GitHub拉的请求的形式获得贡献。请针对github.com/uber/petastorm存储库发送拉动请求。
贡献一个补丁:
预先感谢您的贡献!
有关开发相关信息,请参见开发。