
內容
Petastorm是Uber ATG開發的開源數據訪問庫。該庫可以直接從Apache Parquet格式的數據集中直接從數據集中對單個機器或分佈式培訓進行評估。 Petastorm支持流行的基於Python的機器學習(ML)框架,例如Tensorflow,Pytorch和Pyspark。它也可以從純Python代碼中使用。
文檔網站:https://petastorm.readthedocs.io
pip install petastorm petastorm軟件包定義了幾個額外的依賴性,這些依賴性未自動安裝。附加功能是: tf , tf_gpu , torch , opencv , docs , test 。
例如,要觸發tensorflow和openCV的GPU版本的安裝,請使用以下pip命令:
pip install petastorm[opencv,tf_gpu]使用Petastorm創建的數據集以Apache Parquet格式存儲。除了鑲木架模式外,Patastorm還存儲了將多維陣列的高級模式信息存儲到Petastorm數據集的本地部分。
Patastorm支持可擴展的數據編解碼器。這些使用戶能夠使用標準數據壓縮之一(JPEG,PNG)或實現自己的。
生成數據集是使用Pyspark完成的。 Pyspark固有地支持鑲木格式,使在單台計算機或火花計算集群上易於運行。這是一個簡約的示例,寫了一個帶有一些隨機數據的表。
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema是一個Unischema對象的實例。 Unischema能夠將其字段的類型渲染為不同框架的格式,例如:Spark StructType ,Tensorflow tf.DType和numpy numpy.dtype 。type , shape , codec實例以及該字段是否可用於Unischema的每個字段。.master('local[2]') )上啟動Pyspark。當然,對於大型數據集生成,我們需要一個真正的計算集群。materialize_dataset上下文管理器包裝。上下文管理器負責在開始時配置行組大小,並在最後寫出Petastorm特定的元數據。row_generator函數。dict_to_spark_row將字典轉換為pyspark.Row對象,同時確保schema HelloWorldSchema complience(形狀,類型和可否則的條件)。pyspark.DataFrame我們就會將其寫入鑲木庫存儲。鑲木架模式自動衍生自HelloWorldSchema 。 petastorm.reader.Reader類是用戶代碼的主要入口處,該輸入點從ML框架(例如Tensorflow或Pytorch)訪問數據。讀者俱有多個功能,例如:
使用petastorm.reader.Reader類讀取數據集很簡單,可以使用petastorm.make_reader Factory方法創建數據集:
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://...和file://...受支持的URL協議。
Reader實例化後,您可以將其用作迭代器。
要將讀取器連接到TensorFlow圖中,您可以使用tf_tensors函數:
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors ))另外,您可以使用新的tf.data.Dataset api;
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id )如pytorch_example.py所示,可以通過適配器類petastorm.pytorch.DataLoader讀取Patastorm數據集,該數據允許自定義的Pytorch整理函數並轉換為提供。
確保已安裝了torch和torchvision :
pip install torchvision下面的極簡主義示例假定了pytorch_example中包含的Net類, train和test功能的定義:
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader )如果您使用的批量非常大,並且不需要對十進制/字符串的支持,我們會提供petastorm.pytorch.BatchedDataLoader ,可以使用火炬張量( cpu或cuda )緩衝,並具有更高的吞吐量。
如果數據集的大小可以適合系統內存,則可以使用內存版本dataLoader petastorm.pytorch.InMemBatchedDataLoader 。該數據加載器僅讀取數據集,並在內存中緩存數據,以避免多個時期的I/O。
Spark Converter API簡化了從Spark到TensorFlow或Pytorch的數據轉換。輸入火花數據框首先以鑲木格式實現,然後加載為tf.data.Dataset或torch.utils.data.DataLoader 。
下面的極簡主義示例假定了編譯的tf.keras模型的定義和包含特徵列的Spark DataFrame,然後是標籤列。
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete ()下面的極簡主義示例假定了Net類, train和test功能的定義,包括pytorch_example.py,以及包含功能列的Spark DataFrame,然後是標籤列。
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()可以使用Pyspark將Petastorm數據集讀取到Spark DataFrame中,您可以在其中使用廣泛的Spark工具來分析和操縱數據集。
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQL可用於查詢Patastorm數據集:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()您可以在此處找到一個完整的代碼示例:pyspark_hello_world.py,
Petastorm也可用於直接從Apache Parquet商店讀取數據。為此,請使用make_batch_reader (而不是make_reader )。下表總結了make_batch_reader和make_reader函數的差異。
make_reader | make_batch_reader |
|---|---|
| 只有Petastorm數據集(使用Moletializes_DataSet創建) | 任何鑲木店(某些本機鑲木列類型尚未支持)。 |
| 讀者一次返回一個記錄。 | 讀者返回記錄批次。批處理的大小未固定,並由鑲木式行組大小定義。 |
每行評估傳遞給make_reader謂詞。 | 每個批次評估傳遞給make_batch_reader謂詞。 |
可以根據filters參數過濾木准文件。 | 可以根據filters參數過濾鑲木quet文件 |
請參閱故障排除頁面,如果找不到答案,請提交票。
我們更喜歡以GitHub拉的請求的形式獲得貢獻。請針對github.com/uber/petastorm存儲庫發送拉動請求。
貢獻一個補丁:
預先感謝您的貢獻!
有關開發相關信息,請參見開發。