Created
July 30, 2025 03:23
-
-
Save arun0009/2d3d13bfbb027fb6a34a319f5c1396f3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"log" | |
"strings" | |
"github.com/restatedev/sdk-go/restate" | |
"github.com/restatedev/sdk-go/server" | |
) | |
// WorkflowInput defines the input to the workflow. | |
type WorkflowInput struct { | |
AssetID int `json:"assetId"` | |
WorkflowID string `json:"workflowId"` | |
CustomAttributes map[string]interface{} `json:"customAttributes"` | |
} | |
// WorkflowConfig defines the JSON workflow structure. | |
type WorkflowConfig struct { | |
ID string `json:"id"` | |
AssetType string `json:"assetType"` | |
Stages []StageConfig `json:"stages"` | |
Transitions []TransitionConfig `json:"transitions"` | |
} | |
// StageConfig defines a stage in the workflow. | |
type StageConfig struct { | |
ID string `json:"id"` | |
Name string `json:"name"` | |
Functionalities []Functionality `json:"functionalities"` | |
} | |
// Functionality defines stage-specific actions (e.g., notifications). | |
type Functionality struct { | |
Type string `json:"type"` | |
Channel string `json:"channel"` | |
Recipients []string `json:"recipients"` | |
Message string `json:"message"` | |
} | |
// TransitionConfig defines a state transition. | |
type TransitionConfig struct { | |
FromStage string `json:"fromStage"` | |
ToStage string `json:"toStage"` | |
Action string `json:"action"` | |
} | |
// loadWorkflow simulates loading a JSON workflow configuration (e.g., from a database). | |
// For demonstration, it returns a hardcoded JSON workflow for a "Document" asset type. | |
func loadWorkflow(workflowID string) (WorkflowConfig, error) { | |
configJSON := `{ | |
"id": "document-workflow", | |
"assetType": "Document", | |
"stages": [ | |
{ | |
"id": "draft", | |
"name": "Draft", | |
"functionalities": [ | |
{ | |
"type": "notification", | |
"channel": "email", | |
"recipients": ["[email protected]"], | |
"message": "Document {asset_id} created in Draft stage" | |
} | |
] | |
}, | |
{ | |
"id": "review", | |
"name": "Review", | |
"functionalities": [ | |
{ | |
"type": "notification", | |
"channel": "email", | |
"recipients": ["[email protected]"], | |
"message": "Document {asset_id} moved to Review stage" | |
} | |
] | |
}, | |
{ | |
"id": "approved", | |
"name": "Approved", | |
"functionalities": [ | |
{ | |
"type": "notification", | |
"channel": "email", | |
"recipients": ["[email protected]"], | |
"message": "Document {asset_id} approved" | |
} | |
] | |
} | |
], | |
"transitions": [ | |
{"fromStage": "draft", "toStage": "review", "action": "submit"}, | |
{"fromStage": "review", "toStage": "approved", "action": "approve"}, | |
{"fromStage": "review", "toStage": "draft", "action": "reject"} | |
] | |
}` | |
var config WorkflowConfig | |
err := json.Unmarshal([]byte(configJSON), &config) | |
if err != nil { | |
return WorkflowConfig{}, fmt.Errorf("failed to parse workflow config: %w", err) | |
} | |
if config.ID != workflowID { | |
return WorkflowConfig{}, fmt.Errorf("workflow ID %s not found", workflowID) | |
} | |
return config, nil | |
} | |
// sendNotification simulates sending a notification (e.g., email, Slack). | |
// Replace with actual integration (e.g., SMTP, HTTP client) in production. | |
func sendNotification(ctx context.Context, channel string, recipients []string, message string, assetID int) error { | |
msg := strings.ReplaceAll(message, "{asset_id}", fmt.Sprintf("%d", assetID)) | |
log.Printf("Sending %s notification to %v: %s", channel, recipients, msg) | |
// Simulate potential transient failure (uncomment to test failure handling). | |
// if rand.Float32() < 0.1 { | |
// return fmt.Errorf("network failure") | |
// } | |
return nil | |
} | |
// findStageConfig retrieves the stage configuration by ID. | |
func findStageConfig(config WorkflowConfig, stageID string) StageConfig { | |
for _, stage := range config.Stages { | |
if stage.ID == stageID { | |
return stage | |
} | |
} | |
return StageConfig{} | |
} | |
// findTransition finds a valid transition for the given stage and action. | |
func findTransition(config WorkflowConfig, fromStage, action string) *TransitionConfig { | |
for _, t := range config.Transitions { | |
if t.FromStage == fromStage && t.Action == action { | |
return &t | |
} | |
} | |
return nil | |
} | |
// AssetWorkflow is the Restate workflow handler that processes JSON-driven workflows. | |
func AssetWorkflow(ctx restate.WorkflowContext, input WorkflowInput) restate.Workflow { | |
// Load workflow configuration | |
config, err := restate.Run(ctx, func(ctx restate.RunContext) (WorkflowConfig, error) { | |
return loadWorkflow(input.WorkflowID) | |
}) | |
if err != nil { | |
log.Printf("Failed to load workflow %s: %v", input.WorkflowID, err) | |
return ctx | |
} | |
// Initialize state if not set | |
currentStage, err := ctx.Get("currentStage") | |
if err != nil || currentStage == "" { | |
currentStage = config.Stages[0].ID | |
ctx.Set("currentStage", currentStage) | |
} | |
// Execute functionalities for the initial stage | |
currentStageConfig := findStageConfig(config, currentStage) | |
for _, f := range currentStageConfig.Functionalities { | |
if f.Type == "notification" { | |
_, err := restate.Run(ctx, func(ctx restate.RunContext) (interface{}, error) { | |
return nil, sendNotification(ctx, f.Channel, f.Recipients, f.Message, input.AssetID) | |
}) | |
if err != nil { | |
log.Printf("Notification failed for stage %s: %v", currentStage, err) | |
} | |
} | |
} | |
// Listen for actions indefinitely | |
for { | |
// Wait for an action (e.g., "submit", "approve", "reject") | |
action := restate.Await(ctx) | |
transition := findTransition(config, currentStage, action) | |
if transition == nil { | |
log.Printf("Invalid action %s for stage %s", action, currentStage) | |
continue | |
} | |
// Update stage | |
newStage := transition.ToStage | |
ctx.Set("currentStage", newStage) | |
// Execute functionalities for the new stage | |
newStageConfig := findStageConfig(config, newStage) | |
for _, f := range newStageConfig.Functionalities { | |
if f.Type == "notification" { | |
_, err := restate.Run(ctx, func(ctx restate.RunContext) (interface{}, error) { | |
return nil, sendNotification(ctx, f.Channel, f.Recipients, f.Message, input.AssetID) | |
}) | |
if err != nil { | |
log.Printf("Notification failed for stage %s: %v", newStage, err) | |
} | |
} | |
} | |
// Update current stage for next iteration | |
currentStage = newStage | |
} | |
} | |
func main() { | |
// Create and start the Restate server | |
server := server.NewRestate() | |
server.Workflow("AssetWorkflow", AssetWorkflow) | |
if err := server.Start(context.Background(), ":9080"); err != nil { | |
log.Fatalf("Failed to start server: %v", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment