A comprehensive guide for implementing Conductor SDKs across all languages (Java, Go, C#, JavaScript/TypeScript, Clojure) based on the Python SDK reference architecture.
This guide provides a complete blueprint for creating or refactoring Conductor SDKs to match the architecture, API design, and documentation standards established in the Python SDK. Each language should maintain its idiomatic patterns while following the core architectural principles.
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β Application Layer β
β (User's Application Code) β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β High-Level Clients β
β (OrkesClients, WorkflowExecutor, Workers) β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β Domain-Specific Clients β
β (TaskClient, WorkflowClient, SecretClient...) β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β Orkes Implementations β
β (OrkesTaskClient, OrkesWorkflowClient...) β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β Resource API Layer β
β (TaskResourceApi, WorkflowResourceApi...) β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
β HTTP/API Client β
β (ApiClient, HTTP Transport) β
βββββββββββββββββββββββββββββββββββββββββββββββββββ
AbstractClient (Interface/ABC)
β
OrkesBaseClient (Shared Implementation)
β
OrkesSpecificClient (Concrete Implementation)
conductor-{language}/
βββ src/
β βββ conductor/
β βββ client/
β β βββ {domain}_client.{ext} # Abstract interfaces
β β βββ orkes/
β β β βββ orkes_base_client.{ext}
β β β βββ orkes_{domain}_client.{ext}
β β β βββ models/
β β βββ http/
β β β βββ api/ # Generated from OpenAPI
β β β β βββ *_resource_api.{ext}
β β β βββ models/ # Generated models
β β β βββ api_client.{ext}
β β βββ automator/
β β β βββ task_runner.{ext}
β β β βββ async_task_runner.{ext}
β β βββ configuration/
β β β βββ configuration.{ext}
β β β βββ settings/
β β βββ worker/
β β β βββ worker_task.{ext}
β β β βββ worker_discovery.{ext}
β β βββ workflow/
β β βββ conductor_workflow.{ext}
β β βββ task/
βββ examples/
β βββ workers_e2e.{ext} # End-to-end example
β βββ {feature}_journey.{ext} # 100% API coverage demos
β βββ README.md # Examples catalog
βββ docs/
β βββ AUTHORIZATION.md # 49 APIs
β βββ METADATA.md # 21 APIs
β βββ INTEGRATION.md # 28+ providers
β βββ TASK_MANAGEMENT.md # 11 APIs
β βββ SECRET_MANAGEMENT.md # 9 APIs
β βββ WORKFLOW_TESTING.md
β βββ ...
βββ tests/
βββ unit/
βββ integration/
βββ e2e/
- Create Configuration class with builder pattern
- Support environment variables
- Implement hierarchical configuration (all β domain β task)
- Add authentication settings (key/secret, token)
- Include retry configuration
- Add connection pooling settings
- Generate models from OpenAPI specification
- Generate resource API classes
- Implement ApiClient with:
- Connection pooling
- Retry logic with exponential backoff
- Request/response interceptors
- Error handling and mapping
- Metrics collection hooks
- Create abstract base clients (interfaces)
- Implement OrkesBaseClient aggregating all APIs
- Add proper dependency injection
- Implement client factory pattern
For each domain, implement:
Abstract Interface (11 methods):
- poll_task(task_type, worker_id?, domain?)
- batch_poll_tasks(task_type, worker_id?, count?, timeout?, domain?)
- get_task(task_id)
- update_task(task_result)
- update_task_by_ref_name(workflow_id, ref_name, status, output, worker_id?)
- update_task_sync(workflow_id, ref_name, status, output, worker_id?)
- get_queue_size_for_task(task_type)
- add_task_log(task_id, message)
- get_task_logs(task_id)
- get_task_poll_data(task_type)
- signal_task(workflow_id, ref_name, data)
Abstract Interface (20+ methods):
- start_workflow(start_request)
- get_workflow(workflow_id, include_tasks?)
- get_workflow_status(workflow_id, include_output?, include_variables?)
- delete_workflow(workflow_id, archive?)
- terminate_workflow(workflow_id, reason?, trigger_failure?)
- pause_workflow(workflow_id)
- resume_workflow(workflow_id)
- restart_workflow(workflow_id, use_latest_def?)
- retry_workflow(workflow_id, resume_subworkflow?)
- rerun_workflow(workflow_id, rerun_request)
- skip_task_from_workflow(workflow_id, task_ref, skip_request)
- test_workflow(test_request)
- search(start?, size?, free_text?, query?)
- execute_workflow(start_request, request_id?, wait_until?, wait_seconds?)
[... additional methods]
- Create worker registration system
- Implement task discovery
- Add worker lifecycle management
- Support both sync and async workers
- Implement TaskRunner with thread pool
- Implement AsyncTaskRunner with event loop
- Add metrics collection
- Implement graceful shutdown
- Add health checks
- Task context injection
- Automatic retries
- TaskInProgress support for long-running tasks
- Error handling (retryable vs terminal)
- Worker discovery from packages
- Implement ConductorWorkflow builder
- Add all task types (Simple, HTTP, Switch, Fork, DoWhile, etc.)
- Support method chaining
- Add workflow validation
- Implement workflow testing utilities
-
workers_e2e- Complete end-to-end example -
worker_example- Worker patterns -
task_context_example- Long-running tasks -
workflow_example- Workflow creation -
test_workflows- Testing patterns
-
authorization_journey- All 49 authorization APIs -
metadata_journey- All 21 metadata APIs -
integration_journey- All integration providers -
schedule_journey- All 15 schedule APIs -
prompt_journey- All 8 prompt APIs -
secret_journey- All 9 secret APIs
- Create all API reference documents (see Documentation section)
- Add Quick Start for each module
- Include complete working examples
- Document all models
- Add error handling guides
- Include best practices
// Package Structure
com.conductor.sdk/
βββ client/
β βββ TaskClient.java // Interface
β βββ orkes/
β β βββ OrkesBaseClient.java
β β βββ OrkesTaskClient.java // Implementation
β βββ http/
β βββ api/ // Generated
β βββ models/ // Generated
// Client Pattern
public interface TaskClient {
Optional<Task> pollTask(String taskType, String workerId, String domain);
List<Task> batchPollTasks(String taskType, BatchPollRequest request);
// ... other methods
}
public class OrkesTaskClient extends OrkesBaseClient implements TaskClient {
@Override
public Optional<Task> pollTask(String taskType, String workerId, String domain) {
return Optional.ofNullable(
taskResourceApi.poll(taskType, workerId, domain)
);
}
}
// Configuration
Configuration config = Configuration.builder()
.serverUrl("http://localhost:8080/api")
.authentication(keyId, keySecret)
.connectionPool(10, 30, TimeUnit.SECONDS)
.retryPolicy(3, 1000)
.build();
// Worker Pattern
@WorkerTask("process_order")
public class OrderProcessor implements Worker {
@Override
public TaskResult execute(Task task) {
OrderInput input = task.getInputData(OrderInput.class);
// Process
return TaskResult.complete(output);
}
}
// Task Runner
TaskRunnerConfigurer configurer = TaskRunnerConfigurer.builder()
.configuration(config)
.workers(new OrderProcessor(), new PaymentProcessor())
.threadCount(10)
.build();
configurer.start();// Package Structure
github.com/conductor-oss/conductor-go/
βββ client/
β βββ task_client.go // Interface
β βββ orkes/
β β βββ base_client.go
β β βββ task_client.go // Implementation
β βββ http/
β βββ api/ // Generated
β βββ models/ // Generated
// Client Pattern
type TaskClient interface {
PollTask(ctx context.Context, taskType string, opts ...PollOption) (*Task, error)
BatchPollTasks(ctx context.Context, taskType string, opts ...PollOption) ([]*Task, error)
// ... other methods
}
type orkesTaskClient struct {
*BaseClient
api *TaskResourceAPI
}
func (c *orkesTaskClient) PollTask(ctx context.Context, taskType string, opts ...PollOption) (*Task, error) {
options := &pollOptions{}
for _, opt := range opts {
opt(options)
}
return c.api.Poll(ctx, taskType, options.WorkerID, options.Domain)
}
// Configuration
config := client.NewConfig(
client.WithServerURL("http://localhost:8080/api"),
client.WithAuthentication(keyID, keySecret),
client.WithConnectionPool(10, 30*time.Second),
client.WithRetryPolicy(3, time.Second),
)
// Worker Pattern
type OrderProcessor struct{}
func (p *OrderProcessor) TaskType() string {
return "process_order"
}
func (p *OrderProcessor) Execute(ctx context.Context, task *Task) (*TaskResult, error) {
var input OrderInput
if err := task.GetInputData(&input); err != nil {
return nil, err
}
// Process
return NewTaskResultComplete(output), nil
}
// Task Runner
runner := worker.NewTaskRunner(
worker.WithConfig(config),
worker.WithWorkers(&OrderProcessor{}, &PaymentProcessor{}),
worker.WithThreadCount(10),
)
runner.Start(ctx)// Package Structure
@conductor-oss/conductor-sdk/
βββ src/
β βββ client/
β β βββ TaskClient.ts // Interface
β β βββ orkes/
β β β βββ OrkesBaseClient.ts
β β β βββ OrkesTaskClient.ts // Implementation
β β βββ http/
β β βββ api/ // Generated
β β βββ models/ // Generated
// Client Pattern
export interface TaskClient {
pollTask(taskType: string, workerId?: string, domain?: string): Promise<Task | null>;
batchPollTasks(taskType: string, options?: BatchPollOptions): Promise<Task[]>;
// ... other methods
}
export class OrkesTaskClient extends OrkesBaseClient implements TaskClient {
async pollTask(taskType: string, workerId?: string, domain?: string): Promise<Task | null> {
return await this.taskApi.poll(taskType, { workerId, domain });
}
}
// Configuration
const config = new Configuration({
serverUrl: 'http://localhost:8080/api',
authentication: {
keyId: 'your-key',
keySecret: 'your-secret'
},
connectionPool: {
maxConnections: 10,
keepAliveTimeout: 30000
},
retry: {
maxAttempts: 3,
backoffMs: 1000
}
});
// Worker Pattern (Decorators)
@WorkerTask('process_order')
export class OrderProcessor implements Worker {
async execute(task: Task): Promise<TaskResult> {
const input = task.inputData as OrderInput;
// Process
return TaskResult.complete(output);
}
}
// Worker Pattern (Functional)
export const processOrder = workerTask('process_order', async (task: Task) => {
const input = task.inputData as OrderInput;
// Process
return output;
});
// Task Runner
const runner = new TaskRunner({
config,
workers: [OrderProcessor, PaymentProcessor],
// or functional: workers: [processOrder, processPayment],
options: {
threadCount: 10,
pollInterval: 100
}
});
await runner.start();// Package Structure
Conductor.Client/
βββ Client/
β βββ ITaskClient.cs // Interface
β βββ Orkes/
β β βββ OrkesBaseClient.cs
β β βββ OrkesTaskClient.cs // Implementation
β βββ Http/
β βββ Api/ // Generated
β βββ Models/ // Generated
// Client Pattern
public interface ITaskClient
{
Task<ConductorTask?> PollTaskAsync(string taskType, string? workerId = null, string? domain = null);
Task<List<ConductorTask>> BatchPollTasksAsync(string taskType, BatchPollOptions? options = null);
// ... other methods
}
public class OrkesTaskClient : OrkesBaseClient, ITaskClient
{
public async Task<ConductorTask?> PollTaskAsync(string taskType, string? workerId = null, string? domain = null)
{
return await TaskApi.PollAsync(taskType, workerId, domain);
}
}
// Configuration
var config = new Configuration
{
ServerUrl = "http://localhost:8080/api",
Authentication = new AuthenticationSettings
{
KeyId = "your-key",
KeySecret = "your-secret"
},
ConnectionPool = new PoolSettings
{
MaxConnections = 10,
KeepAliveTimeout = TimeSpan.FromSeconds(30)
},
Retry = new RetryPolicy
{
MaxAttempts = 3,
BackoffMs = 1000
}
};
// Worker Pattern (Attributes)
[WorkerTask("process_order")]
public class OrderProcessor : IWorker
{
public async Task<TaskResult> ExecuteAsync(ConductorTask task)
{
var input = task.GetInputData<OrderInput>();
// Process
return TaskResult.Complete(output);
}
}
// Task Runner
var runner = new TaskRunner(config)
.AddWorker<OrderProcessor>()
.AddWorker<PaymentProcessor>()
.WithOptions(new RunnerOptions
{
ThreadCount = 10,
PollInterval = TimeSpan.FromMilliseconds(100)
});
await runner.StartAsync();| Operation | Method Pattern | Example |
|---|---|---|
| Create | create{Resource} / save{Resource} |
createWorkflow, saveSchedule |
| Read (single) | get{Resource} |
getTask, getWorkflow |
| Read (list) | list{Resources} / getAll{Resources} |
listTasks, getAllSchedules |
| Update | update{Resource} |
updateTask, updateWorkflow |
| Delete | delete{Resource} |
deleteWorkflow, deleteSecret |
| Search | search{Resources} |
searchWorkflows, searchTasks |
| Execute | {action}{Resource} |
pauseWorkflow, resumeSchedule |
| Test | test{Resource} |
testWorkflow |
Required parameters: Direct method parameters
Optional parameters: Options object or builder pattern
Example:
- pollTask(taskType: string, options?: PollOptions)
- updateTask(taskId: string, result: TaskResult)
docs/
βββ AUTHORIZATION.md # 49 APIs - User, Group, Application, Permissions
βββ METADATA.md # 21 APIs - Task & Workflow definitions
βββ INTEGRATION.md # 28+ providers - AI/LLM integrations
βββ PROMPT.md # 8 APIs - Prompt template management
βββ SCHEDULE.md # 15 APIs - Workflow scheduling
βββ SECRET_MANAGEMENT.md # 9 APIs - Secret storage
βββ TASK_MANAGEMENT.md # 11 APIs - Task operations
βββ WORKFLOW.md # Workflow operations
βββ WORKFLOW_TESTING.md # Testing guide
βββ WORKER.md # Worker implementation
βββ README.md # SDK overview
# [Module] API Reference
Complete API reference for [module] operations in Conductor [Language] SDK.
> π **Complete Working Example**: See [example.ext] for comprehensive implementation.
## Quick Start
```language
// 10-15 line minimal example| Method | Endpoint | Description | Example |
|---|---|---|---|
methodName() |
HTTP_VERB /path |
Description | Link |
[Detailed examples for each API method]
[Model/class definitions]
[Common errors and handling patterns]
[Good vs bad examples with β and β]
[50-150 line runnable example]
---
## π§ͺ Testing Requirements
### Test Coverage Goals
| Component | Unit Tests | Integration Tests | E2E Tests |
|-----------|------------|-------------------|-----------|
| Clients | 90% | 80% | - |
| Workers | 95% | 85% | 70% |
| Workflow DSL | 90% | 80% | - |
| Examples | - | 100% | 100% |
tests/
βββ unit/
β βββ client/
β β βββ test_task_client.{ext}
β β βββ test_workflow_client.{ext}
β βββ worker/
β β βββ test_worker_discovery.{ext}
β βββ workflow/
β βββ test_workflow_builder.{ext}
βββ integration/
β βββ test_worker_execution.{ext}
β βββ test_workflow_execution.{ext}
β βββ test_error_handling.{ext}
βββ e2e/
βββ test_authorization_journey.{ext}
βββ test_complete_flow.{ext}
- Follows layered architecture pattern
- Maintains separation of concerns
- Uses dependency injection
- Implements proper abstractions
- Consistent method naming
- Predictable parameter patterns
- Strong typing with models
- Comprehensive error handling
- 100% API coverage
- Quick start for each module
- Complete working examples
- Best practices documented
- >90% unit test coverage
- Integration tests for all APIs
- Journey tests demonstrate 100% API usage
- Examples are executable tests
- Intuitive API design
- Excellent IDE support
- Clear error messages
- Comprehensive logging
Before considering an SDK complete:
- Follows language idioms
- Consistent code style
- No code duplication
- Proper error handling
- Comprehensive logging
- All 49 Authorization APIs
- All 21 Metadata APIs
- All 15 Schedule APIs
- All 11 Task APIs
- All 9 Secret APIs
- All 8 Prompt APIs
- All Integration providers
- All API docs created
- Quick starts work
- Examples run successfully
- Cross-references valid
- No broken links
- Unit test coverage >90%
- Integration tests pass
- Journey examples complete
- CI/CD configured
- Published to package registry
- Versioning follows semver
- CHANGELOG maintained
- LICENSE included
- OpenAPI Generator for API/models
- Custom generators for boilerplate
- Language-appropriate build tool
- Dependency management
- Version management
- Package publishing
- Unit tests on every commit
- Integration tests on PR
- Documentation generation
- Package publishing on release
For SDK implementation questions:
- Reference Python SDK for patterns
- Check this guide for architecture
- Maintain consistency across SDKs
- Prioritize developer experience
Remember: The goal is to make Conductor easy to use in every language while maintaining consistency and completeness.
After every coding session, run the full test suite to ensure zero failures:
# All suites β expect 0 failures, 0 errors
python3 -m pytest tests/unit tests/backwardcompatibility tests/serdesertest tests/chaos tests/integration -v
# Expected results:
# Unit tests: ~626 passed
# Backward compatibility: ~1015 passed
# Serialization: ~58 passed
# Chaos: 2 skipped (require special setup)
# Integration: 128 skipped (require live Conductor server)
# TOTAL: 0 failures, 0 errorsIntegration tests skip gracefully when the Conductor server is not available (no CONDUCTOR_SERVER_URL / CONDUCTOR_AUTH_KEY / CONDUCTOR_AUTH_SECRET env vars). When a server is available, they run against it. There should be NO failures in any suite.