From 5c3822c2953fb273db45a80cfe59a8b52999fd45 Mon Sep 17 00:00:00 2001
From: redpinecube
Date: Thu, 6 Nov 2025 15:49:17 -0600
Subject: [PATCH 1/2] job creation and persistence

Signed-off-by: redpinecube
---
 src/scheduler.go | 48 +++++++++++++++++++++++++++++++++++-------------
 src/util.go      | 20 +++++++++++++-------
 2 files changed, 48 insertions(+), 20 deletions(-)

diff --git a/src/scheduler.go b/src/scheduler.go
index c1ca45e..f4ef6a6 100644
--- a/src/scheduler.go
+++ b/src/scheduler.go
@@ -28,30 +28,52 @@ func NewScheduler(redisAddr string, log *slog.Logger) *Scheduler {
 	}
 }
 
-func (s *Scheduler) Enqueue(jobType string, payload map[string]interface{}) error {
+func (s *Scheduler) Enqueue(jobType string, requiredGPU string, payload map[string]interface{}) error {
+	// create a new job
 	job := Job{
-		ID:      generateJobID(),
-		Type:    jobType,
-		Payload: payload,
-		Retries: 0,
-		Created: time.Now(),
+		ID:          generateJobID(),
+		Type:        jobType,
+		Payload:     payload,
+		Retries:     0,
+		Created:     time.Now(),
+		RequiredGPU: requiredGPU,
+		JobState:    JobStateScheduled,
 	}
 
-	jobData, err := json.Marshal(job)
+	// marshal the payload
+	payloadJSON, err := json.Marshal(job.Payload)
 	if err != nil {
-		return fmt.Errorf("failed to marshal job: %w", err)
+		s.log.Error("failed to marshal job payload", "error", err)
+		return err
 	}
 
-	result := s.client.XAdd(s.ctx, &redis.XAddArgs{
+	// start redis pipeline
+	pipe := s.client.Pipeline()
+
+	// add payload to redis stream
+	pipe.XAdd(s.ctx, &redis.XAddArgs{
 		Stream: StreamName,
 		Values: map[string]interface{}{
-			"job_id": job.ID,
-			"data":   string(jobData),
+			"job_id":    job.ID,
+			"payload":   string(payloadJSON),
+			"job_state": job.JobState,
 		},
 	})
 
-	if result.Err() != nil {
-		return fmt.Errorf("failed to enqueue job: %w", result.Err())
+	// store metadata in a redis hash
+	metadataKey := fmt.Sprintf("job:%s", job.ID)
+	pipe.HSet(s.ctx, metadataKey, map[string]interface{}{
+		"type":         job.Type,
+		"retries":      job.Retries,
+		"created":      job.Created.Format(time.RFC3339),
+		"required_gpu": job.RequiredGPU,
+		"job_state":    job.JobState,
+	})
+
+	// execute pipeline
+	if _, err := pipe.Exec(s.ctx); err != nil {
+		s.log.Error("failed to enqueue job", "error", err)
+		return err
 	}
 
 	s.log.Info("enqueued job", "job_id", job.ID, "job_type", job.Type)
diff --git a/src/util.go b/src/util.go
index 2f5207e..f1816d3 100644
--- a/src/util.go
+++ b/src/util.go
@@ -25,13 +25,19 @@
 )
 
 type Job struct {
-	ID          string                 `json:"id"`
-	Type        string                 `json:"type"`
-	Payload     map[string]interface{} `json:"payload"`
-	Retries     int                    `json:"retries"`
-	Created     time.Time              `json:"created"`
-	RequiredGPU string                 `json:"gpu"`
-	JobState    JobState               `json:"job_state"`
+	ID            string                 `json:"id"`
+	Type          string                 `json:"type"`
+	Payload       map[string]interface{} `json:"payload"`
+	Retries       int                    `json:"retries"`
+	Created       time.Time              `json:"created"`
+	RequiredGPU   string                 `json:"required_gpu,omitempty"`
+	JobState      JobState               `json:"job_state"`
+	ConsumerID    *string                `json:"consumer_id,omitempty"`
+	TimeAssigned  *time.Time             `json:"time_assigned,omitempty"`
+	TimeStarted   *time.Time             `json:"time_started,omitempty"`
+	TimeCompleted *time.Time             `json:"time_completed,omitempty"`
+	Result        map[string]interface{} `json:"result,omitempty"`
+	Error         *string                `json:"error,omitempty"`
 }
 
 type SupervisorState string
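For reviewers trying PATCH 1/2 by hand, a minimal usage sketch of the new three-argument `Enqueue`. `NewScheduler` and `Enqueue` are the functions changed above; the Redis address matches the tests, while the job type `"training"` and the payload fields are illustrative placeholders, not values defined by this series.

```go
package main

import (
	"log"
	"log/slog"
	"os"
)

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	// NewScheduler and the three-argument Enqueue come from PATCH 1/2;
	// "training" (job type) and "TT" (required GPU) are example values.
	sched := NewScheduler("localhost:6379", logger)
	defer sched.Close()

	payload := map[string]interface{}{"task_id": 42, "data": "example"}
	if err := sched.Enqueue("training", "TT", payload); err != nil {
		log.Fatalf("enqueue failed: %v", err)
	}
}
```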
From 8bb2d52fbaa016948f7592ae7800438283f1b708 Mon Sep 17 00:00:00 2001
From: redpinecube
Date: Mon, 10 Nov 2025 18:32:51 -0600
Subject: [PATCH 2/2] [feature] job scheduling

Signed-off-by: redpinecube
---
 pre_compute_platform.md    | 194 ++++++++++++++++++++++++++++++++++++++
 src/api.go                 |   2 +-
 src/int_test.go            |   2 +-
 src/job_scheduling_test.go |  45 +++++++++
 src/jobs.md                |  42 ++++++++
 src/scheduler.go           |  77 ++++++++++++++-
 src/supervisor.go          |  72 ++++++++++++--
 src/util.go                |   1 +
 8 files changed, 424 insertions(+), 11 deletions(-)
 create mode 100644 pre_compute_platform.md
 create mode 100644 src/job_scheduling_test.go
 create mode 100644 src/jobs.md

diff --git a/pre_compute_platform.md b/pre_compute_platform.md
new file mode 100644
index 0000000..870ef40
--- /dev/null
+++ b/pre_compute_platform.md
@@ -0,0 +1,194 @@
+# Quick guide — how to SSH into the Tenstorrent “quiet box” (shared access)
+
+Below is a practical, copy-pasteable guide you can share with your team so people can connect to the Tenstorrent quiet box through the GCP SSH jump proxy. It covers key generation, how to share your public key, the SSH `config` entries, connection steps, the temporary manual reservation process via Discord, rules about containers, troubleshooting, and good safety practices.
+
+---
+
+# 1) What we’re doing (short)
+
+We expose the quiet box only behind an SSH jump host on GCP. Users get access by giving us their SSH **public key**; access is time-limited until a compute reservation system exists. People must work inside their own container while using the quiet box. Reservations are handled manually through a Discord channel for now — see section 4.
+
+---
+
+# 2) How to create and send your SSH public key
+
+1. Generate an SSH key pair locally (if you don’t already have one). Recommended: ed25519 (modern, compact, secure).
+
+```bash
+# create a new key (press enter to accept the default file and optionally add a passphrase)
+ssh-keygen -t ed25519 -C "your.email@domain.com"
+```
+
+2. Print your public key and copy it:
+
+```bash
+cat ~/.ssh/id_ed25519.pub
+```
+
+3. Send **only** the public key string (the `.pub` contents) to the admins, Ambrose Ling and Kenny Cui (e.g., paste it into Discord or whatever secure channel the team uses). An example public key looks like:
+
+```
+ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI... user@host
+```
+
+(Do **not** send your private key — the file `~/.ssh/id_ed25519`.)
+
+If you already use `~/.ssh/id_rsa` or another key, you can send whichever public key you like, though ed25519 is preferred.
+
+---
+
+# 3) Add this to your `~/.ssh/config`
+
+Put the following in your SSH config file (`~/.ssh/config`) **exactly** as shown (or append it if entries already exist):
+
+```
+Host mist-jump
+    HostName 34.170.88.115
+    User utorontomist
+
+Host mist
+    HostName localhost
+    User utmist-tt
+    Port 2025
+    ProxyJump mist-jump
+```
+
+Notes:
+
+* After the admins add your public key to the jump host / target server, you’ll connect with:
+
+```bash
+ssh mist
+```
+
+Log in with the server password: `welovett2025!`
+
+* The `ProxyJump` causes your SSH client to first connect to `mist-jump` (the GCP host), then forward into the quiet box on port `2025` as user `utmist-tt`.
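+
+If you prefer not to edit `~/.ssh/config`, the same two-hop route can be tested as a one-off command. This is equivalent to `ssh mist` with the entries above (same addresses, users, and port):
+
+```bash
+# jump through the GCP host, then into the quiet box on port 2025
+ssh -J utorontomist@34.170.88.115 -p 2025 utmist-tt@localhost
+```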
+---
+
+# 4) Manual reservation (Discord workflow)
+
+Because there is no automated reservation system yet, we use a Discord channel to manage access. Suggested workflow to keep things fair and safe:
+
+1. **Channel name**: `#quietbox-reservations` (or similar).
+2. **How to claim time**: Post a short message like:
+
+```
+Claim: @your-name — Project: — From: 2025-10-29 14:00 EDT — To: 2025-10-29 16:00 EDT
+```
+
+3. **Required fields**: who, project, start and end times (use your local timezone and include it), and the container image or name you’ll use. Until there is clear demand for overlapping machine time, we will stick to this manual process; if multiple projects need the same machine at once, we will revisit the process.
+4. **Admins confirm**: An admin reacts/acknowledges the claim (e.g., ✅). Until a claim is acknowledged, you should not start work.
+5. **During your slot**: If you finish early, post an “ended” message so others can update the slot. If you need to extend, post in the channel and wait for confirmation.
+6. **Enforcement**: If someone is using the box outside their claimed time, ping the channel owner/admin to resolve it.
+
+Suggested message template to claim:
+
+```
+Claiming quietbox:
+- User: @ambrose
+- Project: example-ml
+- Start: 2025-10-29 14:00 EDT
+- End: 2025-10-29 16:00 EDT
+- Container Name:
+```
+
+---
+
+# 5) Rules while using the quiet box (must-follow)
+
+* **Work only inside your container.** Do not run experiments on the host OS. Use Docker / Podman / the approved container runtime.
+* **No background or persistent processes** outside your container. Kill any long-running jobs when your slot ends.
+* **Follow agreed resource limits.** If admins set GPU/CPU limits, stay within them.
+* **Clean up**: remove temporary large files from shared storage after your run, or move them to a location designated for your project.
+* **Be respectful**: if another user needs time urgently, coordinate in the Discord channel.
+* **Security**: never share private keys or credentials in Discord or public chat.
+
+---
+
+# 6) Example container usage patterns
+
+Generic Docker example (adjust to your image and options):
+
+```bash
+# pull image
+docker pull ghcr.io/tenstorrent/tt-metal/tt-metalium/ubuntu-22.04-dev-amd64:latest
+
+# run a disposable container with Tenstorrent device access and a mounted workspace
+docker run -it --rm \
+  --name your_name-project_name \
+  -v /dev/hugepages-1G:/dev/hugepages-1G \
+  --device /dev/tenstorrent \
+  -v $(pwd):/home/utmist-tt/UTMIST \
+  -v /mnt:/mnt \
+  ghcr.io/tenstorrent/tt-metal/tt-metalium/ubuntu-22.04-dev-amd64:latest \
+  /bin/bash
+```
+
+Inside the container you run your jobs. `--rm` ensures the container is removed when you exit.
+
+If your setup uses a different runtime (Podman, Singularity), follow the project's standard container instructions.
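+
+Once the container is up, confirm the device and hugepages mounts from the `docker run` flags above actually came through (paths as mounted in the example; adjust if yours differ):
+
+```bash
+# inside the container: the Tenstorrent device node should exist
+ls -l /dev/tenstorrent
+
+# and the 1G hugepages mount should be visible
+mount | grep hugepages-1G
+```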
+---
+
+# 7) Connection checklist & verification
+
+Before connecting:
+
+* You’ve sent your public key to the admins and they confirmed it was installed.
+* Your `~/.ssh/config` contains the `mist-jump` and `mist` entries shown above.
+
+To connect:
+
+```bash
+ssh mist
+```
+
+If you want verbose logs (helpful for troubleshooting):
+
+```bash
+ssh -vvv mist
+```
+
+Quick verification steps:
+
+* `ssh -G mist` prints the resolved config for debugging.
+* Confirm you land in the correct account (check the prompt or run `whoami`).
+* Once connected, check you’re inside a container (e.g., `cat /proc/1/cgroup`, or ask the admins about the container launch process).
+
+---
+
+# 8) Common troubleshooting
+
+* **Permission denied (publickey)**
+
+  * Ensure the admins actually installed your public key (the `.pub` file contents) on the target account.
+  * Confirm you’re using the right private key (`ssh -i ~/.ssh/id_ed25519 mist` to force it).
+  * Check file permissions: `~/.ssh` should be `700`, the private key `600`.
+* **ProxyJump failing / connection to jump host refused**
+
+  * Confirm the `mist-jump` HostName IP is reachable: `ssh -vvv mist-jump`.
+  * Ensure your local firewall or corporate network allows outbound SSH (port 22).
+* **Port 2025 connection refused**
+
+  * This implies the service on the jump host isn’t forwarding correctly yet; ask the admins to confirm the tunnel on GCP and the quiet box service are up.
+* **Agent forwarding issues**
+
+  * If you rely on agent forwarding, make sure you started `ssh-agent` and added your keys with `ssh-add`.
+* **Unexpected host key warnings**
+
+  * Verify fingerprints with an admin before accepting. Do **not** blindly accept host key changes.
+
+Use `ssh -vvv mist` and share the logs with the admins if you need help.
+
+---
+
+# 9) Session end & cleanup checklist
+
+Before you finish:
+
+* Stop any running jobs inside your container (`Ctrl+C` or `pkill` inside the container).
+* `exit` the container and ensure it is stopped/removed if required.
+* Log out of the SSH session (`exit` or `logout`).
+* Post in Discord that your slot ended, or update the reservation message.
diff --git a/src/api.go b/src/api.go
index e2c3744..ccbe9f7 100644
--- a/src/api.go
+++ b/src/api.go
@@ -173,7 +173,7 @@ func (a *App) enqueueJob(w http.ResponseWriter, r *http.Request) {
 		"task_id": 123,
 		"data":    "test_data_123",
 	}
-	if err := a.scheduler.Enqueue("jobType", payload); err != nil {
+	if err := a.scheduler.Enqueue("jobType", "gpuType", payload); err != nil {
 		a.log.Error("enqueue failed", "err", err, "payload", payload)
 		http.Error(w, "enqueue failed", http.StatusInternalServerError)
 		return
diff --git a/src/int_test.go b/src/int_test.go
index ab364b6..851a24f 100644
--- a/src/int_test.go
+++ b/src/int_test.go
@@ -100,7 +100,7 @@ func TestIntegration(t *testing.T) {
 			"data": fmt.Sprintf("test_data_%d", i),
 		}
 
-		if err := scheduler.Enqueue(jobType, payload); err != nil {
+		if err := scheduler.Enqueue(jobType, "TT", payload); err != nil {
 			t.Errorf("Failed to enqueue job: %v", err)
 		}
 	}
diff --git a/src/job_scheduling_test.go b/src/job_scheduling_test.go
new file mode 100644
index 0000000..62c165e
--- /dev/null
+++ b/src/job_scheduling_test.go
@@ -0,0 +1,45 @@
+package main
+
+import (
+	"context"
+	"log/slog"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/redis/go-redis/v9"
+)
+
+func TestJobEnqueueAndSupervisor(t *testing.T) {
+	redisAddr := "localhost:6379"
+	log := slog.New(slog.NewJSONHandler(os.Stdout, nil))
+
+	client := redis.NewClient(&redis.Options{Addr: redisAddr})
+	defer client.Close()
+	if err := client.Ping(context.Background()).Err(); err != nil {
+		t.Fatalf("Failed to connect to Redis: %v", err)
+	}
+
+	client.FlushDB(context.Background())
+
+	// Scheduler
+	scheduler := NewScheduler(redisAddr, log)
+	defer scheduler.Close()
+
+	// Supervisor
+	supervisor := NewSupervisor(redisAddr, "test_worker_001", "AMD", log)
+	if err := supervisor.Start(); err != nil {
+		t.Fatalf("Failed to start supervisor: %v", err)
+	}
+	defer supervisor.Stop()
+
+	// Enqueue jobs
+	for i := 0; i < 3; i++ {
+		payload := map[string]interface{}{"task": i}
+		if err := scheduler.Enqueue("test_job_type", "AMD", payload); err != nil {
+			t.Errorf("Failed to enqueue job %d: %v", i, err)
+		}
+	}
+
+	time.Sleep(3 * time.Second)
+}
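+
+// waitForEvents is a sketch of how the fixed time.Sleep above could be
+// replaced: poll the jobs:events stream (the JobEventStream constant added to
+// util.go in this patch) until the expected number of events appears or the
+// deadline passes. Illustrative only; it is not wired into the test yet.
+// Usage: if !waitForEvents(context.Background(), client, 3, 5*time.Second) { t.Fatal("timed out") }
+func waitForEvents(ctx context.Context, client *redis.Client, n int64, timeout time.Duration) bool {
+	deadline := time.Now().Add(timeout)
+	for time.Now().Before(deadline) {
+		if count, err := client.XLen(ctx, JobEventStream).Result(); err == nil && count >= n {
+			return true
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	return false
+}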
diff --git a/src/jobs.md b/src/jobs.md
new file mode 100644
index 0000000..1f710b6
--- /dev/null
+++ b/src/jobs.md
@@ -0,0 +1,42 @@
+Job Lifecycle in MIST
+This document explains how jobs are handled in the scheduler, from creation to completion.
+
+1. Job Creation
+
+Jobs are created by the Scheduler.
+Each job has:
+- job_id – unique identifier
+- job_type – category of work
+- gpu_type – optional GPU requirement
+- payload – arbitrary data for processing
+Jobs are stored in Redis for persistence and event tracking.
+
+2. Enqueue
+
+The Scheduler enqueues jobs in Redis streams.
+Each job enters the Scheduled state.
+Jobs can be filtered by GPU type, priority, or job type if needed.
+
+3. Assignment to Supervisor
+
+Supervisors are worker processes that consume jobs from the Scheduler.
+A Supervisor subscribes to jobs that match its GPU type and other requirements.
+Once picked up, a job’s state changes to InProgress.
+
+4. Job Processing
+
+The Supervisor executes the job logic using the provided payload.
+Supervisors track progress and can emit intermediate events (optional).
+The Scheduler monitors state changes and logs job activity.
+
+5. Completion
+
+When a job finishes successfully, the Supervisor marks it as Completed.
+Failed jobs can be marked Failed or re-enqueued based on the retry policy.
+All state changes are persisted in Redis for observability.
+
+6. Redis Storage
+
+Jobs are stored as hashes keyed by `job:<job_id>` with fields such as
+job_type, job_state, assigned_supervisor, timestamps, and payload.
+Job events are emitted to a Redis stream (`jobs:events`) to allow real-time tracking.
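+
+7. Inspecting State by Hand
+
+While the event listener is still evolving, all of the above can be checked directly with redis-cli. The job ID placeholder is whatever Enqueue generated (see the scheduler logs); the key and stream names come from util.go.
+
+```bash
+# a job's metadata hash
+redis-cli HGETALL job:<job_id>
+
+# the three most recent entries on the job stream and the event stream
+redis-cli XREVRANGE jobs:stream + - COUNT 3
+redis-cli XREVRANGE jobs:events + - COUNT 3
+```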
diff --git a/src/scheduler.go b/src/scheduler.go
index f4ef6a6..e46cf73 100644
--- a/src/scheduler.go
+++ b/src/scheduler.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"log/slog"
 	"time"
+	"errors"
 
 	"github.com/redis/go-redis/v9"
 )
@@ -40,6 +41,13 @@ func (s *Scheduler) Enqueue(jobType string, requiredGPU string, payload map[stri
 		JobState:    JobStateScheduled,
 	}
 
+	if ok, err := s.JobExists(job.ID); err != nil {
+		return err
+	} else if ok {
+		s.log.Warn("duplicate job skipped", "job_id", job.ID)
+		return nil
+	}
+
 	// marshal the payload
 	payloadJSON, err := json.Marshal(job.Payload)
 	if err != nil {
@@ -56,7 +64,7 @@ func (s *Scheduler) Enqueue(jobType string, requiredGPU string, payload map[stri
 		Values: map[string]interface{}{
 			"job_id":    job.ID,
 			"payload":   string(payloadJSON),
-			"job_state": job.JobState,
+			"job_state": string(job.JobState),
 		},
 	})
 
@@ -65,9 +73,9 @@ func (s *Scheduler) Enqueue(jobType string, requiredGPU string, payload map[stri
 	pipe.HSet(s.ctx, metadataKey, map[string]interface{}{
 		"type":         job.Type,
 		"retries":      job.Retries,
 		"created":      job.Created.Format(time.RFC3339),
 		"required_gpu": job.RequiredGPU,
-		"job_state":    job.JobState,
+		"job_state":    string(job.JobState),
 	})
 
 	// execute pipeline
@@ -83,3 +91,66 @@ func (s *Scheduler) Enqueue(jobType string, requiredGPU string, payload map[stri
 func (s *Scheduler) Close() error {
 	return s.client.Close()
 }
+
+func (s *Scheduler) JobExists(jobID string) (bool, error) {
+	exists, err := s.client.Exists(s.ctx, "job:"+jobID).Result()
+	if err != nil {
+		return false, err
+	}
+	return exists > 0, nil
+}
+
+func (s *Scheduler) ListenForEvents() {
+	s.log.Info("listening for job events...", "stream", JobEventStream)
+
+	lastID := "$"
+
+	for {
+		result, err := s.client.XRead(s.ctx, &redis.XReadArgs{
+			Streams: []string{JobEventStream, lastID},
+			Count:   10,
+			Block:   5 * time.Second,
+		}).Result()
+
+		if err != nil {
+			if errors.Is(err, redis.Nil) {
+				continue // no new messages
+			}
+			s.log.Error("error reading from event stream", "error", err)
+			time.Sleep(time.Second)
+			continue
+		}
+
+		for _, stream := range result {
+			for _, msg := range stream.Messages {
+				s.handleEventMessage(msg)
+				lastID = msg.ID
+			}
+		}
+	}
+}
+
+func (s *Scheduler) handleEventMessage(msg redis.XMessage) {
+	jobID, _ := msg.Values["job_id"].(string)
+	state, _ := msg.Values["state"].(string)
+	timestamp, _ := msg.Values["timestamp"].(string)
+	supervisor, _ := msg.Values["supervisor"].(string)
+
+	if jobID == "" {
+		s.log.Warn("received event with missing job_id", "message_id", msg.ID)
+		return
+	}
+
+	metadataKey := fmt.Sprintf("job:%s", jobID)
+
+	// Update job state in Redis
+	if err := s.client.HSet(s.ctx, metadataKey, "job_state", state, "updated_at", timestamp).Err(); err != nil {
+		s.log.Error("failed to update job metadata", "job_id", jobID, "error", err)
+		return
+	}
+
+	s.log.Info("job state updated",
+		"job_id", jobID,
+		"state", state,
+		"supervisor", supervisor)
+}
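+
+// loadEventCheckpoint is an illustrative sketch, not wired up in this patch.
+// ListenForEvents starts reading at "$", so events emitted before the listener
+// starts (or while a timed-out XRead is retried with lastID still "$") are
+// never replayed. Persisting the last processed ID and resuming from it avoids
+// that; the checkpoint key name below is an assumption, not part of this change.
+func (s *Scheduler) loadEventCheckpoint() string {
+	saved, err := s.client.Get(s.ctx, "scheduler:events:last_id").Result()
+	if err != nil || saved == "" {
+		return "0" // replay the stream from the beginning
+	}
+	return saved
+}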
s.log.Error("failed to emit job event", "job_id", jobID, "state", state, "error", err) + } else { + s.log.Info("emitted job event", "job_id", jobID, "state", state) + } +} + + func (s *Supervisor) ackMessage(messageID string) { result := s.redisClient.XAck(s.ctx, StreamName, ConsumerGroup, messageID) if result.Err() != nil { diff --git a/src/util.go b/src/util.go index f1816d3..420f1bf 100644 --- a/src/util.go +++ b/src/util.go @@ -9,6 +9,7 @@ import ( const ( StreamName = "jobs:stream" ConsumerGroup = "workers" + JobEventStream = "jobs:events" SupervisorStatusKey = "supervisors:status" MaxRetries = 3 RetryDelay = 5 * time.Second