
![]()
模型服务在云中有效。

MOSEC是用于构建ML模型的后端和微服务的高性能和灵活的模型。它弥合了刚训练的任何机器学习模型与有效的在线服务API之间的差距。
MOSEC需要Python 3.7或更高。使用:
pip install -U mosec
# or install with conda
conda install conda-forge::mosec要从源代码构建,请安装Rust并运行以下命令:
make package您将在dist文件夹中获得MOSEC车轮文件。
我们演示了MOSEC如何帮助您轻松托管预先训练的稳定扩散模型作为服务。您需要将扩散器和变压器作为先决条件:
pip install --upgrade diffusers[torch] transformers首先,我们导入库并设置一个基本的记录仪,以更好地观察发生的事情。
from io import BytesIO
from typing import List
import torch # type: ignore
from diffusers import StableDiffusionPipeline # type: ignore
from mosec import Server , Worker , get_logger
from mosec . mixin import MsgpackMixin
logger = get_logger ()然后,我们为客户构建一个API ,以查询文本提示,并仅以3个步骤获得基于稳定的扩散-V1-5模型的图像。
将您的服务定义为继承mosec.Worker类。在这里,我们还继承了MsgpackMixin ,以采用MSGPACK序列化格式(a) 。
在__init__方法内,初始化模型并将其放在相应的设备上。您可以选择地分配self.example用一些数据进行热身(b)模型。请注意,数据应与处理程序的输入格式兼容,我们接下来会详细介绍。
覆盖编写您的服务处理程序(C) forward方法,并以签名forward(self, data: Any | List[Any]) -> Any | List[Any] 。接收/返回单个项目或元组取决于是否配置了动态批处理(D) 。
class StableDiffusion ( MsgpackMixin , Worker ):
def __init__ ( self ):
self . pipe = StableDiffusionPipeline . from_pretrained (
"sd-legacy/stable-diffusion-v1-5" , torch_dtype = torch . float16
)
self . pipe . enable_model_cpu_offload ()
self . example = [ "useless example prompt" ] * 4 # warmup (batch_size=4)
def forward ( self , data : List [ str ]) -> List [ memoryview ]:
logger . debug ( "generate images for %s" , data )
res = self . pipe ( data )
logger . debug ( "NSFW: %s" , res [ 1 ])
images = []
for img in res [ 0 ]:
dummy_file = BytesIO ()
img . save ( dummy_file , format = "JPEG" )
images . append ( dummy_file . getbuffer ())
return images[!笔记]
(a)在此示例中,我们以二进制格式返回图像,而JSON不支持该图像(除非用base64编码使有效负载更大的base64)。因此,MSGPACK更适合我们的需求。如果我们不继承
MsgpackMixin,则默认情况下将使用JSON。换句话说,服务请求/响应的协议可以是MSGPACK,JSON或任何其他格式(检查我们的Mixins)。(b)热身通常有助于提前分配GPU记忆。如果指定了热身示例,则只有在示例通过处理程序转发后才准备就绪。但是,如果没有给出示例,则预计第一个请求的延迟将更长。该
example应根据forward期望接收的方式设置为单个项目或元组。此外,如果您想使用多个不同的示例进行热身,则可以设置multi_examples(此处的演示)。(c)此示例显示了一个单阶段服务,在该服务中,
StableDiffusionWorker直接接收客户的提示请求并响应图像。因此,forward可以将其视为完整的服务处理程序。但是,我们还可以在管道中设计多个阶段服务(例如,下载图像,模型推理,后处理)的工人。在这种情况下,整个管道被视为服务处理程序,第一位工人接受了请求,最后一名工人发送了答复。工人之间的数据流是通过过程间通信完成的。(d)由于在此示例中启用了动态批处理,因此,
forward方法将希望收到字符串的列表,例如['a cute cat playing with a red ball', 'a man sitting in front of a computer', ...],从不同的客户端汇总进行批处理推断,改善了系统吞吐量。
最后,我们将工人附加到服务器上以构建单个阶段的工作流程(可以输送多个阶段以进一步提高吞吐量,请参见此示例),并指定我们希望其以并行运行的过程数( num=1 ),最大批次大小,最大批次= 4( max_batch_size=4 ),与最大的限制为time time time time tirn interim max_wait_time=10 interime;毫秒,这意味着最长的MOSEC等待,直到将批量发送给工人)。
if __name__ == "__main__" :
server = Server ()
# 1) `num` specifies the number of processes that will be spawned to run in parallel.
# 2) By configuring the `max_batch_size` with the value > 1, the input data in your
# `forward` function will be a list (batch); otherwise, it's a single item.
server . append_worker ( StableDiffusion , num = 1 , max_batch_size = 4 , max_wait_time = 10 )
server . run ()以上片段在我们的示例文件中合并。您可以直接在项目根级别上运行。我们首先查看命令行参数(这里说明):
python examples/stable_diffusion/server.py --help然后,让我们从调试日志开始服务器:
python examples/stable_diffusion/server.py --log-level debug --timeout 30000打开http://127.0.0.1:8000/openapi/swagger/在您的浏览器中获取OpenAPI DOC。
在另一个终端中,对其进行测试:
python examples/stable_diffusion/client.py --prompt " a cute cat playing with a red ball " --output cat.jpg --port 8000您将在当前目录中获得名为“ cat.jpg”的图像。
您可以检查指标:
curl http://127.0.0.1:8000/metrics就是这样!您刚刚托管了稳定的扩散模型作为服务!
可以在示例部分中找到更多的现成示例。它包括:
append_worker时,将配置max_batch_size和max_wait_time (millisecond) 。max_batch_size值推断不会导致GPU中的内存。max_wait_time应小于批处理推理时间。max_batch_size或max_wait_time经过时,它将收集批处理。当流量较高时,该服务将从此功能中受益。mosec的GPU基础图像,则可以检查官方图像mosecorg/mosec 。对于复杂的用例,请查看Envd。mosec_service_batch_size_bucket显示批处理大小分布。mosec_service_batch_duration_second_bucket显示每个阶段中每个连接的动态批次持续时间(从接收第一个任务开始)。mosec_service_process_duration_second_bucket显示每个阶段中每个连接的处理持续时间(包括IPC时间,但不包括mosec_service_batch_duration_second_bucket )。mosec_service_remaining_task显示当前处理任务的数量。mosec_service_throughput显示服务吞吐量。SIGINT ( CTRL+C )或SIGTERM ( kill {PID} )停止服务,因为它具有优美的关闭逻辑。 max_batch_size和max_wait_time 。指标将显示真实批处理大小和批处理持续时间的直方图。这些是调整这两个参数的关键信息。serialize_ipc/deserialize_ipc方法进行序列化/应对,因此非常大的数据可能会使整个管道慢速。默认情况下,序列化数据将通过RUST传递到下一阶段,您可以启用共享内存以减少潜伏期(参考Redisshmipcmixin)。serialize/deserialize方法,该方法用于解码用户请求并编码响应。默认情况下,两者都使用JSON。但是,JSON并不能很好地支持图像和嵌入。您可以选择更快,二进制兼容的MSGPACK(参考稳定扩散)。mosec自V0.8.8以自动适应用户协议(例如,http/2)。 以下是使用MOSEC的一些公司和个人用户:
如果您发现此软件对您的研究有用,请考虑引用
@software{yang2021mosec,
title = {{MOSEC: Model Serving made Efficient in the Cloud}},
author = {Yang, Keming and Liu, Zichen and Cheng, Philip},
url = {https://github.com/mosecorg/mosec},
year = {2021}
}
我们欢迎任何贡献。请通过提出问题或讨论不和谐来给我们反馈。您也可以直接贡献您的代码并提取请求!
要开始开发,您可以使用Envd创建一个孤立,清洁的Python&Rust环境。检查Envd-Docs或build.envd以获取更多信息。