
สารบัญ
Petastorm เป็นไลบรารีการเข้าถึงข้อมูลโอเพ่นซอร์สที่พัฒนาขึ้นที่ Uber ATG ห้องสมุดนี้เปิดใช้งานเครื่องเดียวหรือการฝึกอบรมแบบกระจายและการประเมินผลของแบบจำลองการเรียนรู้ลึกโดยตรงจากชุดข้อมูลในรูปแบบ Apache Parquet Petastorm รองรับเฟรมเวิร์กการเรียนรู้ของเครื่อง Python (ML) ยอดนิยมเช่น Tensorflow, Pytorch และ Pyspark นอกจากนี้ยังสามารถใช้ได้จากรหัส Python บริสุทธิ์
เว็บไซต์เอกสาร: https://petastorm.readthedocs.io
pip install petastorm มีการพึ่งพาพิเศษหลายอย่างที่กำหนดโดยแพ็คเกจ petastorm ที่ไม่ได้ติดตั้งโดยอัตโนมัติ สิ่งพิเศษคือ: tf , tf_gpu , torch , opencv , docs , test
ตัวอย่างเช่นการติดตั้ง TensorFlow และ OpenCV เวอร์ชัน GPU ให้ใช้คำสั่ง PIP ต่อไปนี้:
pip install petastorm[opencv,tf_gpu]ชุดข้อมูลที่สร้างขึ้นโดยใช้ Petastorm จะถูกเก็บไว้ในรูปแบบ Apache Parquet ด้านบนของสคีมา Parquet Petastorm ยังเก็บข้อมูลสคีมาระดับสูงกว่าซึ่งทำให้อาร์เรย์หลายมิติเป็นส่วนหนึ่งของชุดข้อมูล Petastorm
Petastorm รองรับตัวแปลงสัญญาณข้อมูลที่ขยายได้ สิ่งเหล่านี้ช่วยให้ผู้ใช้สามารถใช้หนึ่งในการบีบอัดข้อมูลมาตรฐาน (JPEG, PNG) หรือใช้งานของเธอเอง
การสร้างชุดข้อมูลทำได้โดยใช้ pyspark Pyspark รองรับรูปแบบ Parquet ทำให้ง่ายต่อการทำงานบนเครื่องเดียวหรือบนคลัสเตอร์การคำนวณประกายไฟ นี่คือตัวอย่างที่เรียบง่ายในการเขียนตารางที่มีข้อมูลแบบสุ่ม
import numpy as np
from pyspark . sql import SparkSession
from pyspark . sql . types import IntegerType
from petastorm . codecs import ScalarCodec , CompressedImageCodec , NdarrayCodec
from petastorm . etl . dataset_metadata import materialize_dataset
from petastorm . unischema import dict_to_spark_row , Unischema , UnischemaField
# The schema defines how the dataset schema looks like
HelloWorldSchema = Unischema ( 'HelloWorldSchema' , [
UnischemaField ( 'id' , np . int32 , (), ScalarCodec ( IntegerType ()), False ),
UnischemaField ( 'image1' , np . uint8 , ( 128 , 256 , 3 ), CompressedImageCodec ( 'png' ), False ),
UnischemaField ( 'array_4d' , np . uint8 , ( None , 128 , 30 , None ), NdarrayCodec (), False ),
])
def row_generator ( x ):
"""Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
return { 'id' : x ,
'image1' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 128 , 256 , 3 )),
'array_4d' : np . random . randint ( 0 , 255 , dtype = np . uint8 , size = ( 4 , 128 , 30 , 3 ))}
def generate_petastorm_dataset ( output_url = 'file:///tmp/hello_world_dataset' ):
rowgroup_size_mb = 256
spark = SparkSession . builder . config ( 'spark.driver.memory' , '2g' ). master ( 'local[2]' ). getOrCreate ()
sc = spark . sparkContext
# Wrap dataset materialization portion. Will take care of setting up spark environment variables as
# well as save petastorm specific metadata
rows_count = 10
with materialize_dataset ( spark , output_url , HelloWorldSchema , rowgroup_size_mb ):
rows_rdd = sc . parallelize ( range ( rows_count ))
. map ( row_generator )
. map ( lambda x : dict_to_spark_row ( HelloWorldSchema , x ))
spark . createDataFrame ( rows_rdd , HelloWorldSchema . as_spark_schema ())
. coalesce ( 10 )
. write
. mode ( 'overwrite' )
. parquet ( output_url )HelloWorldSchema เป็นตัวอย่างของวัตถุ Unischema Unischema มีความสามารถในการเรนเดอร์ประเภทของฟิลด์ในรูปแบบเฉพาะเฟรมเวิร์กที่แตกต่างกันเช่น: Spark StructType , Tensorflow tf.DType และ Numpy numpy.dtypetype shape ตัว codec และไม่ว่าฟิลด์จะเป็นโมฆะสำหรับแต่ละฟิลด์ของ Unischema หรือไม่.master('local[2]') ) แน่นอนสำหรับการสร้างชุดข้อมูลขนาดใหญ่ขึ้นเราจะต้องมีคลัสเตอร์คำนวณจริงmaterialize_dataset Context Manager มีหน้าที่กำหนดขนาดกลุ่มแถวที่จุดเริ่มต้นและเขียนข้อมูลเมตาเฉพาะของ Petastorm ในตอนท้ายrow_generator สำหรับสิ่งนั้นdict_to_spark_row แปลงพจนานุกรมเป็นวัตถุ pyspark.Row ในขณะที่มั่นใจว่า Schema HelloWorldSchema การปฏิบัติตาม (รูปร่างประเภทและเงื่อนไขที่สามารถทดสอบได้)pyspark.DataFrame เราจะเขียนมันออกไปยังที่เก็บของ Parquet Schema Parquet นั้นได้มาจาก HelloWorldSchema โดยอัตโนมัติ คลาส petastorm.reader.Reader เป็นจุดเริ่มต้นหลักสำหรับรหัสผู้ใช้ที่เข้าถึงข้อมูลจากเฟรมเวิร์ก ML เช่น TensorFlow หรือ Pytorch ผู้อ่านมีคุณสมบัติหลายอย่างเช่น:
การอ่านชุดข้อมูลนั้นง่ายโดยใช้คลาส petastorm.reader.Reader ซึ่งสามารถสร้างได้โดยใช้วิธี petastorm.make_reader โรงงาน:
from petastorm import make_reader
with make_reader ( 'hdfs://myhadoop/some_dataset' ) as reader :
for row in reader :
print ( row ) hdfs://... และ file://... ได้รับการสนับสนุน URL โปรโตคอล
เมื่อ Reader มีการสร้างอินสแตนซ์คุณสามารถใช้เป็นตัววนซ้ำ
ในการเชื่อมต่อเครื่องอ่านลงในกราฟ Tensorflow คุณสามารถใช้ฟังก์ชัน tf_tensors :
from petastorm . tf_utils import tf_tensors
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
row_tensors = tf_tensors ( reader )
with tf . Session () as session :
for _ in range ( 3 ):
print ( session . run ( row_tensors )) หรือคุณสามารถใช้ tf.data.Dataset API ใหม่;
from petastorm . tf_utils import make_petastorm_dataset
with make_reader ( 'file:///some/localpath/a_dataset' ) as reader :
dataset = make_petastorm_dataset ( reader )
iterator = dataset . make_one_shot_iterator ()
tensor = iterator . get_next ()
with tf . Session () as sess :
sample = sess . run ( tensor )
print ( sample . id ) ดังที่แสดงใน pytorch_example.py การอ่านชุดข้อมูล petastorm จาก pytorch สามารถทำได้ผ่านคลาสอะแดปเตอร์ petastorm.pytorch.DataLoader ซึ่งช่วยให้ฟังก์ชั่นการรวบรวม pytorch แบบกำหนดเอง
ต้องแน่ใจว่าคุณติดตั้ง torch และ torchvision :
pip install torchvision ตัวอย่างที่เรียบง่ายด้านล่างสันนิษฐานว่าคำจำกัดความของคลาส Net และฟังก์ชั่น train และ test รวมอยู่ใน pytorch_example :
import torch
from petastorm . pytorch import DataLoader
torch . manual_seed ( 1 )
device = torch . device ( 'cpu' )
model = Net (). to ( device )
optimizer = torch . optim . SGD ( model . parameters (), lr = 0.01 , momentum = 0.5 )
def _transform_row ( mnist_row ):
transform = transforms . Compose ([
transforms . ToTensor (),
transforms . Normalize (( 0.1307 ,), ( 0.3081 ,))
])
return ( transform ( mnist_row [ 'image' ]), mnist_row [ 'digit' ])
transform = TransformSpec ( _transform_row , removed_fields = [ 'idx' ])
with DataLoader ( make_reader ( 'file:///localpath/mnist/train' , num_epochs = 10 ,
transform_spec = transform , seed = 1 , shuffle_rows = True ), batch_size = 64 ) as train_loader :
train ( model , device , train_loader , 10 , optimizer , 1 )
with DataLoader ( make_reader ( 'file:///localpath/mnist/test' , num_epochs = 10 ,
transform_spec = transform ), batch_size = 1000 ) as test_loader :
test ( model , device , test_loader ) หากคุณกำลังทำงานกับขนาดแบทช์ที่มีขนาดใหญ่มากและไม่จำเป็นต้องได้รับการสนับสนุนสำหรับทศนิยม/สตริงเราให้ petastorm.pytorch.BatchedDataLoader -batchedDataloader ที่สามารถบัฟเฟอร์โดยใช้เทนเซอร์ไฟฉาย ( cpu หรือ cuda ) ที่มีปริมาณงานที่สูงขึ้นอย่างมีนัยสำคัญ
หากขนาดของชุดข้อมูลของคุณสามารถพอดีกับหน่วยความจำระบบคุณสามารถใช้ DataLoader petastorm.pytorch.InMemBatchedDataLoader เวอร์ชันในหน่วยความจำ dataloader นี้จะอ่านชุดข้อมูลเพียงครั้งเดียวและแคชข้อมูลในหน่วยความจำเพื่อหลีกเลี่ยง I/O เพิ่มเติมสำหรับหลายยุค
Spark Converter API ทำให้การแปลงข้อมูลง่ายขึ้นจาก Spark เป็น TensorFlow หรือ Pytorch อินพุต Spark Dataframe เป็นครั้งแรกในรูปแบบ Parquet จากนั้นโหลดเป็น tf.data.Dataset หรือ torch.utils.data.DataLoader
ตัวอย่างมินิมัลลิสต์ด้านล่างสันนิษฐานว่าคำจำกัดความของโมเดล tf.keras ที่คอมไพล์และ Spark Dataframe ที่มีคอลัมน์คุณสมบัติตามด้วยคอลัมน์ฉลาก
from petastorm . spark import SparkDatasetConverter , make_spark_converter
import tensorflow . compat . v1 as tf # pylint: disable=import-error
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df = ... # `df` is a spark dataframe
# create a converter from `df`
# it will materialize `df` to cache dir.
converter = make_spark_converter ( df )
# make a tensorflow dataset from `converter`
with converter . make_tf_dataset () as dataset :
# the `dataset` is `tf.data.Dataset` object
# dataset transformation can be done if needed
dataset = dataset . map (...)
# we can train/evaluate model on the `dataset`
model . fit ( dataset )
# when exiting the context, the reader of the dataset will be closed
# delete the cached files of the dataframe.
converter . delete () ตัวอย่างที่เรียบง่ายด้านล่างสันนิษฐานว่าคำจำกัดความของคลาส Net และฟังก์ชั่น train และ test รวมอยู่ใน pytorch_example.py และ Spark Dataframe ที่มีคอลัมน์คุณสมบัติตามด้วยคอลัมน์ฉลาก
from petastorm . spark import SparkDatasetConverter , make_spark_converter
# specify a cache dir first.
# the dir is used to save materialized spark dataframe files
spark . conf . set ( SparkDatasetConverter . PARENT_CACHE_DIR_URL_CONF , 'hdfs:/...' )
df_train , df_test = ... # `df_train` and `df_test` are spark dataframes
model = Net ()
# create a converter_train from `df_train`
# it will materialize `df_train` to cache dir. (the same for df_test)
converter_train = make_spark_converter ( df_train )
converter_test = make_spark_converter ( df_test )
# make a pytorch dataloader from `converter_train`
with converter_train . make_torch_dataloader () as dataloader_train :
# the `dataloader_train` is `torch.utils.data.DataLoader` object
# we can train model using the `dataloader_train`
train ( model , dataloader_train , ...)
# when exiting the context, the reader of the dataset will be closed
# the same for `converter_test`
with converter_test . make_torch_dataloader () as dataloader_test :
test ( model , dataloader_test , ...)
# delete the cached files of the dataframes.
converter_train . delete ()
converter_test . delete ()ชุดข้อมูล Petastorm สามารถอ่านได้ใน Spark Dataframe โดยใช้ Pyspark ซึ่งคุณสามารถใช้เครื่องมือ Spark ที่หลากหลายเพื่อวิเคราะห์และจัดการชุดข้อมูล
# Create a dataframe object from a parquet file
dataframe = spark . read . parquet ( dataset_url )
# Show a schema
dataframe . printSchema ()
# Count all
dataframe . count ()
# Show a single column
dataframe . select ( 'id' ). show ()SQL สามารถใช้ในการสืบค้นชุดข้อมูล Petastorm:
spark . sql (
'SELECT count(id) '
'from parquet.`file:///tmp/hello_world_dataset`' ). collect ()คุณสามารถค้นหาตัวอย่างรหัสเต็มได้ที่นี่: pyspark_hello_world.py,
Petastorm ยังสามารถใช้อ่านข้อมูลโดยตรงจากร้านค้า Apache Parquet เพื่อให้ได้สิ่งนั้นให้ใช้ make_batch_reader (และไม่ใช่ make_reader ) ตารางต่อไปนี้สรุปความแตกต่างของฟังก์ชั่น make_batch_reader และ make_reader
make_reader | make_batch_reader |
|---|---|
| เฉพาะชุดข้อมูล petastorm (สร้างโดยใช้ aterializes_dataset) | ร้านค้า Parquet ใด ๆ (คอลัมน์ Parquet บางประเภทยังไม่รองรับ |
| ผู้อ่านส่งคืนหนึ่งครั้งในแต่ละครั้ง | ผู้อ่านส่งคืนแบตช์ของระเบียน ขนาดของแบทช์ไม่ได้รับการแก้ไขและกำหนดโดยขนาดกลุ่มแถว Parquet |
เพรดิเคตที่ส่งผ่านไปยัง make_reader ได้รับการประเมินต่อแถวเดียว | เพรดิเคตที่ส่งผ่านไปยัง make_batch_reader ได้รับการประเมินต่อแบทช์ |
สามารถกรองไฟล์ Parquet ตามอาร์กิวเมนต์ตัว filters | สามารถกรองไฟล์ Parquet ตามอาร์กิวเมนต์ตัว filters |
ดูหน้าการแก้ไขปัญหาและโปรดส่งตั๋วหากคุณไม่พบคำตอบ
เราชอบที่จะได้รับการสนับสนุนในรูปแบบของคำขอดึง GitHub กรุณาส่งคำขอดึงกับ github.com/uber/petastorm ที่เก็บข้อมูล
เพื่อสนับสนุนแพตช์:
ขอบคุณล่วงหน้าสำหรับการมีส่วนร่วมของคุณ!
ดูการพัฒนาเพื่อการพัฒนาข้อมูลที่เกี่ยวข้อง