PithomLabs/rea

Rea Framework

High-level abstraction framework for building robust, durable distributed applications with Restate

The Rea ("Restate Enhanced Abstractions") framework is a comprehensive Go library that wraps the Restate SDK with opinionated best practices, type safety, and developer-friendly APIs. It minimizes boilerplate and provides robust patterns for distributed orchestration.

Package details at https://pkg.go.dev/github.com/pithomlabs/rea


🎯 Core Philosophy

Control Plane vs Data Plane Separation

┌─────────────────────────────────────────────────────────────┐
│                        CONTROL PLANE                        │
│   (Orchestration Layer - Workflows, Sagas, Coordination)    │
├─────────────────────────────────────────────────────────────┤
│  • Workflow Orchestrators (WorkflowContext)                 │
│  • Virtual Object Coordinators (ObjectContext)              │
│  • Saga Controllers (compensation logic)                    │
│  • Promise/Awakeable Coordination                           │
└─────────────────┬───────────────────────────────────────────┘
                  │ Invokes via Request/RequestFuture/Send
                  ▼
┌─────────────────────────────────────────────────────────────┐
│                         DATA PLANE                          │
│      (Execution Layer - Business Logic, Side Effects)       │
├─────────────────────────────────────────────────────────────┤
│  • Stateless Services (Basic Service - Context)             │
│  • Stateful Services (Virtual Objects - read-only queries)  │
│  • External Integration Services (Run wrappers)             │
│  • Compute/Transform Services                               │
└─────────────────────────────────────────────────────────────┘

📚 Categories & Features

1. Service Classification & Service Types

Clear separation between orchestration and business logic with type-safe abstractions.

Service Types:

  • Control Plane Services - Workflow orchestration, saga management, coordination
  • Data Plane Services - Business logic execution, external calls, side effects
  • Virtual Objects - Stateful, key-addressable services with single-writer semantics
  • Workflows - Long-running orchestrations with durable promises and human-in-the-loop

Client Types:

  • ServiceClient[I, O] - For stateless services
  • ObjectClient[I, O] - For Virtual Objects (key-based addressing)
  • WorkflowClient[I, O] - For workflow lifecycle (Submit, Attach, Signal)

📖 Service Patterns Guide


2. Durable Saga Framework

Distributed transaction compensation with automatic retry, dead-letter queue, and compensation strategies.

Features:

  • Pre-action compensation registration (enforced)
  • Automatic LIFO rollback on errors
  • Exponential backoff with configurable retry policies
  • Dead-letter queue for irrecoverable failures
  • Idempotent compensation validation helpers
  • Multiple compensation strategies (all, completed, best-effort, until-success)

Example:

saga := NewSaga(ctx, "payment-flow", nil)
saga.Register("charge_card", func(rc restate.RunContext, payload []byte) error {
    return refundCard(payload) // Must be idempotent!
})

// Compensation persisted BEFORE action
saga.Add("charge_card", chargeData, false)

// If error occurs, saga automatically compensates
defer saga.CompensateIfNeeded(&err)
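
The LIFO rollback listed in the features can be illustrated without Restate at all. The following is a minimal sketch, not Rea's implementation: `miniSaga`, `compensation`, and their methods are hypothetical names, and durability, retries, and the dead-letter queue are deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
)

// compensation is a hypothetical undo step; it is not a Rea type.
type compensation struct {
	name string
	undo func() error
}

// miniSaga records compensations in registration order.
type miniSaga struct {
	steps []compensation
}

// Add records the undo step before the forward action runs.
func (s *miniSaga) Add(name string, undo func() error) {
	s.steps = append(s.steps, compensation{name: name, undo: undo})
}

// Compensate runs the recorded undo steps newest-first (LIFO). It returns
// the names of the steps that succeeded, plus any undo errors joined together.
func (s *miniSaga) Compensate() ([]string, error) {
	var ran []string
	var errs []error
	for i := len(s.steps) - 1; i >= 0; i-- {
		step := s.steps[i]
		if err := step.undo(); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", step.name, err))
			continue
		}
		ran = append(ran, step.name)
	}
	return ran, errors.Join(errs...)
}

func main() {
	s := &miniSaga{}
	s.Add("reserve_stock", func() error { return nil })
	s.Add("charge_card", func() error { return nil })

	order, err := s.Compensate()
	fmt.Println(order, err) // [charge_card reserve_stock] <nil>
}
```

The last step registered is undone first, which is why each compensation has to be safe to run even if its forward action never completed.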

📖 Saga Guide


3. Type-Safe State Management

Runtime-enforced read/write permissions with compile-time type safety.

Features:

  • State[T] - Type-safe state accessor with runtime validation
  • Read-only contexts (ObjectSharedContext, WorkflowSharedContext) reject writes
  • Exclusive contexts (ObjectContext, WorkflowContext) allow writes
  • Automatic error handling for invalid context usage

Example:

// Read-safe from any context
balance := NewState[int](ctx, "balance")
value, err := balance.Get()

// Write requires exclusive context
if err := balance.Set(1000); err != nil {
    // Returns error if called from shared context
}
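
To make the read/write split concrete, here is a small sketch of a generic accessor that checks write permission at runtime. It assumes an in-memory `store` in place of Restate state; `store`, `typedState`, and `errReadOnly` are hypothetical names and do not reflect how Rea's `State[T]` is built.

```go
package main

import (
	"errors"
	"fmt"
)

// errReadOnly is a hypothetical sentinel error for this sketch.
var errReadOnly = errors.New("state: write attempted from a shared (read-only) context")

// store stands in for Restate's key-value state; it is not the SDK's API.
type store struct {
	values   map[string]any
	writable bool // true for exclusive contexts, false for shared ones
}

// typedState is a generic accessor bound to one key.
type typedState[T any] struct {
	st  *store
	key string
}

func newTypedState[T any](st *store, key string) typedState[T] {
	return typedState[T]{st: st, key: key}
}

// Get returns the stored value, or the zero value of T when the key is unset.
func (s typedState[T]) Get() (T, error) {
	var zero T
	raw, ok := s.st.values[s.key]
	if !ok {
		return zero, nil
	}
	v, ok := raw.(T)
	if !ok {
		return zero, fmt.Errorf("state %q holds %T, not the requested type", s.key, raw)
	}
	return v, nil
}

// Set fails at runtime when the underlying context is read-only.
func (s typedState[T]) Set(v T) error {
	if !s.st.writable {
		return errReadOnly
	}
	s.st.values[s.key] = v
	return nil
}

func main() {
	shared := &store{values: map[string]any{"balance": 10}, writable: false}
	balance := newTypedState[int](shared, "balance")

	v, _ := balance.Get()
	fmt.Println(v, balance.Set(1000))
}
```

The type parameter gives compile-time safety for the value, while the `writable` flag models the permission check that can only happen at runtime.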

📖 Type Safety Guide


4. Workflow Automation & Retention

Utilities for long-running workflows with state retention policies.

Features:

  • WorkflowTimer - Durable timers and sleep utilities
  • PromiseRacer - Race promises against timeouts
  • WorkflowLoop - Safe looping with iteration limits
  • WorkflowStatus - Progress tracking via shared handlers
  • WorkflowConfig - State retention and cleanup policies

Workflow Configuration:

cfg := WorkflowConfig{
    StateRetentionDays:      30,  // Restate retention limit
    AutoCleanupOnCompletion: true, // Purge state on success
    MaxStateSizeBytes:       1_000_000, // Warn threshold
}

📖 Workflow Automation Guide


5. Idempotency Key Management

Auto-detection of unnecessary idempotency keys with validation and framework policy controls.

Key Principle: Use idempotency keys for cross-invocation protection, not within journaled handlers.

When to use:

  ✅ External calls (ingress)
  ✅ Cross-handler attach semantics
  ✅ Deduplication across invocations

When NOT to use:

  ❌ Same-handler execution (journaling provides guarantees)
  ❌ Sequential calls within same handler
  ❌ Fire-and-forget Send within handler

Auto-Detection:

// Framework detects and warns about unnecessary keys
client.Call(ctx, input, CallOption{
    IdempotencyKey: "unnecessary", // ⚠️ Logged as redundant
})
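
What "cross-invocation protection" buys can be shown with a toy deduplication table. This is a sketch under stated assumptions: `dedupStore` is a hypothetical in-memory stand-in for the deduplication that happens at the ingress, and is not part of Rea or the Restate SDK.

```go
package main

import "fmt"

// dedupStore is a hypothetical stand-in for ingress-side deduplication.
type dedupStore struct {
	results map[string]string
	runs    int // how many times a handler actually executed
}

func newDedupStore() *dedupStore {
	return &dedupStore{results: make(map[string]string)}
}

// Invoke runs handler once per key and replays the stored result afterwards.
// An empty key disables deduplication.
func (d *dedupStore) Invoke(key string, handler func() string) string {
	if key != "" {
		if cached, ok := d.results[key]; ok {
			return cached
		}
	}
	d.runs++
	out := handler()
	if key != "" {
		d.results[key] = out
	}
	return out
}

func main() {
	d := newDedupStore()
	charge := func() string { return "charged" }

	d.Invoke("order-123", charge)
	d.Invoke("order-123", charge) // retry from the external caller: replayed
	fmt.Println(d.runs)           // prints 1
}
```

Inside a journaled handler the journal already plays this role for each step, which is why an extra key there adds nothing.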

📖 Idempotency Guide


6. Side Effects (Run Wrappers)

Context-capture prevention and retry utilities for durable side effects.

Features:

  • RunDo[T] - Execute side effects with result
  • RunDoVoid - Execute void side effects
  • RunWithRetry - Automatic retry with exponential backoff
  • RunAsync / RunAsyncWithRetry - Asynchronous execution
  • Anti-pattern guards (prevents accidental context capture)

Example:

// ✅ Correct: Only uses RunContext inside Run block
user, err := RunDo(ctx, func(rc restate.RunContext) (User, error) {
    return fetchUserFromDB(userID) // External call
}, restate.WithName("fetch-user"))

// ❌ Wrong: Captures outer ctx (causes non-determinism)
// restate.Run(ctx, func(rc restate.RunContext) {
//     ctx.Sleep(time.Second) // ❌ Uses outer ctx!
// })
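
For readers unfamiliar with the retry behaviour that `RunWithRetry` is described as providing, exponential backoff can be sketched in plain Go. The functions `backoffDelay` and `retry` below are hypothetical and say nothing about Rea's actual signatures or defaults; the wait function is injected so the caller decides how waiting happens.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// backoffDelay returns base * 2^attempt, capped at max. attempt starts at 0.
func backoffDelay(base, max time.Duration, attempt int) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// retry calls fn up to maxAttempts times (maxAttempts must be at least 1),
// waiting between failed attempts with a growing delay.
func retry(maxAttempts int, base, max time.Duration, sleep func(time.Duration), fn func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt < maxAttempts-1 {
			sleep(backoffDelay(base, max, attempt))
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := retry(5, 100*time.Millisecond, time.Second,
		func(d time.Duration) { fmt.Println("waiting", d) },
		func() error {
			calls++
			if calls < 3 {
				return errors.New("transient failure")
			}
			return nil
		})
	fmt.Println(calls, err) // 3 <nil>
}
```

With a 100ms base the waits here are 100ms and then 200ms before the third attempt succeeds.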

📖 Run Side Effects Guide


7. Concurrency Patterns

Type-safe concurrent execution with deterministic coordination.

Features:

  • RequestFuture - Start concurrent calls, wait later
  • restate.Wait() - Wait for all futures (fan-in)
  • restate.WaitFirst() - Race multiple futures
  • RunAsync - Concurrent side effects
  • Iterator-based result collection (no channels needed)

Example:

// Fan-out: Start concurrent calls
fut1 := inventoryClient.RequestFuture(ctx, "product-1", req)
fut2 := paymentClient.RequestFuture(ctx, "account-1", req)

// Fan-in: Collect results deterministically
for fut, err := range restate.Wait(ctx, fut1, fut2) {
    if err != nil {
        return err
    }
    // Process result
}

📖 Concurrency Patterns Guide


8. Security & Validation

Cryptographic request validation and input/output validation.

Security Features:

  • Ed25519 signature verification
  • HTTPS enforcement
  • Origin whitelisting
  • Request replay protection
  • Configurable validation modes (strict, permissive, disabled)
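
As background for the Ed25519 item, signature checking itself is available in Go's standard library. The sketch below uses `crypto/ed25519` directly; `newKeyPair`, `signBody`, and `verifyBody` are hypothetical helper names, and how Rea extracts keys and signatures from a request is not shown here.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
)

// newKeyPair generates a fresh Ed25519 key pair.
func newKeyPair() (ed25519.PublicKey, ed25519.PrivateKey, error) {
	return ed25519.GenerateKey(rand.Reader)
}

// signBody is what a trusted caller would do before sending a request.
func signBody(priv ed25519.PrivateKey, body []byte) []byte {
	return ed25519.Sign(priv, body)
}

// verifyBody reports whether sig is a valid signature of body under pub.
func verifyBody(pub ed25519.PublicKey, body, sig []byte) bool {
	if len(pub) != ed25519.PublicKeySize {
		return false // ed25519.Verify panics on a key of the wrong size
	}
	return ed25519.Verify(pub, body, sig)
}

func main() {
	pub, priv, err := newKeyPair()
	if err != nil {
		panic(err)
	}
	body := []byte(`{"amount": 42}`)
	sig := signBody(priv, body)

	fmt.Println(verifyBody(pub, body, sig))                     // true
	fmt.Println(verifyBody(pub, []byte(`{"amount": 43}`), sig)) // false
}
```

A signature alone does not stop a captured request from being resent, which is why replay protection appears as a separate item in the list.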

Input Validation:

type OrderRequest struct {
    Amount   float64 `validate:"required,gt=0"`
    Quantity int     `validate:"required,min=1,max=100"`
}

// Automatic validation with framework
if err := ValidateInput(req); err != nil {
    return restate.TerminalError(err, 400)
}

📖 Security Guide


9. Ingress Client (External Calls)

Type-safe HTTP client for calling Restate from outside handler context.

Client Types:

  • IngressClient - Base HTTP client with auth
  • IngressServiceClient - For stateless services
  • IngressObjectClient - For Virtual Objects
  • IngressWorkflowClient - For workflows

Usage:

// External application calling Restate
ingressClient := NewIngressClient("http://localhost:8080", "auth-key")

// Call service from outside
svcClient := IngressService[Order, OrderResult](
    ingressClient, "OrderService", "create",
)
result, err := svcClient.Call(context.Background(), order, 
    IngressCallOption{
        IdempotencyKey: "order-123", // ✅ Required for ingress
    },
)

⚠️ Critical: Never use IngressClient inside a Restate handler. Doing so bypasses journaling!

📖 Ingress Client Guide


10. Observability & Metrics

Prometheus-compatible metrics, OpenTelemetry tracing, and custom hooks.

Features:

  • MetricsCollector - Counters, gauges, histograms
  • TracingContext - OpenTelemetry span creation
  • ObservabilityHooks - Custom event callbacks
  • InstrumentedServiceClient - Automatic observability for calls

Example:

metrics := NewMetricsCollector()
tracing := NewTracingContext(ctx)
hooks := DefaultObservabilityHooks(logger)

// Wrap client with automatic observability
instrumentedClient := NewInstrumentedClient(
    client, metrics, tracing, hooks,
)

// All calls automatically traced and metered
result, err := instrumentedClient.Call(ctx, req)

// Expose collected metrics over HTTP (this handler emits JSON; Prometheus scrapers expect the text exposition format)
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
    json.NewEncoder(w).Encode(metrics.GetMetrics())
})

📖 Observability Guide


11. Framework Policy & Guardrails

Unified policy control for all runtime validations.

Policy Modes:

  • PolicyStrict - Fail-fast on violations (CI/production)
  • PolicyWarn - Log warnings but continue (development)
  • PolicyDisabled - Skip validation (testing only)

Configuration:

# Environment-based auto-detection (shell); CI=true defaults to strict
export RESTATE_FRAMEWORK_POLICY=strict

// Or set programmatically (Go)
SetFrameworkPolicy(PolicyStrict)
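
One way to picture the environment-based detection is a small resolver that reads the two variables mentioned above. This is a sketch only: `policy`, `resolvePolicy`, and the lowercase constants are hypothetical, and falling back to the warn mode when nothing is set is an assumption, not documented Rea behaviour.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// policy mirrors the three modes listed above; the type itself is hypothetical.
type policy int

const (
	policyWarn policy = iota
	policyStrict
	policyDisabled
)

func (p policy) String() string {
	switch p {
	case policyStrict:
		return "strict"
	case policyDisabled:
		return "disabled"
	default:
		return "warn"
	}
}

// resolvePolicy picks a mode using an environment lookup function.
// An explicit RESTATE_FRAMEWORK_POLICY wins; otherwise CI=true selects strict.
// Falling back to warn in every other case is an assumption of this sketch.
func resolvePolicy(getenv func(string) string) policy {
	switch strings.ToLower(strings.TrimSpace(getenv("RESTATE_FRAMEWORK_POLICY"))) {
	case "strict":
		return policyStrict
	case "warn":
		return policyWarn
	case "disabled":
		return policyDisabled
	}
	if getenv("CI") == "true" {
		return policyStrict
	}
	return policyWarn
}

func main() {
	fmt.Println(resolvePolicy(os.Getenv))
}
```

Passing the lookup function as a parameter keeps the resolver testable without touching the real process environment.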

Guardrail Coverage:

  • Idempotency key validation
  • State write permission checks
  • Security validation
  • Saga compensation registration
  • Context usage validation

📖 Framework Policy


12. Microservices Orchestration

High-level patterns for coordinating multiple services.

Patterns:

  • Saga Pattern - Distributed transactions with compensation
  • Circuit Breaker - Fault tolerance for external services
  • Request Aggregation - Combine multiple service calls
  • Service Mesh Integration - Distributed tracing context propagation
  • Human-in-the-Loop - Approval workflows with awakeables
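
Of the patterns above, the circuit breaker is the easiest to show in isolation. The sketch below is a generic, count-based breaker and not Rea's implementation; `breaker`, `errOpen`, and `errBackend` are hypothetical names, and the clock is injected rather than read from the system so that the logic stays deterministic.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errOpen    = errors.New("circuit open: call rejected")
	errBackend = errors.New("backend down")
)

// breaker opens after maxFailures consecutive failures and permits one
// trial call once cooldownSec seconds have passed.
type breaker struct {
	maxFailures int
	cooldownSec int64
	now         func() int64 // injected clock, in Unix seconds

	failures int
	open     bool
	openedAt int64
}

// Call runs fn unless the breaker is open and still cooling down.
func (b *breaker) Call(fn func() error) error {
	if b.open && b.now()-b.openedAt < b.cooldownSec {
		return errOpen
	}
	if err := fn(); err != nil {
		b.failures++
		if b.open || b.failures >= b.maxFailures {
			// Trip, or re-trip after a failed trial call.
			b.open = true
			b.openedAt = b.now()
		}
		return err
	}
	// Success closes the breaker and resets the failure count.
	b.failures = 0
	b.open = false
	return nil
}

func main() {
	var clock int64
	b := &breaker{maxFailures: 2, cooldownSec: 60, now: func() int64 { return clock }}
	fail := func() error { return errBackend }

	fmt.Println(b.Call(fail)) // backend down
	fmt.Println(b.Call(fail)) // backend down (breaker trips)
	fmt.Println(b.Call(fail)) // circuit open: call rejected
	clock += 120
	fmt.Println(b.Call(func() error { return nil })) // <nil>
}
```

While the breaker is open the wrapped function is never invoked, which is what shields a struggling external service from further load.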

📖 Microservices Orchestration Guide


🚀 Quick Start

Installation

import framework "github.com/pithomlabs/rea"

Basic Service

type GreetingService struct{}

func (GreetingService) Greet(ctx restate.Context, name string) (string, error) {
    return fmt.Sprintf("Hello, %s!", name), nil
}

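func main() {
    // NewRestate comes from the SDK's server package:
    // import "github.com/restatedev/sdk-go/server"
    srv := server.NewRestate().Bind(restate.Reflect(GreetingService{}))
    if err := srv.Start(context.Background(), ":9080"); err != nil {
        log.Fatal(err)
    }
}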

Workflow with Saga

type OrderWorkflow struct{}

func (OrderWorkflow) Run(ctx restate.WorkflowContext, order Order) (err error) {
    saga := framework.NewSaga(ctx, "order-flow", nil)
    defer saga.CompensateIfNeeded(&err)
    
    // Register compensations
    saga.Register("charge_payment", refundPayment)
    saga.Add("charge_payment", order.PaymentData, false)
    
    // Call services
    inventoryClient := framework.ServiceClient[Item, bool]{
        ServiceName: "Inventory",
        HandlerName: "Reserve",
    }
    
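    available, err := inventoryClient.Call(ctx, order.Item)
    if err != nil {
        return err // Saga auto-compensates
    }
    if !available {
        // Returning nil here would skip compensation, so report the failure explicitly
        return restate.TerminalError(fmt.Errorf("item unavailable"))
    }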
    
    return nil
}

Virtual Object

type CartObject struct{}

func (CartObject) AddItem(ctx restate.ObjectContext, item Item) error {
    cartState := framework.NewState[Cart](ctx, "cart")
    
    cart, err := cartState.Get()
    if err != nil {
        cart = Cart{Items: []Item{}}
    }
    
    cart.Items = append(cart.Items, item)
    return cartState.Set(cart)
}

📖 Complete Documentation

Core Concepts

Workflows & Orchestration

Concurrency & Side Effects

Best Practices

External Integration

Observability

Configuration & Policy

Reference


✅ Best Practices Summary

✅ DO

  1. Use type-specific clients - ObjectClient for Virtual Objects, WorkflowClient for workflows
  2. Wrap external calls in RunDo - Ensures determinism and durability
  3. Register saga compensations BEFORE actions - Framework enforces this
  4. Use idempotency keys for external/ingress calls - Cross-invocation protection
  5. Use framework policy in CI - export RESTATE_FRAMEWORK_POLICY=strict
  6. Instrument service clients - Automatic observability with InstrumentedServiceClient

❌ DON'T

  1. Don't use IngressClient inside handlers - Bypasses journaling, breaks durability
  2. Don't use idempotency keys within same handler - Redundant, journaling provides guarantees
  3. Don't capture outer context in restate.Run - Use RunDo to prevent mistakes
  4. Don't use time.Now() or rand directly - Use deterministic helpers
  5. Don't sleep in exclusive object handlers - Blocks all requests to that key
  6. Don't forget workflow retention limits - Default 24 hours, configure appropriately

🎓 Anti-Patterns Prevented

The framework actively guards against common Restate anti-patterns:

| Anti-Pattern | Protection | Reference |
|---|---|---|
| Using outer context in restate.Run | RunDo / RunDoVoid wrappers | Run Guide |
| Unnecessary idempotency keys | Auto-detection with warnings | Idempotency Guide |
| State writes from read-only contexts | Runtime validation | Type Safety Guide |
| Missing saga compensations | Enforced registration | Saga Guide |
| Non-deterministic iteration | Ordered map helpers | Concurrency Guide |
| Blocking exclusive handlers | Static analysis warnings | Service Patterns |
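
The "Non-deterministic iteration" entry refers to the fact that Go randomizes map iteration order, so a handler that loops over a map may take its steps in a different order on replay. A common remedy is to iterate over sorted keys. The helper below is a generic sketch; `sortedKeys` is a hypothetical name and not necessarily what Rea's ordered map helpers look like.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys returns the map's keys in ascending order so that callers
// visit entries in the same sequence on every execution.
func sortedKeys[V any](m map[string]V) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	stock := map[string]int{"widget": 3, "bolt": 12, "gear": 7}
	for _, k := range sortedKeys(stock) {
		fmt.Println(k, stock[k])
	}
}
```

This always prints the entries in the order bolt, gear, widget, regardless of how the runtime lays out the map.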

πŸ—οΈ Architecture Decisions

Why Control Plane / Data Plane?

Control Plane (Orchestration):

  • Manages "what to do" and "in what order"
  • Uses state for coordination, not business data
  • Calls Data Plane services for actual work
  • Implements compensation logic

Data Plane (Execution):

  • Implements "how to do it"
  • Contains business logic and side effects
  • No orchestration concerns
  • Stateless where possible

This separation ensures:

  • Clear responsibility boundaries
  • Easier testing (mock data plane in control plane tests)
  • Better observability (orchestration vs execution metrics)
  • Simpler reasoning about failures

Why Type-Specific Clients?

Before:

// Unclear what this service is
client := ServiceClient[I, O]{...}
client.Call(ctx, input) // Missing key for objects!

After:

// Clear this is a Virtual Object
client := ObjectClient[I, O]{...}
client.Call(ctx, key, input) // Key required at compile time

Benefits:

  • Compiler enforces correct parameters
  • Self-documenting code
  • Fewer runtime errors
  • Better IDE autocomplete
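
The compile-time guarantee comes from the method signatures rather than from any runtime check. The sketch below models that idea with two small generic types; `transport`, `serviceClient`, `objectClient`, and `decode` are hypothetical stand-ins, and the real Rea clients take a context and talk to Restate instead of a local function.

```go
package main

import "fmt"

// transport is a hypothetical stand-in for the call path into Restate.
type transport func(target string, input any) any

// decode converts a raw reply into the expected output type.
func decode[O any](raw any) (O, error) {
	out, ok := raw.(O)
	if !ok {
		var zero O
		return zero, fmt.Errorf("unexpected reply type %T", raw)
	}
	return out, nil
}

// serviceClient targets a stateless handler, so Call takes no key.
type serviceClient[I, O any] struct {
	Service, Handler string
	send             transport
}

func (c serviceClient[I, O]) Call(input I) (O, error) {
	return decode[O](c.send(c.Service+"/"+c.Handler, input))
}

// objectClient targets a keyed handler. The key is a required parameter,
// so leaving it out fails to compile instead of failing at runtime.
type objectClient[I, O any] struct {
	Object, Handler string
	send            transport
}

func (c objectClient[I, O]) Call(key string, input I) (O, error) {
	return decode[O](c.send(c.Object+"/"+key+"/"+c.Handler, input))
}

func main() {
	// echoTarget replies with the address it was asked to reach.
	echoTarget := func(target string, input any) any { return target }

	inventory := serviceClient[string, string]{Service: "Inventory", Handler: "Reserve", send: echoTarget}
	cart := objectClient[string, string]{Object: "Cart", Handler: "AddItem", send: echoTarget}

	fmt.Println(inventory.Call("widget"))      // Inventory/Reserve <nil>
	fmt.Println(cart.Call("user-1", "widget")) // Cart/user-1/AddItem <nil>
}
```

Calling `cart.Call("widget")` with the key omitted would be rejected by the compiler because the argument count no longer matches.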

🤝 Contributing

Contributions should:

  1. Follow the Control Plane / Data Plane separation
  2. Add guardrails for new anti-patterns discovered
  3. Include comprehensive documentation
  4. Provide examples using this framework

πŸ“ License

MIT License


🔗 Resources


Made with ❤️ for building reliable distributed systems
