간단한 워크 플로 프레임 워크
[여기에 이미지 삽입물]

백엔드는 작업, 이벤트, 워크 플로 런타임 메타 데이터를 포함한 워크 플로 상태를 지속시킵니다.
이 예에서는 PSQL을 백엔드로 사용합니다.
먼저 PSQL Server를 로컬로 시작하십시오
docker compose -f docker/docker-compose-psql.yaml up -d그런 다음 PSQL Server에 연결하는 백엔드 인스턴스를 시작하십시오.
const (
DbHost = "localhost"
DbPort = 5432
DbName = "postgres"
DbUser = "user"
DbPassword = "123456"
)
func InitPSQLBackend ( logger * zap. Logger ) (backend. Backend , error ) {
hostname , err := os . Hostname ()
if err != nil {
return nil , err
}
db , err := psql . Connect ( DbHost , DbPort , DbUser , DbPassword , DbName , nil )
if err != nil {
return nil , err
}
err = psql . PrepareDB ( db ) // auto-create table if not exists
if err != nil {
return nil , err
}
dataConverter := dataconverter . NewJsonDataConverter ()
be := psql . NewPSQLBackend ( hostname , 5 * time . Minute , dataConverter , db , logger )
return be , nil
} be , err := examples . InitPSQLBackend ( logger )활동은 서비스 통화, I/O 운영, 장기 운영 운영 또는 다시 실행되는 것을 선호하지 않는 비용이 많이 드는 작업을 구현하는 데 사용되는 기능입니다.
type PaymentInput struct {
Value int64
}
func PaymentActivity ( ctx context. Context , input * PaymentInput ) ( * Void , error ) {
r := rand . Intn ( 100 )
if r < 30 { // 30% of failure
return & Void {}, nil
} else {
return nil , errors . New ( "payment failed" )
}
}참고 #1 : 코드 내부 활동은 비 결정적이어야합니다. 이는 활동을 두 번 실행할 때 항상 동일한 결과를 얻습니다.
참고 #2 : 활동을 실행하는 동안 예상치 못한 오류가 발생하면 panic(...) 에게 전화하면 활동이 나중에 retsed됩니다.
워크 플로는 활동의 오케스트레이션입니다
type SubscriptionWorkflowInput struct {
TotalAmount int64
Cycles int
CycleDuration time. Duration
}
type SubscriptionWorkflowOutput struct {
Paid int64
Overdue int64
}
func SubscriptionWorkflow ( ctx context. Context , input * SubscriptionWorkflowInput ) ( * SubscriptionWorkflowOutput , error ) {
startTimestamp := workflow . GetWorkflowExecutionStartedTimestamp ( ctx )
paymentAmounts := calculatePaymentCycles ( input . TotalAmount , input . Cycles )
paymentTimings := calculatePaymentTimings ( startTimestamp , input . Cycles , input . CycleDuration )
//
var paid int64 = 0
var overdue int64 = 0
currentCycle := 0
for {
workflow . SetVar ( ctx , "paid" , paid )
workflow . SetVar ( ctx , "overdue" , overdue )
workflow . SetVar ( ctx , "currentCycle" , currentCycle )
if currentCycle >= input . Cycles {
break
}
currentCycleAmount := paymentAmounts [ currentCycle ]
amountToPay := currentCycleAmount + overdue
workflow . SetVar ( ctx , "amountToPay" , amountToPay )
workflow . WaitUntil ( ctx , time . UnixMilli ( paymentTimings [ currentCycle ]))
_ , err := workflow . CallActivity ( ctx , PaymentActivity , & PaymentInput { Value : amountToPay }). Await ()
if err != nil {
overdue += paymentAmounts [ currentCycle ]
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_err" , currentCycle ), err . Error ())
} else {
paid += amountToPay
overdue = 0
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_paid_amount" , currentCycle ), amountToPay )
}
workflow . SetVar ( ctx , "amountToPay" , 0 )
workflow . SetVar ( ctx , fmt . Sprintf ( "cycle_%d_completed_at" , currentCycle ), workflow . GetCurrentTimestamp ( ctx ))
currentCycle += 1
}
return & SubscriptionWorkflowOutput {
Paid : paid ,
Overdue : overdue ,
}, nil
}참고 #1 : 워크 플로 코드에 값 비싼 작업 (IO 운영, 외부 서비스 통화 등)을 넣지 마십시오. 대신 활동 코드에 넣으십시오.
ActivityWorker 및 WorkflowWorker 포함한 근로자는 활동 및 워크 플로 코드를 실행할 책임이 있습니다.
aw , err := worker . NewActivityWorkersBuilder ().
WithName ( "demo activity worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterActivities (
PaymentActivity ,
).
WithActivityWorkerOpts (
activity_worker . WithTaskProcessorMaxBackoffInterval ( 1 * time . Minute ),
).
Build () ww , err := worker . NewWorkflowWorkersBuilder ().
WithName ( "demo workflow worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterWorkflows (
SubscriptionWorkflow ,
). Build ()모든 작품을 정리하면 작업자 프로그램을 구현할 수 있습니다.
func main () {
ctx := context . Background ()
logger , err := examples . GetLogger ()
if err != nil {
panic ( err )
}
be , err := examples . InitPSQLBackend ( logger )
if err != nil {
panic ( err )
}
aw , err := worker . NewActivityWorkersBuilder ().
WithName ( "demo activity worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterActivities (
PaymentActivity ,
).
WithActivityWorkerOpts (
activity_worker . WithTaskProcessorMaxBackoffInterval ( 1 * time . Minute ),
).
Build ()
if err != nil {
panic ( err )
}
ww , err := worker . NewWorkflowWorkersBuilder ().
WithName ( "demo workflow worker" ).
WithBackend ( be ).
WithLogger ( logger ).
RegisterWorkflows (
SubscriptionWorkflow ,
). Build ()
if err != nil {
panic ( err )
}
aw . Start ( ctx )
defer aw . Stop ( ctx )
ww . Start ( ctx )
defer ww . Stop ( ctx )
//
sigs := make ( chan os. Signal , 1 )
signal . Notify ( sigs , syscall . SIGINT , syscall . SIGTERM )
<- sigs
}
작업자 인스턴스를 실행 한 후에는 코드를 작성하여 워크 플로를 시작하고 결과를 기다릴 수 있습니다.
워크 플로를 예약하려면 ScheduleWorkflow 호출 한 다음 필요한 매개 변수를 작성하여 워크 플로를 시작하십시오.
err := client . ScheduleWorkflow ( ctx , be , SubscriptionWorkflow , & SubscriptionWorkflowInput {
TotalAmount : totalAmount ,
Cycles : cycles ,
}, client. WorkflowScheduleOptions {
WorkflowID : workflowID ,
Version : "1" ,
})
먼저, 실행중인 워크 플로를 디버깅하려면 워크 플로 내에 여러 런타임 변수를 넣어야합니다.
런타임 변수를 수정하는 데 사용되는 메소드 SetVar[T any](ctx context.Context, name string, value T) 사용합니다. 그런 다음 워크 플로우 데 버거를 사용하여 해당 변수를 제거하여 현재 런타임 상태를 디버깅합니다.
dbg := debug . NewWorkflowDebugger ( be )
vars , err := dbg . QueryUserDefinedVars ( SubscriptionWorkflow , workflowID )
if err != nil {
panic ( err )
}
PrettyPrint ( vars )
워크 플로 실행이 완료되고 결과를 얻을 때까지 기다리려면 AwaitWorkflowResult 방법을 호출하십시오.
workflowResult , workflowErr , err := client . AwaitWorkflowResult ( ctx , be , SubscriptionWorkflow , workflowID )
위의 모든 코드는 subscription_with_debug 예제에서 가져 왔습니다
예를 참조하십시오