
내용물
Petastorm은 Uber ATG에서 개발 된 오픈 소스 데이터 액세스 라이브러리입니다. 이 라이브러리를 사용하면 Apache Parquet 형식의 데이터 세트에서 직접 딥 러닝 모델에 대한 단일 기계 또는 분산 교육 및 평가가 가능합니다. Petastorm은 Tensorflow, Pytorch 및 Pyspark와 같은 인기있는 Python 기반 머신 러닝 (ML) 프레임 워크를 지원합니다. 순수한 파이썬 코드에서도 사용할 수 있습니다.
문서 웹 사이트 : https://petastorm.readthedocs.io
pip install petastorm petastorm 패키지에 의해 자동으로 설치되지 않은 몇 가지 추가 종속성이 있습니다. 엑스트라는 tf , tf_gpu , torch , opencv , docs , test 입니다.
예를 들어 Tensorflow 및 OpenCV의 GPU 버전 설치를 트리거하려면 다음 PIP 명령을 사용하십시오.
pip install petastorm[opencv,tf_gpu]Petastorm을 사용하여 생성 된 데이터 세트는 Apache Parquet 형식으로 저장됩니다. Parquet 스키마 외에 Petastorm은 다차원 배열을 Petastorm 데이터 세트의 기본 부분으로 만드는 고급 스키마 정보를 저장합니다.
Petastorm은 확장 가능한 데이터 코덱을 지원합니다. 이를 통해 사용자는 표준 데이터 압축 (JPEG, PNG) 중 하나를 사용하거나 자신의 자체를 구현할 수 있습니다.
데이터 세트 생성은 PySpark를 사용하여 수행됩니다. PySpark는 기본적으로 파크 형식을 지원하므로 단일 시스템 또는 Spark Compute 클러스터에서 쉽게 실행할 수 있습니다. 다음은 임의의 데이터가 포함 된 테이블을 작성하는 최소한의 예입니다.
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema 는 Unischema 객체의 인스턴스입니다. Unischema Spark StructType , Tensorflow tf.DType 및 Numpy numpy.dtype 와 같은 다른 프레임 워크 특정 형식으로 유형의 필드를 다른 프레임 워크 특정 형식으로 렌더링 할 수 있습니다.type , shape , codec 인스턴스 및 필드가 Unischema 의 각 필드에 대해 무효가 될 수 있는지 여부를 지정해야합니다..master('local[2]') ). 물론 대규모 데이터 세트 생성의 경우 실제 컴퓨팅 클러스터가 필요합니다.materialize_dataset 컨텍스트 관리자로 래핑합니다. Context Manager는 처음에 행 그룹 크기를 구성하고 끝에 Petastorm 특정 메타 데이터를 작성해야합니다.row_generator 기능을 사용합니다.dict_to_spark_row 스키마 HelloWorldSchema 준수 (모양, 유형 및 IS-nullable 조건이 테스트)를 보장하면서 사전을 pyspark.Row 객체로 변환합니다.pyspark.DataFrame 이 있으면 오르 쿠트 스토리지에 작성합니다. 마루 스키마는 자동으로 HelloWorldSchema 에서 파생됩니다. petastorm.reader.Reader 클래스는 Tensorflow 또는 Pytorch와 같은 ML 프레임 워크의 데이터에 액세스하는 사용자 코드의 주요 항목 지점입니다. 독자는 다음과 같은 여러 기능을 가지고 있습니다.
petastorm.reader.Reader 클래스를 사용하여 데이터 세트를 읽는 것은 petastorm.make_reader Factory 메소드를 사용하여 간단합니다.
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://... 그리고 file://... 는 지원되는 URL 프로토콜입니다.
Reader 가 인스턴스화되면 반복자로 사용할 수 있습니다.
독자를 Tensorflow 그래프로 연결하려면 tf_tensors 기능을 사용할 수 있습니다.
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors )) 또는 새로운 tf.data.Dataset API를 사용할 수 있습니다.
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) pytorch_example.py에 설명 된 바와 같이, pytorch에서 petastorm 데이터 세트를 읽는 것은 어댑터 클래스 petastorm.pytorch.DataLoader 를 통해 수행 할 수 있으며,이를 통해 사용자 정의 Pytorch Collating Function 및 변환을 제공 할 수 있습니다.
torch 와 torchvision 설치되어 있는지 확인하십시오.
pip install torchvision 아래의 미니멀리스트 예는 pytorch_example 에 포함 된 Net 클래스 및 train 및 test 기능의 정의를 가정합니다.
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader ) 매우 큰 배치 크기로 작업하고 있으며 소수/문자열에 대한 지원이 필요하지 않은 경우 petastorm.pytorch.BatchedDataLoader 를 제공하여 토치 텐서 ( cpu 또는 cuda )를 대표적으로 높은 처리량으로 버퍼 할 수 있습니다.
데이터 세트의 크기가 시스템 메모리에 들어갈 수있는 경우 Memory 버전 Dataloader petastorm.pytorch.InMemBatchedDataLoader 를 사용할 수 있습니다. 이 데이터 로더는 데이터 세트를 한 번만 읽고 메모리에 데이터를 캐시하여 여러 에포크에 대한 추가 I/O를 피합니다.
Spark Converter API는 Spark에서 Tensorflow 또는 Pytorch로 데이터 변환을 단순화합니다. 입력 스파크 데이터 프레임은 먼저 파크 형식으로 구체화 된 다음 tf.data.Dataset 또는 torch.utils.data.DataLoader 로로드됩니다.
아래의 미니멀리스트 예는 컴파일 된 tf.keras 모델의 정의와 기능 열을 포함하는 스파크 데이터 프레임과 라벨 열을 포함합니다.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete () 아래의 미니멀리스트 예는 pytorch_example.py에 포함 된 Net 클래스 및 train 및 test 기능의 정의와 기능 열을 포함하는 스파크 데이터 프레임과 레이블 열을 포함합니다.
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()Pyspark를 사용하여 Petastorm 데이터 세트를 Spark Dataframe으로 읽을 수 있으며, 여기서 광범위한 Spark 도구를 사용하여 데이터 세트를 분석하고 조작 할 수 있습니다.
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQL은 Petastorm 데이터 세트를 쿼리하는 데 사용될 수 있습니다.
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()전체 코드 샘플을 여기에서 찾을 수 있습니다 : pyspark_hello_world.py,
Petastorm은 또한 Apache Parquet Stores에서 직접 데이터를 읽는 데 사용될 수 있습니다. 이를 달성하려면 make_batch_reader (그리고 make_reader 아님)를 사용하십시오. 다음 표는 make_batch_reader 및 make_reader 함수 차이를 요약합니다.
make_reader | make_batch_reader |
|---|---|
| Petastorm 데이터 세트 만 (ratiousizes_dataset을 사용하여 생성) | 모든 마모 상점 (일부 기본 파크 컬럼 유형은 아직 지원되지 않습니다. |
| 독자는 한 번에 한 레코드를 반환합니다. | 독자는 레코드 배치를 반환합니다. 배치의 크기는 고정되지 않았으며 오르 켓 행 그룹 크기로 정의됩니다. |
make_reader 로 전달 된 Predicates는 단일 행 당 평가됩니다. | make_batch_reader 로 전달 된 Predicates는 배치 당 평가됩니다. |
filters 인수를 기반으로 파일 파일을 필터링 할 수 있습니다. | filters 인수를 기반으로 파일 파일을 필터링 할 수 있습니다 |
문제 해결 페이지를보고 답을 찾을 수 없으면 티켓을 제출하십시오.
우리는 GitHub PULL 요청의 형태로 기부금을받는 것을 선호합니다. github.com/uber/petastorm 리포지토리에 대한 풀 요청을 보내주십시오.
패치를 기여하려면 :
귀하의 기여에 미리 감사드립니다!
개발 관련 정보 개발을 참조하십시오.