Ein einfaches Workflow -Framework in Go geschrieben
[TODO Bild hier einfügen]

Backend ist verantwortlich für den fortdauernden Workflow -Zustand, einschließlich Aufgaben, Veranstaltungen, Workflow -Laufzeitmetadaten.
In diesem Beispiel werden wir PSQL als Backend verwenden.
Starten Sie zunächst den PSQL -Server lokal
docker compose -f docker/docker-compose-psql.yaml up -dInitieren Sie dann eine Backend -Instanz, die eine Verbindung zum PSQL -Server herstellt
const (
DbHost = "localhost"
DbPort = 5432
DbName = "postgres"
DbUser = "user"
DbPassword = "123456"
)
func InitPSQLBackend ( logger * zap. Logger ) (backend. Backend , error ) {
hostname , err := os . Hostname ()
if err != nil {
return nil , err
}
db , err := psql . Connect ( DbHost , DbPort , DbUser , DbPassword , DbName , nil )
if err != nil {
return nil , err
}
err = psql . PrepareDB ( db ) // auto-create table if not exists
if err != nil {
return nil , err
}
dataConverter := dataconverter . NewJsonDataConverter ()
be := psql . NewPSQLBackend ( hostname , 5 * time . Minute , dataConverter , db , logger )
return be , nil
} be , err := examples . InitPSQLBackend ( logger )Aktivität ist eine Funktion, mit der Serviceanrufe, E/A-Betrieb, langlebige Operationen oder kostspielige Aktionen implementiert werden können, die nicht bevorzugt werden können, um erneut ausgeführt zu werden
type PaymentInput struct {
Value int64
}
func PaymentActivity ( ctx context. Context , input * PaymentInput ) ( * Void , error ) {
r := rand . Intn ( 100 )
if r < 30 { // 30% of failure
return & Void {}, nil
} else {
return nil , errors . New ( "payment failed" )
}
}Hinweis 1 : Der Code in der Aktivität muss nicht deterministisch sein. Wenn die Aktivität zweimal ausgeführt wird, ergibt er immer das gleiche Ergebnis
Hinweis Nr. 2 : Wenn Sie bei der Ausführung von Aktivitäten einen unerwarteten Fehler erleben, rufen Sie einfach panic(...) , die Aktivität wird später wiederholt
Workflow ist die Orchestrierung von Aktivitäten
type SubscriptionWorkflowInput struct {
TotalAmount int64
Cycles int
CycleDuration time. Duration
}
type SubscriptionWorkflowOutput struct {
Paid int64
Overdue int64
}
func SubscriptionWorkflow ( ctx context. Context , input * SubscriptionWorkflowInput ) ( * SubscriptionWorkflowOutput , error ) {
startTimestamp := workflow . GetWorkflowExecutionStartedTimestamp ( ctx )
paymentAmounts := calculatePaymentCycles ( input . TotalAmount , input . Cycles )
paymentTimings := calculatePaymentTimings ( startTimestamp , input . Cycles , input . CycleDuration )
//
var paid int64 = 0
var overdue int64 = 0
currentCycle := 0
for {
workflow . SetVar ( ctx , "paid" , paid )
workflow . SetVar ( ctx , "overdue" , overdue )
workflow . SetVar ( ctx , "currentCycle" , currentCycle )
if currentCycle >= input . Cycles {
break
}
currentCycleAmount := paymentAmounts [ currentCycle ]
amountToPay := currentCycleAmount + overdue
workflow . SetVar ( ctx , "amountToPay" , amountToPay )
workflow . WaitUntil ( ctx , time . UnixMilli ( paymentTimings [ currentCycle ]))
_ , err := workflow . CallActivity ( ctx , PaymentActivity , & PaymentInput { Value : amountToPay }). Await ()
if err != nil {
overdue += paymentAmounts [ currentCycle ]
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_err" , currentCycle ), err . Error ())
} else {
paid += amountToPay
overdue = 0
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_paid_amount" , currentCycle ), amountToPay )
}
workflow . SetVar ( ctx , "amountToPay" , 0 )
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_completed_at" , currentCycle ), workflow . GetCurrentTimestamp ( ctx ))
currentCycle += 1
}
return & SubscriptionWorkflowOutput {
Paid : paid ,
Overdue : overdue ,
}, nil
}Hinweis Nr. 1 : Setzen Sie keine kostspieligen Vorgänge (IO -Operationen, externe Serviceanrufe usw.) auf Workflow -Code und geben Sie sie stattdessen in den Aktivitätscode ein
Arbeitnehmer, einschließlich ActivityWorker und WorkflowWorker
aw , err := worker . NewActivityWorkersBuilder ().
WithName ( "demo activity worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterActivities (
PaymentActivity ,
).
WithActivityWorkerOpts (
activity_worker . WithTaskProcessorMaxBackoffInterval ( 1 * time . Minute ),
).
Build () ww , err := worker . NewWorkflowWorkersBuilder ().
WithName ( "demo workflow worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterWorkflows (
SubscriptionWorkflow ,
). Build ()Wenn wir alle Teile zusammenfügen, können wir unser Arbeiterprogramm implementieren
func main () {
ctx := context . Background ()
logger , err := examples . GetLogger ()
if err != nil {
panic ( err )
}
be , err := examples . InitPSQLBackend ( logger )
if err != nil {
panic ( err )
}
aw , err := worker . NewActivityWorkersBuilder ().
WithName ( "demo activity worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterActivities (
PaymentActivity ,
).
WithActivityWorkerOpts (
activity_worker . WithTaskProcessorMaxBackoffInterval ( 1 * time . Minute ),
).
Build ()
if err != nil {
panic ( err )
}
ww , err := worker . NewWorkflowWorkersBuilder ().
WithName ( "demo workflow worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterWorkflows (
SubscriptionWorkflow ,
). Build ()
if err != nil {
panic ( err )
}
aw . Start ( ctx )
defer aw . Stop ( ctx )
ww . Start ( ctx )
defer ww . Stop ( ctx )
//
sigs := make ( chan os. Signal , 1 )
signal . Notify ( sigs , syscall . SIGINT , syscall . SIGTERM )
<- sigs
}
Nachdem wir unsere Arbeiterinstanz laufen, können wir Codes schreiben, um Workflows zu starten und auf ihr Ergebnis zu warten
Um einen Workflow zu vereinbaren, rufen Sie ScheduleWorkflow an und füllen Sie dann die erforderlichen Parameter aus, um den Workflow zu starten
err := client . ScheduleWorkflow ( ctx , be , SubscriptionWorkflow , & SubscriptionWorkflowInput {
TotalAmount : totalAmount ,
Cycles : cycles ,
}, client. WorkflowScheduleOptions {
WorkflowID : workflowID ,
Version : "1" ,
})
Um einen laufenden Workflow zu debuggen, müssen wir zunächst mehrere Laufzeitvariablen in den Workflow einstellen.
Wir verwenden Methoden SetVar[T any](ctx context.Context, name string, value T) , mit dem eine Laufzeitvariable geändert wird. Danach werden wir den Workflowdebugger verwenden, um den aktuellen Laufzeitzustand zu debuggen, indem wir diese Variablen herausholen.
dbg := debug . NewWorkflowDebugger ( be )
vars , err := dbg . QueryUserDefinedVars ( SubscriptionWorkflow , workflowID )
if err != nil {
panic ( err )
}
PrettyPrint ( vars )
Um auf eine Workflow -Ausführung zu warten AwaitWorkflowResult um das Ergebnis zu vervollständigen und zu erzielen
workflowResult , workflowErr , err := client . AwaitWorkflowResult ( ctx , be , SubscriptionWorkflow , workflowID )
Alle obigen Code wurden aus dem Beispiel für Abonnement_With_Debug entnommen
Siehe Beispiele