
![]()
การให้บริการแบบจำลองมีประสิทธิภาพในคลาวด์

MOSEC เป็นกรอบการให้บริการแบบจำลองประสิทธิภาพสูงและยืดหยุ่นสำหรับการสร้างแบ็กเอนด์ที่เปิดใช้งาน ML และ Microservices มันเชื่อมช่องว่างระหว่างรูปแบบการเรียนรู้ของเครื่องใด ๆ ที่คุณเพิ่งฝึกฝนและ API บริการออนไลน์ที่มีประสิทธิภาพ
Mosec ต้องการ Python 3.7 หรือสูงกว่า ติดตั้งแพ็คเกจ PYPI ล่าสุดสำหรับ Linux X86_64 หรือ MacOS X86_64/ARM64 ด้วย:
pip install -U mosec
# or install with conda
conda install conda-forge::mosecหากต้องการสร้างจากซอร์สโค้ดให้ติดตั้ง Rust และเรียกใช้คำสั่งต่อไปนี้:
make package คุณจะได้รับไฟล์ล้อ MOSEC ในโฟลเดอร์ dist
เราแสดงให้เห็นว่า MOSEC สามารถช่วยให้คุณโฮสต์รูปแบบการแพร่กระจายที่มั่นคงได้รับการฝึกอบรมมาก่อนเป็นบริการได้อย่างไร คุณต้องติดตั้ง diffusers และ transformers เป็นสิ่งที่จำเป็นต้องมี:
pip install --upgrade diffusers[torch] transformersประการแรกเรานำเข้าไลบรารีและตั้งค่าเครื่องบันทึกพื้นฐานเพื่อสังเกตสิ่งที่เกิดขึ้นได้ดีขึ้น
from io import BytesIO
from typing import List
import torch # type: ignore
from diffusers import StableDiffusionPipeline # type: ignore
from mosec import Server , Worker , get_logger
from mosec . mixin import MsgpackMixin
logger = get_logger ()จากนั้นเรา สร้าง API สำหรับลูกค้าเพื่อสอบถามพรอมต์ข้อความและรับภาพตามโมเดลที่มีความเสถียร Diffusion-V1-5 ในเวลาเพียง 3 ขั้นตอน
กำหนดบริการของคุณเป็นคลาสที่สืบทอด mosec.Worker ที่นี่เรายังสืบทอด MsgpackMixin เพื่อใช้รูปแบบการทำให้เป็นอนุกรม msgpack (A)
ภายในวิธี __init__ เริ่มต้นโมเดลของคุณและวางลงบนอุปกรณ์ที่เกี่ยวข้อง ทางเลือกคุณสามารถกำหนด self.example ด้วยข้อมูลบางอย่างเพื่ออุ่นเครื่อง (b) โมเดล โปรดทราบว่าข้อมูลควรเข้ากันได้กับรูปแบบอินพุตของตัวจัดการของคุณซึ่งเราให้รายละเอียดต่อไป
แทนที่วิธี forward เพื่อเขียนตัวจัดการบริการของคุณ (c) โดยมีลายเซ็น forward(self, data: Any | List[Any]) -> Any | List[Any] การรับ/ส่งคืนรายการเดียวหรือ tuple ขึ้นอยู่กับว่ามีการกำหนดค่าการแบตช์แบบไดนามิก (D) หรือไม่
class StableDiffusion ( MsgpackMixin , Worker ):
def __init__ ( self ):
self . pipe = StableDiffusionPipeline . from_pretrained (
"sd-legacy/stable-diffusion-v1-5" , torch_dtype = torch . float16
)
self . pipe . enable_model_cpu_offload ()
self . example = [ "useless example prompt" ] * 4 # warmup (batch_size=4)
def forward ( self , data : List [ str ]) -> List [ memoryview ]:
logger . debug ( "generate images for %s" , data )
res = self . pipe ( data )
logger . debug ( "NSFW: %s" , res [ 1 ])
images = []
for img in res [ 0 ]:
dummy_file = BytesIO ()
img . save ( dummy_file , format = "JPEG" )
images . append ( dummy_file . getbuffer ())
return images[!บันทึก]
(a) ในตัวอย่างนี้เราส่งคืนรูปภาพในรูปแบบไบนารีซึ่ง JSON ไม่รองรับ (เว้นแต่จะเข้ารหัสด้วย Base64 ที่ทำให้น้ำหนักบรรทุกมีขนาดใหญ่ขึ้น) ดังนั้น Msgpack จึงเหมาะกับความต้องการของเราดีกว่า หากเราไม่ได้รับมรดก
MsgpackMixinJSON จะถูกใช้โดยค่าเริ่มต้น กล่าวอีกนัยหนึ่งโปรโตคอลของคำขอ/การตอบสนองบริการสามารถเป็น Msgpack, JSON หรือรูปแบบอื่น ๆ (ตรวจสอบ mixins ของเรา)(b) การอุ่นเครื่องมักจะช่วยจัดสรรหน่วยความจำ GPU ล่วงหน้า หากมีการระบุตัวอย่างการอุ่นเครื่องบริการจะพร้อมหลังจากตัวอย่างจะถูกส่งต่อผ่านตัวจัดการ อย่างไรก็ตามหากไม่ได้รับตัวอย่างคาดว่าจะมีความล่าช้าในการร้องขอครั้งแรกนานขึ้น
exampleควรตั้งค่าเป็นรายการเดียวหรือ tuple ขึ้นอยู่กับforwardที่คาดหวังว่าจะได้รับ ยิ่งไปกว่านั้นในกรณีที่คุณต้องการอุ่นเครื่องด้วยตัวอย่างที่แตกต่างกันหลายตัวอย่างคุณอาจตั้งค่าmulti_examples(ตัวอย่างที่นี่)(c) ตัวอย่างนี้แสดงบริการขั้นตอนเดียวที่ผู้ปฏิบัติงาน
StableDiffusionใช้โดยตรงในคำขอพร้อมท์ของไคลเอ็นต์และตอบกลับภาพ ดังนั้นforwardถือได้ว่าเป็นตัวจัดการบริการที่สมบูรณ์ อย่างไรก็ตามเรายังสามารถออกแบบบริการหลายขั้นตอนกับคนงานที่ทำงานที่แตกต่างกัน (เช่นการดาวน์โหลดรูปภาพการอนุมานแบบจำลองการโพสต์การประมวลผล) ในท่อ ในกรณีนี้ไปป์ไลน์ทั้งหมดถือเป็นตัวจัดการบริการโดยมีคนงานคนแรกที่ได้รับการร้องขอและคนงานคนสุดท้ายที่ส่งการตอบกลับ การไหลของข้อมูลระหว่างคนงานทำได้โดยการสื่อสารระหว่างกระบวนการ(d) เนื่องจากการเปิดใช้งานการแบตช์แบบไดนามิกในตัวอย่างนี้วิธี
forwardจะได้รับ รายการ สตริงเช่น['a cute cat playing with a red ball', 'a man sitting in front of a computer', ...], รวมจากลูกค้าที่แตกต่างกันสำหรับ การอนุมานแบทช์
ในที่สุดเราผนวกผู้ปฏิบัติงานเข้ากับเซิร์ฟเวอร์เพื่อสร้างเวิร์กโฟลว์ ขั้นตอนเดียว (หลายขั้นตอนสามารถถูกส่งไปป์เพื่อเพิ่มปริมาณงานดูตัวอย่างนี้) และระบุจำนวนกระบวนการที่เราต้องการให้ทำงานในแบบขนาน ( num=1 max_batch_size=4 max_wait_time=10 เป็นมิลลิวินาทีซึ่งหมายถึงเวลาที่ยาวนานที่สุด Mosec รอจนกว่าจะส่งชุดไปยังคนงาน)
if __name__ == "__main__" :
server = Server ()
# 1) `num` specifies the number of processes that will be spawned to run in parallel.
# 2) By configuring the `max_batch_size` with the value > 1, the input data in your
# `forward` function will be a list (batch); otherwise, it's a single item.
server . append_worker ( StableDiffusion , num = 1 , max_batch_size = 4 , max_wait_time = 10 )
server . run ()ตัวอย่างด้านบนถูกรวมเข้ากับไฟล์ตัวอย่างของเรา คุณสามารถทำงานได้โดยตรงที่ระดับรากของโครงการ ก่อนอื่นเราได้ดูที่ อาร์กิวเมนต์บรรทัดคำสั่ง (คำอธิบายที่นี่):
python examples/stable_diffusion/server.py --helpจากนั้นเริ่มเซิร์ฟเวอร์ด้วยบันทึกการดีบัก:
python examples/stable_diffusion/server.py --log-level debug --timeout 30000 เปิด http://127.0.0.1:8000/openapi/swagger/ ในเบราว์เซอร์ของคุณเพื่อรับเอกสาร OpenAPI
และในเทอร์มินัลอื่นให้ทดสอบ:
python examples/stable_diffusion/client.py --prompt " a cute cat playing with a red ball " --output cat.jpg --port 8000คุณจะได้รับภาพชื่อ "cat.jpg" ในไดเรกทอรีปัจจุบัน
คุณสามารถตรวจสอบตัวชี้วัด:
curl http://127.0.0.1:8000/metricsแค่ไหน! คุณเพิ่งโฮสต์ โมเดลการกระจายเสถียร ของคุณเป็นบริการ!
ตัวอย่างที่พร้อมใช้งานมากขึ้นสามารถพบได้ในส่วนตัวอย่าง มันรวมถึง:
max_batch_size และ max_wait_time (millisecond) ได้รับการกำหนดค่าเมื่อคุณเรียก append_workermax_batch_size จะไม่ทำให้เกิดความจำใน GPUmax_wait_time ควรน้อยกว่าเวลาอนุมานแบทช์max_batch_size หรือเมื่อ max_wait_time ผ่านไปแล้ว บริการจะได้รับประโยชน์จากคุณสมบัตินี้เมื่อปริมาณการใช้งานสูงmosec คุณสามารถตรวจสอบภาพ mosecorg/mosec อย่างเป็นทางการได้ สำหรับกรณีการใช้งานที่ซับซ้อนลองดู Envdmosec_service_batch_size_bucket แสดงการกระจายขนาดแบทช์mosec_service_batch_duration_second_bucket แสดงระยะเวลาของการแบตช์แบบไดนามิกสำหรับการเชื่อมต่อแต่ละครั้งในแต่ละขั้นตอน (เริ่มจากการรับงานแรก)mosec_service_process_duration_second_bucket แสดงระยะเวลาการประมวลผลสำหรับการเชื่อมต่อแต่ละครั้งในแต่ละขั้นตอน (รวมถึงเวลา IPC แต่ไม่รวม mosec_service_batch_duration_second_bucket )mosec_service_remaining_task แสดงจำนวนงานประมวลผลในปัจจุบันmosec_service_throughput แสดงปริมาณงานบริการSIGINT ( CTRL+C ) หรือ SIGTERM ( kill {PID} ) เนื่องจากมีตรรกะการปิดที่สง่างาม max_batch_size ที่ดีที่สุดและ max_wait_time สำหรับบริการการอนุมานของคุณ ตัวชี้วัดจะแสดงฮิสโตแกรมของขนาดแบทช์จริงและระยะเวลาแบทช์ นี่คือข้อมูลสำคัญในการปรับพารามิเตอร์ทั้งสองนี้serialize_ipc/deserialize_ipc ดังนั้นข้อมูลที่มีขนาดใหญ่มากอาจทำให้ท่อทั้งหมดช้าลง ข้อมูลอนุกรมจะถูกส่งผ่านไปยังขั้นตอนต่อไปผ่านการเกิดสนิมโดยค่าเริ่มต้นคุณสามารถเปิดใช้งานหน่วยความจำที่ใช้ร่วมกันเพื่อลดเวลาแฝง (Ref Redisshmipcmixin)serialize/deserialize ที่เหมาะสมซึ่งใช้ในการถอดรหัสคำขอของผู้ใช้และเข้ารหัสการตอบกลับ โดยค่าเริ่มต้นทั้งสองกำลังใช้ JSON อย่างไรก็ตามรูปภาพและการฝังตัวไม่ได้รับการสนับสนุนอย่างดีจาก JSON คุณสามารถเลือก Msgpack ซึ่งเข้ากันได้เร็วขึ้นและเข้ากันได้กับไบนารี (การแพร่กระจายที่เสถียรของอ้างอิง)mosec ปรับให้เข้ากับโปรโตคอลของผู้ใช้โดยอัตโนมัติ (เช่น HTTP/2) ตั้งแต่ v0.8.8 นี่คือบาง บริษัท และผู้ใช้รายบุคคลที่ใช้ MOSEC:
หากคุณพบว่าซอฟต์แวร์นี้มีประโยชน์สำหรับการวิจัยของคุณโปรดพิจารณาอ้างถึง
@software{yang2021mosec,
title = {{MOSEC: Model Serving made Efficient in the Cloud}},
author = {Yang, Keming and Liu, Zichen and Cheng, Philip},
url = {https://github.com/mosecorg/mosec},
year = {2021}
}
เรายินดีต้อนรับการบริจาคทุกประเภท โปรดให้ข้อเสนอแนะกับเราโดยการยกประเด็นหรือพูดคุยเกี่ยวกับความไม่ลงรอยกัน นอกจากนี้คุณยังสามารถสนับสนุนรหัสและคำขอดึงของคุณโดยตรง!
ในการเริ่มพัฒนาคุณสามารถใช้ EnvD เพื่อสร้างสภาพแวดล้อม Python & Rust ที่แยกได้และสะอาด ตรวจสอบ envd-docs หรือ build.envd สำหรับข้อมูลเพิ่มเติม