Un marco de flujo de trabajo simple escrito en Go
[TODO INSERTA IMAGEN AQUÍ]

El backend es responsable de persistir el estado de flujo de trabajo, incluidas tareas, eventos, metadatos de tiempo de ejecución de flujo de trabajo.
En este ejemplo, usaremos PSQL como backend.
Primero, inicie el servidor PSQL localmente
docker compose -f docker/docker-compose-psql.yaml up -dLuego, inicie una instancia de backend que se conecte al servidor PSQL
const (
DbHost = "localhost"
DbPort = 5432
DbName = "postgres"
DbUser = "user"
DbPassword = "123456"
)
func InitPSQLBackend ( logger * zap. Logger ) (backend. Backend , error ) {
hostname , err := os . Hostname ()
if err != nil {
return nil , err
}
db , err := psql . Connect ( DbHost , DbPort , DbUser , DbPassword , DbName , nil )
if err != nil {
return nil , err
}
err = psql . PrepareDB ( db ) // auto-create table if not exists
if err != nil {
return nil , err
}
dataConverter := dataconverter . NewJsonDataConverter ()
be := psql . NewPSQLBackend ( hostname , 5 * time . Minute , dataConverter , db , logger )
return be , nil
} be , err := examples . InitPSQLBackend ( logger )La actividad es una función que se utilizó para implementar llamadas de servicio, operación de E/S, operaciones de larga duración o acciones costosas que no prefieren ser reecuperados
type PaymentInput struct {
Value int64
}
func PaymentActivity ( ctx context. Context , input * PaymentInput ) ( * Void , error ) {
r := rand . Intn ( 100 )
if r < 30 { // 30% of failure
return & Void {}, nil
} else {
return nil , errors . New ( "payment failed" )
}
}Nota #1 : El código interno de la actividad no debe ser determinista, que es cuando se ejecuta la actividad dos veces, siempre produce el mismo resultado
Nota #2 : Si experimenta un error inesperado al ejecutar la actividad, simplemente llame panic(...) , la actividad se volverá a jugar más tarde
El flujo de trabajo es la orquestación de actividades
type SubscriptionWorkflowInput struct {
TotalAmount int64
Cycles int
CycleDuration time. Duration
}
type SubscriptionWorkflowOutput struct {
Paid int64
Overdue int64
}
func SubscriptionWorkflow ( ctx context. Context , input * SubscriptionWorkflowInput ) ( * SubscriptionWorkflowOutput , error ) {
startTimestamp := workflow . GetWorkflowExecutionStartedTimestamp ( ctx )
paymentAmounts := calculatePaymentCycles ( input . TotalAmount , input . Cycles )
paymentTimings := calculatePaymentTimings ( startTimestamp , input . Cycles , input . CycleDuration )
//
var paid int64 = 0
var overdue int64 = 0
currentCycle := 0
for {
workflow . SetVar ( ctx , "paid" , paid )
workflow . SetVar ( ctx , "overdue" , overdue )
workflow . SetVar ( ctx , "currentCycle" , currentCycle )
if currentCycle >= input . Cycles {
break
}
currentCycleAmount := paymentAmounts [ currentCycle ]
amountToPay := currentCycleAmount + overdue
workflow . SetVar ( ctx , "amountToPay" , amountToPay )
workflow . WaitUntil ( ctx , time . UnixMilli ( paymentTimings [ currentCycle ]))
_ , err := workflow . CallActivity ( ctx , PaymentActivity , & PaymentInput { Value : amountToPay }). Await ()
if err != nil {
overdue += paymentAmounts [ currentCycle ]
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_err" , currentCycle ), err . Error ())
} else {
paid += amountToPay
overdue = 0
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_paid_amount" , currentCycle ), amountToPay )
}
workflow . SetVar ( ctx , "amountToPay" , 0 )
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_completed_at" , currentCycle ), workflow . GetCurrentTimestamp ( ctx ))
currentCycle += 1
}
return & SubscriptionWorkflowOutput {
Paid : paid ,
Overdue : overdue ,
}, nil
}Nota #1 : No coloque ninguna operación costosa (operaciones de IO, llamadas de servicio externos, etc.) en el código de flujo de trabajo, coloque el código de actividad en su lugar
Los trabajadores, incluido ActivityWorker y WorkflowWorker son responsables de ejecutar la actividad y los códigos de flujo de trabajo
aw , err := worker . NewActivityWorkersBuilder ().
WithName ( "demo activity worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterActivities (
PaymentActivity ,
).
WithActivityWorkerOpts (
activity_worker . WithTaskProcessorMaxBackoffInterval ( 1 * time . Minute ),
).
Build () ww , err := worker . NewWorkflowWorkersBuilder ().
WithName ( "demo workflow worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterWorkflows (
SubscriptionWorkflow ,
). Build ()Al juntar todas las piezas, podemos implementar nuestro programa de trabajadores
func main () {
ctx := context . Background ()
logger , err := examples . GetLogger ()
if err != nil {
panic ( err )
}
be , err := examples . InitPSQLBackend ( logger )
if err != nil {
panic ( err )
}
aw , err := worker . NewActivityWorkersBuilder ().
WithName ( "demo activity worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterActivities (
PaymentActivity ,
).
WithActivityWorkerOpts (
activity_worker . WithTaskProcessorMaxBackoffInterval ( 1 * time . Minute ),
).
Build ()
if err != nil {
panic ( err )
}
ww , err := worker . NewWorkflowWorkersBuilder ().
WithName ( "demo workflow worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterWorkflows (
SubscriptionWorkflow ,
). Build ()
if err != nil {
panic ( err )
}
aw . Start ( ctx )
defer aw . Stop ( ctx )
ww . Start ( ctx )
defer ww . Stop ( ctx )
//
sigs := make ( chan os. Signal , 1 )
signal . Notify ( sigs , syscall . SIGINT , syscall . SIGTERM )
<- sigs
}
Después de que nuestra instancia de trabajador se ejecute, podemos escribir códigos para iniciar flujos de trabajo y esperar su resultado
Para programar un flujo de trabajo, llame ScheduleWorkflow y luego complete los parámetros necesarios para iniciar el flujo de trabajo
err := client . ScheduleWorkflow ( ctx , be , SubscriptionWorkflow , & SubscriptionWorkflowInput {
TotalAmount : totalAmount ,
Cycles : cycles ,
}, client. WorkflowScheduleOptions {
WorkflowID : workflowID ,
Version : "1" ,
})
Primero, para depurar un flujo de trabajo en ejecución, tenemos que poner varias variables de tiempo de ejecución dentro del flujo de trabajo.
Usaremos el método SetVar[T any](ctx context.Context, name string, value T) que se utiliza para modificar una variable de tiempo de ejecución. Después de eso, utilizaremos el flujo de trabajo deBugger para depurar el estado actual de tiempo de ejecución al sacar esas variables.
dbg := debug . NewWorkflowDebugger ( be )
vars , err := dbg . QueryUserDefinedVars ( SubscriptionWorkflow , workflowID )
if err != nil {
panic ( err )
}
PrettyPrint ( vars )
Para esperar a que una ejecución de flujo de trabajo se complete y obtenga su resultado, llame a AwaitWorkflowResult Method
workflowResult , workflowErr , err := client . AwaitWorkflowResult ( ctx , be , SubscriptionWorkflow , workflowID )
Todo el código anterior se tomó del ejemplo de suscripción_with_debug
Ver ejemplos