Los agentes de IA son máquinas de estado, no DAGS
Synth Machines permite a los usuarios crear y ejecutar máquinas de estado de agente de IA ( Synth ) al proporcionar una SynthDefinition para definir un flujo de trabajo de IA estructurado.
Las máquinas estatales son una construcción poderosa, ya que permiten a un experto en dominio deconstruir el problema en conjuntos de estados y transiciones.
Las transiciones entre los estados pueden llamar a una LLM, herramienta, proceso de datos o una mezcla de muchas salidas.
Instale el paquete. pip install synth_machine[openai,togetherai,anthropic] o poetry add synth_machine[openai,togetherai,anthropic]
Agregue cualquiera de las claves de entorno de su proveedor de proveedores de API para las cuales
# You only need to set the API providers you want to use.
export OPENAI_API_KUY=secret
export ANTHROPIC_API_KEY=secret
export TOGETHER_API_KEY=secret
pip install synth_machine[vllm,llamacpp] o poetry add synth_machine[vllm,llamacpp]
Es probable que necesite configurar CUDA, VLLM o LLAMA.CPP para uso local.
Enlaces útiles:
agent = Synth(
config: dict[SynthDefinition], # Synth state machine defining states, transitions and prompts.
tools=[], # List of tools the agent will use
memory={}, # Any existing memory to add on top of any model_config.initial_memory
rag_runner: Optional[RAG] = None # Define a RAG integration for your agent.
postprocess_functions = [] # Any glue code functions
store : ObjectStore = ObjectStore(":memory:") # Any files created by tools will automatically go to you object store
SynthDefinition se puede encontrar en SynthDefinition Docs o Synth_machine/Synth_definition.py. Los basemodelos pydánticos que conforman SynthDefinition serán la representación más precisa de un Synth .
Esperamos que la especificación tenga actualizaciones entre las versiones principales.
En cualquier momento, puede verificar el estado actual y los próximos disparadores
# Check state
agent.current_state()
# Triggers
agent.interfaces_for_available_triggers()
await agent.trigger(
"[trigger_name]",
params={
"input_1": "hello"
}
)
Las llamadas de transición por lotes emitirán cualquier variable de salida generada en esa transición.
await agent.streaming_trigger(
"[trigger_name]",
params={
"input_1": "hello"
}
)
Las respuestas de transmisión producen cualquiera de los siguientes eventos:
class YieldTasks(StrEnum):
CHUNK = "CHUNK"
MODEL_CONFIG = "MODEL_CONFIG"
SET_MEMORY = "SET_MEMORY"
SET_ACTIVE_OUTPUT = "SET_ACTIVE_OUTPUT"
CHUNK : las generaciones LLM son enviadas por trozos de un token a la vez.MODEL_CONFIG : rendimientos qué ejecutor se está utilizando actualmente para cualquier interfaces frontend específicas del proveedor.SET_MEMORP : envía eventos configuración de nuevas variables de memoriaSET_ACTIVE_OUTPUT : produce el disparador de salida de transición actual. Esto permite a los usuarios experimentar utilizando trigger y luego integrarse en las generaciones de LLM de transmisión en tiempo real a los usuarios utilizando eventos del lado del servidor (SSE) y trigger_streaming .
Ofrecemos múltiples ejecutores para generar completaciones de chat LLM de LLM local o de API.
openai : https://openai.com/api/picing/togetherai : https://docs.together.ai/docs/inference-modelsanthropic : https://docs.anthropic.com/en/docs/models-overviewgoogle : https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/overview VLLM : https://github.com/vllm-project/vllmLlama-CPP : https://github.com/ggerganov/llama.cpp Model Config Puede especificar el proveedor y el modelo en el default-model-config y la base de sintetizador o model_config en la salida de transición.
ModelConfig:
...
executor: [openai|togetherai|anthropic|vllm|llamacpp]
llm_name: [model_name]
La memoria del agente es un diccionario que contiene todas las variables provisionales que crean en estados anteriores y entradas humanas / del sistema.
agent.memory
# -> {
# "[memory_key]": [memory_value]
# }
Las funciones posteriores al proceso solo deben usarse para el código básico de pegamento, toda la funcionalidad principal debe integrarse en las herramientas.
Vaya a "./tools/tofuTool/api.py para ver la funcionalidad.
API de inicio
cd tools/tofuTool
poetry install
poetry run uvicorn api:app --port=5001 --reload
Recuperar especificaciones de API
curl -X GET http://localhost:5001/openapi.json > openapi_schema.json
Definición de herramienta
Puede definir una herramienta como tal con solo el nombre, el punto final API y el esquema de la herramienta Openapi.
tofu_tool = Tool(
name="tofu_tool",
api_endpoint="http://localhost:5001",
api_spec=tool_spec
)
La generación de recuperación de la generación auge es una herramienta poderosa para mejorar las respuestas de LLM al proporcionar ejemplos semánticamente similares o ejercer al material que el LLM intenta generar.
synth_machine es flexiblemente en tal que mientras se herede de synth_machine.RAG y cree:
embed(documents: List[str]) yquery(prompt: str, rag_config: Optional[synth_machine.RAGConfig])Es fácil integrar múltiples proveedores y bases de datos vectoriales. Con el tiempo habrá implementaciones compatibles con el trapo comunitario en una amplia variedad de proveedores de incrustaciones y bases de datos vectoriales.
La siguiente clase RAG es ideal para experimentar con configuraciones locales de trapo en CPU.
pip install qdrant-client, fastembed
Definir clase de trapo
from synth_machine.rag import RAG
from qdrant_client import AsyncQdrantClient
from fastembed import TextEmbedding
from typing import List, Optional
from qdrant_client.models import Distance, VectorParams, PointStruct
class Qdrant(RAG):
"""
VectorDB: Qdrant - https://github.com/qdrant/qdrant
Embeddings: FastEmbed - https://github.com/qdrant/fastembed
This provides fast and lightweight on-device CPU embeddings creation and
similarity search using Qdrant in memory.
"""
def __init__(
self,
collection_name: str,
embedding_model: str="BAAI/bge-small-en-v1.5",
embedding_dimensions: int=384,
embedding_threads: int=-1,
qdrant_location: str=":memory:",
):
self.embedding_model = TextEmbedding(
model_name=embedding_model,
threads=embedding_threads
)
self.embedding_dimensions = embedding_dimensions
self.qdrant = AsyncQdrantClient(qdrant_location)
self.collection_name = collection_name
async def create_collection(self) -> bool:
if await self.qdrant.collection_exists(self.collection_name):
return True
else:
return await self.qdrant.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=self.embedding_dimensions, # maps to 'BAAI/bge-small-en-v1.5' model dimensions
distance=Distance.COSINE
)
)
async def embed(self, documents: List[str], metadata: Optional[List[dict]]=None):
if metadata and len(documents) != len(metadata):
raise ValueError("documents and metadata must be the same length")
embedding_list = list(
self.embedding_model.embed(documents)
)
upsert_response = await self.qdrant.upsert(
collection_name=self.collection_name,
points=[
PointStruct(
id=i,
vector=list(vector),
payload=metadata[i]
)
for i, vector in enumerate(embedding_list)
]
)
return upsert_response.status
async def query(self, prompt: str, rag_config: RAGConfig) -> List[dict]:
embedding = next(self.embedding_model.embed([prompt]))
similar_responses = await self.qdrant.search(
collection_name=self.collection_name,
query_vector=embedding,
limit=rag_config.n
)
return [
point.payload for point in similar_responses
]
Ahora inicie la clase Qdrant y proporcione al definir Synth .
qdrant = Qdrant(collection_name="tofu_examples")
await qdrant.create_collection()
agent = Synth(
...
rag_runner=Qdrant
)
Las herramientas pueden devolver una variedad de objetos diferentes. Cualquier archivo creado por una herramienta irá automáticamente a su agent.store . Usamos ObjectStore para el almacenamiento de archivos, con ObjectStore(":memory:") como el valor predeterminado.
Para recuperar un archivo: agent.store.get(file_name)
ObjectStore que permite una fácil integración a:
from synth_machine.machine import ObjectStore
agent = Agent(
...
store=ObjectStore("gs://[bucket_name]/[prefix]))
)
Cualquier funcionalidad personalizada se puede definir como una función definida por el usuario (UDF).
Estos toman Synth.memory como entrada y le permite ejecutar la funcionalidad personalizada como parte de la synth-machine .
# Define postprocess function
from synth_machine.user_defined_functions import udf
@udf
def abc_postprocesss(memory):
...
return memory["variable_key"]
agent = Synth(
...
user_defined_functions = {
"abc": abc_postprocess
}
)
...
- key: trigger_udf
inputs:
- key: variable_key
outputs:
- key: example_udf
udf: abc
Nota: Cualquier funcionalidad no trivial debe ser una herramienta y no UDF.