Skip to content

Commit

Permalink
Add working version of coder adapter (#871)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrejcermak authored Mar 7, 2025
1 parent ec19744 commit 9c10c77
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 0 deletions.
120 changes: 120 additions & 0 deletions lib/ood_core/job/adapters/coder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
require "ood_core/refinements/hash_extensions"
require "ood_core/refinements/array_extensions"
require 'net/http'
require 'json'
require 'etc'

module OodCore
module Job
class Factory
using Refinements::HashExtensions

def self.build_coder(config)
batch = Adapters::Coder::Batch.new(config.to_h.symbolize_keys)
Adapters::Coder.new(batch)
end
end

module Adapters
attr_reader :host, :token

# The adapter class for Kubernetes.
class Coder < Adapter

using Refinements::ArrayExtensions
using Refinements::HashExtensions

require "ood_core/job/adapters/coder/batch"

attr_reader :batch
def initialize(batch)
@batch = batch
end

# Submit a job with the attributes defined in the job template instance
# @example Submit job template to cluster
# solver_id = job_adapter.submit(solver_script)
# #=> "1234.server"
# @example Submit job that depends on previous job
# post_id = job_adapter.submit(
# post_script,
# afterok: solver_id
# )
# #=> "1235.server"
# @param script [Script] script object that describes the
# script and attributes for the submitted job
# @param after [#to_s, Array<#to_s>] this job may be scheduled for execution
# at any point after dependent jobs have started execution
# @param afterok [#to_s, Array<#to_s>] this job may be scheduled for
# execution only after dependent jobs have terminated with no errors
# @param afternotok [#to_s, Array<#to_s>] this job may be scheduled for
# execution only after dependent jobs have terminated with errors
# @param afterany [#to_s, Array<#to_s>] this job may be scheduled for
# execution after dependent jobs have terminated
# @return [String] the job id returned after successfully submitting a job
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
raise ArgumentError, 'Must specify the script' if script.nil?
batch.submit(script)
rescue Batch::Error => e
raise JobAdapterError, e.message
end

# Retrieve info for all jobs from the resource manager
# @abstract Subclass is expected to implement {#info_all}
# @raise [NotImplementedError] if subclass did not define {#info_all}
# @param attrs [Array<symbol>] defaults to nil (and all attrs are provided)
# This array specifies only attrs you want, in addition to id and status.
# If an array, the Info object that is returned to you is not guarenteed
# to have a value for any attr besides the ones specified and id and status.
#
# For certain adapters this may speed up the response since
# adapters can get by without populating the entire Info object
# @return [Array<Info>] information describing submitted jobs
def info_all(attrs: nil)
# TODO - implement info all for namespaces?
batch.method_missing(attrs: attrs)
rescue Batch::Error => e
raise JobAdapterError, e.message
end

# Whether the adapter supports job arrays
# @return [Boolean] - assumes true; but can be overridden by adapters that
# explicitly do not
def supports_job_arrays?
false
end

# Retrieve job info from the resource manager
# @abstract Subclass is expected to implement {#info}
# @raise [NotImplementedError] if subclass did not define {#info}
# @param id [#to_s] the id of the job
# @return [Info] information describing submitted job
def info(id)
batch.info(id.to_s)
rescue Batch::Error => e
raise JobAdapterError, e.message
end

# Retrieve job status from resource manager
# @note Optimized slightly over retrieving complete job information from server
# @abstract Subclass is expected to implement {#status}
# @raise [NotImplementedError] if subclass did not define {#status}
# @param id [#to_s] the id of the job
# @return [Status] status of job
def status(id)
info(id)["job"]["status"]
end

# Delete the submitted job.
#
# @param id [#to_s] the id of the job
# @return [void]
def delete(id)
res = batch.delete(id)
rescue Batch::Error => e
raise JobAdapterError, e.message
end
end
end
end
end
170 changes: 170 additions & 0 deletions lib/ood_core/job/adapters/coder/batch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
require "ood_core/refinements/hash_extensions"
require "json"

# Utility class for the Coder adapter to interact with the Coders API.
class OodCore::Job::Adapters::Coder::Batch
require_relative "coder_job_info"
class Error < StandardError; end
def initialize(config)
@host = config[:host]
@token = config[:token]
end

def get_os_app_credentials(username, project_id)
credentials_file = File.read("/home/#{username}/application_credentials.json")
credentials = JSON.parse(credentials_file)
credentials.find { |cred| cred["project_id"] == project_id }
end

def get_rich_parameters(coder_parameters, project_id, os_app_credentials)
rich_parameter_values = [
{ name: "application_credential_name", value: os_app_credentials["name"] },
{ name: "application_credential_id", value: os_app_credentials["id"] },
{ name: "application_credential_secret", value: os_app_credentials["secret"] },
{name: "project_id", value: project_id }
]
if coder_parameters
coder_parameters.each do |key, value|
rich_parameter_values << { name: key, value: value.to_s}
end
end
rich_parameter_values
end

def get_headers(coder_token)
{
'Content-Type' => 'application/json',
'Accept' => 'application/json',
'Coder-Session-Token' => coder_token
}
end

def submit(script)
org_id = script.native[:org_id]
project_id = script.native[:project_id]
coder_parameters = script.native[:coder_parameters]
endpoint = "https://#{@host}/api/v2/organizations/#{org_id}/members/#{username}/workspaces"
os_app_credentials = get_os_app_credentials(username, project_id)
headers = get_headers(@token)
body = {
template_id: script.native[:template_id],
template_version_name: script.native[:template_version_name],
name: "#{username}-#{script.native[:workspace_name]}-#{rand(2_821_109_907_456).to_s(36)}",
rich_parameter_values: get_rich_parameters(coder_parameters, project_id, os_app_credentials),
}

resp = api_call('post', endpoint, headers, body)
resp["id"]
end

def delete(id)
endpoint = "https://#{@host}/api/v2/workspaces/#{id}/builds"
headers = get_headers(@token)
body = {
'orphan' => false,
'transition' => 'delete'
}
res = api_call('post', endpoint, headers, body)
end

def info(id)
endpoint = "https://#{@host}/api/v2/workspaces/#{id}?include_deleted=true"
headers = get_headers(@token)
workspace_info_from_json(api_call('get', endpoint, headers))
end

def coder_state_to_ood_status(coder_state)
case coder_state
when "starting"
"queued"
when "failed"
"suspended"
when "running"
"running"
when "deleted"
"completed"
when "stopped"
"completed"
else
"undetermined"
end
end

def build_coder_job_info(json_data, status)
coder_output_metadata = json_data["latest_build"]["resources"]
&.find { |resource| resource["name"] == "coder_output" }
&.dig("metadata")
coder_output_hash = coder_output_metadata&.map { |meta| [meta["key"].to_sym, meta["value"]] }&.to_h || {}
OodCore::Job::Adapters::Coder::CoderJobInfo.new(**{
id: json_data["id"],
job_name: json_data["workspace_name"],
status: OodCore::Job::Status.new(state: status),
job_owner: json_data["workspace_owner_name"],
submission_time: json_data["created_at"],
dispatch_time: json_data.dig("updated_at"),
wallclock_time: wallclock_time(json_data, status),
ood_connection_info: { host: coder_output_hash[:floating_ip], port: 80 },
native: coder_output_hash
})
end

def wallclock_time(json_data, status)
start_time = start_time(json_data)
end_time = end_time(json_data, status)
end_time - start_time
end

def start_time(json_data)
start_time_string = json_data.dig("updated_at")
DateTime.parse(start_time_string).to_time.to_i
end

def end_time(json_data, status)
if status == 'deleted'
end_time_string = json_data["latest_build"].dig("updated_at")
et = DateTime.parse(end_time_string).to_time.to_i
else
et = DateTime.now.to_time.to_i
end
et
end

def workspace_info_from_json(json_data)
state = json_data.dig("latest_build", "status") || json_data.dig("latest_build", "job", "status")
status = coder_state_to_ood_status(state)
build_coder_job_info(json_data, status)
end

def api_call(method, endpoint, headers, body = nil)
uri = URI(endpoint)

case method.downcase
when 'get'
request = Net::HTTP::Get.new(uri, headers)
when 'post'
request = Net::HTTP::Post.new(uri, headers)
when 'delete'
request = Net::HTTP::Delete.new(uri, headers)
else
raise ArgumentError, "Invalid HTTP method: #{method}"
end

request.body = body.to_json if body

response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
http.request(request)
end

case response
when Net::HTTPSuccess
JSON.parse(response.body)
else
raise Error, "HTTP Error: #{response.code} #{response.message} for request #{endpoint} and body #{body}"
end
end

def username
@username ||= Etc.getlogin
end

end
8 changes: 8 additions & 0 deletions lib/ood_core/job/adapters/coder/coder_job_info.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class OodCore::Job::Adapters::Coder::CoderJobInfo < OodCore::Job::Info
attr_reader :ood_connection_info

def initialize(options)
super(**options)
@ood_connection_info = options[:ood_connection_info]
end
end

0 comments on commit 9c10c77

Please sign in to comment.