กรอบเวิร์กโฟลว์ง่าย ๆ ที่เขียนใน GO
[TODO แทรกภาพที่นี่]

แบ็กเอนด์มีหน้าที่รับผิดชอบต่อสถานะเวิร์กโฟลว์ที่ยังคงอยู่รวมถึงงานกิจกรรมเมตาดาต้าของเวิร์กโฟลว์รันไทม์
ในตัวอย่างนี้เราจะใช้ PSQL เป็นแบ็กเอนด์
ก่อนอื่นให้เริ่มเซิร์ฟเวอร์ PSQL ในเครื่อง
docker compose -f docker/docker-compose-psql.yaml up -dจากนั้นเริ่มอินสแตนซ์แบ็กเอนด์ที่เชื่อมต่อกับเซิร์ฟเวอร์ PSQL
const (
DbHost = "localhost"
DbPort = 5432
DbName = "postgres"
DbUser = "user"
DbPassword = "123456"
)
func InitPSQLBackend ( logger * zap. Logger ) (backend. Backend , error ) {
hostname , err := os . Hostname ()
if err != nil {
return nil , err
}
db , err := psql . Connect ( DbHost , DbPort , DbUser , DbPassword , DbName , nil )
if err != nil {
return nil , err
}
err = psql . PrepareDB ( db ) // auto-create table if not exists
if err != nil {
return nil , err
}
dataConverter := dataconverter . NewJsonDataConverter ()
be := psql . NewPSQLBackend ( hostname , 5 * time . Minute , dataConverter , db , logger )
return be , nil
} be , err := examples . InitPSQLBackend ( logger )กิจกรรมเป็นฟังก์ชั่นที่ใช้ในการใช้การโทรบริการการดำเนินการ I/O การดำเนินการที่ดำเนินมายาวนานหรือการกระทำที่มีราคาแพงซึ่งไม่ต้องการดำเนินการอีกครั้ง
type PaymentInput struct {
Value int64
}
func PaymentActivity ( ctx context. Context , input * PaymentInput ) ( * Void , error ) {
r := rand . Intn ( 100 )
if r < 30 { // 30% of failure
return & Void {}, nil
} else {
return nil , errors . New ( "payment failed" )
}
}หมายเหตุ #1 : รหัสภายในกิจกรรมจะต้องไม่เป็นที่กำหนดซึ่งคือเมื่อทำงานกิจกรรมสองครั้งมันจะให้ผลลัพธ์เดียวกันเสมอ
หมายเหตุ #2 : หากคุณพบข้อผิดพลาดที่ไม่คาดคิดในขณะที่ดำเนินกิจกรรมเพียงแค่เรียก panic(...) กิจกรรมจะถูกลองใหม่ในภายหลัง
เวิร์กโฟลว์คือการประสานกิจกรรม
type SubscriptionWorkflowInput struct {
TotalAmount int64
Cycles int
CycleDuration time. Duration
}
type SubscriptionWorkflowOutput struct {
Paid int64
Overdue int64
}
func SubscriptionWorkflow ( ctx context. Context , input * SubscriptionWorkflowInput ) ( * SubscriptionWorkflowOutput , error ) {
startTimestamp := workflow . GetWorkflowExecutionStartedTimestamp ( ctx )
paymentAmounts := calculatePaymentCycles ( input . TotalAmount , input . Cycles )
paymentTimings := calculatePaymentTimings ( startTimestamp , input . Cycles , input . CycleDuration )
//
var paid int64 = 0
var overdue int64 = 0
currentCycle := 0
for {
workflow . SetVar ( ctx , "paid" , paid )
workflow . SetVar ( ctx , "overdue" , overdue )
workflow . SetVar ( ctx , "currentCycle" , currentCycle )
if currentCycle >= input . Cycles {
break
}
currentCycleAmount := paymentAmounts [ currentCycle ]
amountToPay := currentCycleAmount + overdue
workflow . SetVar ( ctx , "amountToPay" , amountToPay )
workflow . WaitUntil ( ctx , time . UnixMilli ( paymentTimings [ currentCycle ]))
_ , err := workflow . CallActivity ( ctx , PaymentActivity , & PaymentInput { Value : amountToPay }). Await ()
if err != nil {
overdue += paymentAmounts [ currentCycle ]
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_err" , currentCycle ), err . Error ())
} else {
paid += amountToPay
overdue = 0
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_paid_amount" , currentCycle ), amountToPay )
}
workflow . SetVar ( ctx , "amountToPay" , 0 )
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_completed_at" , currentCycle ), workflow . GetCurrentTimestamp ( ctx ))
currentCycle += 1
}
return & SubscriptionWorkflowOutput {
Paid : paid ,
Overdue : overdue ,
}, nil
}หมายเหตุ #1 : อย่าทำการดำเนินการใด ๆ (การดำเนินการ IO, การโทรภายนอก ฯลฯ ) บนรหัสเวิร์กโฟลว์ใส่ไว้ในรหัสกิจกรรมแทน
คนงานรวมถึง ActivityWorker และ WorkflowWorker มีหน้าที่รับผิดชอบในการดำเนินกิจกรรมและรหัสเวิร์กโฟลว์
aw , err := worker . NewActivityWorkersBuilder ().
WithName ( "demo activity worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterActivities (
PaymentActivity ,
).
WithActivityWorkerOpts (
activity_worker . WithTaskProcessorMaxBackoffInterval ( 1 * time . Minute ),
).
Build () ww , err := worker . NewWorkflowWorkersBuilder ().
WithName ( "demo workflow worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterWorkflows (
SubscriptionWorkflow ,
). Build ()การรวมชิ้นส่วนทั้งหมดเข้าด้วยกันเราสามารถใช้โปรแกรมคนงานของเราได้
func main () {
ctx := context . Background ()
logger , err := examples . GetLogger ()
if err != nil {
panic ( err )
}
be , err := examples . InitPSQLBackend ( logger )
if err != nil {
panic ( err )
}
aw , err := worker . NewActivityWorkersBuilder ().
WithName ( "demo activity worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterActivities (
PaymentActivity ,
).
WithActivityWorkerOpts (
activity_worker . WithTaskProcessorMaxBackoffInterval ( 1 * time . Minute ),
).
Build ()
if err != nil {
panic ( err )
}
ww , err := worker . NewWorkflowWorkersBuilder ().
WithName ( "demo workflow worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterWorkflows (
SubscriptionWorkflow ,
). Build ()
if err != nil {
panic ( err )
}
aw . Start ( ctx )
defer aw . Stop ( ctx )
ww . Start ( ctx )
defer ww . Stop ( ctx )
//
sigs := make ( chan os. Signal , 1 )
signal . Notify ( sigs , syscall . SIGINT , syscall . SIGTERM )
<- sigs
}
หลังจากทำงานอินสแตนซ์ของคนงานแล้วเราสามารถเขียนรหัสเพื่อเริ่มเวิร์กโฟลว์และรอผลลัพธ์ของพวกเขา
หากต้องการกำหนดเวลาเวิร์กโฟลว์โทร ScheduleWorkflow โทรออกจากนั้นกรอกพารามิเตอร์ที่จำเป็นเพื่อเริ่มเวิร์กโฟลว์
err := client . ScheduleWorkflow ( ctx , be , SubscriptionWorkflow , & SubscriptionWorkflowInput {
TotalAmount : totalAmount ,
Cycles : cycles ,
}, client. WorkflowScheduleOptions {
WorkflowID : workflowID ,
Version : "1" ,
})
ก่อนอื่นในการดีบักเวิร์กโฟลว์การทำงานเราต้องใส่ตัวแปรรันไทม์หลายตัวในเวิร์กโฟลว์
เราจะใช้เมธอด SetVar[T any](ctx context.Context, name string, value T) ซึ่งใช้ในการแก้ไขตัวแปรรันไทม์ หลังจากนั้นเราจะใช้ WorkflowDebugger เพื่อแก้ไขข้อบกพร่องของสถานะรันไทม์ปัจจุบันโดยนำตัวแปรเหล่านั้นออก
dbg := debug . NewWorkflowDebugger ( be )
vars , err := dbg . QueryUserDefinedVars ( SubscriptionWorkflow , workflowID )
if err != nil {
panic ( err )
}
PrettyPrint ( vars )
หากต้องการรอการดำเนินการขั้น AwaitWorkflowResult การทำงานให้เสร็จสมบูรณ์และรับผล
workflowResult , workflowErr , err := client . AwaitWorkflowResult ( ctx , be , SubscriptionWorkflow , workflowID )
รหัสข้างต้นทั้งหมดถูกนำมาจากตัวอย่าง substcription_with_debug
ดูตัวอย่าง