From 30cd63c55e81817446a7e628844b6684aaa1f43b Mon Sep 17 00:00:00 2001 From: Matthias Mohr Date: Fri, 4 Aug 2023 17:05:50 +0200 Subject: [PATCH] GDC draft --- src/connection.js | 57 ++++++++++- src/gdc/capabilities.js | 219 ++++++++++++++++++++++++++++++++++++++++ src/gdc/migrate.js | 148 +++++++++++++++++++++++++++ src/gdc/parser.js | 96 ++++++++++++++++++ src/openeo.js | 6 +- 5 files changed, 520 insertions(+), 6 deletions(-) create mode 100644 src/gdc/capabilities.js create mode 100644 src/gdc/migrate.js create mode 100644 src/gdc/parser.js diff --git a/src/connection.js b/src/connection.js index e2c249e..179ef03 100644 --- a/src/connection.js +++ b/src/connection.js @@ -8,7 +8,8 @@ const AuthProvider = require('./authprovider'); const BasicProvider = require('./basicprovider'); const OidcProvider = require('./oidcprovider'); -const Capabilities = require('./capabilities'); +const GdcCapabilities = require('./gdc/capabilities'); +const GdcMigrate = require('./gdc/migrate'); const FileTypes = require('./filetypes'); const UserFile = require('./userfile'); const Job = require('./job'); @@ -18,6 +19,7 @@ const Service = require('./service'); const Builder = require('./builder/builder'); const BuilderNode = require('./builder/node'); + const CONFORMANCE_RELS = [ 'conformance', 'http://www.opengis.net/def/rel/ogc/1.0/conformance' @@ -120,7 +122,8 @@ class Connection { } } - this.capabilitiesObject = new Capabilities(data); + GdcMigrate.connection = this; + this.capabilitiesObject = new GdcCapabilities(data); return this.capabilitiesObject; } @@ -802,6 +805,45 @@ class Connection { return await pg.describeUserProcess(); } + isOgcProcess(process) { + let nodes = Object.values(process.process_graph); + return Boolean(nodes.find(node => { + let process = this.processes.get(node.process_id); + return Utils.isObject(process) && Boolean(process.ogcapi); + })); + } + + async executeOgcProcess(process, abortController = null) { + let openEO = this._normalizeUserProcess(process); + let mode = null; + let p = Object.values(openEO.process.process_graph).find(v => { + let spec = this.processes.get(v.process_id); + if (Array.isArray(spec.jobControlOptions) && spec.jobControlOptions.includes("async-execute")) { + mode = 'async'; + } + return Boolean(spec && spec.ogcapi); + }); + let requestBody = GdcMigrate.execute(openEO); + let headers = {}; + if (mode === 'async') { + headers.Prefer = 'respond-async'; + } + console.log(p.process_id, requestBody, headers); // @todo remove + let response = await this._post(`/processes/${p.process_id}/execution`, requestBody, Environment.getResponseType(), abortController, headers); + let syncResult = { + data: response.data, + costs: null, + type: null, + logs: [] + }; + + if (typeof response.headers['content-type'] === 'string') { + syncResult.type = response.headers['content-type']; + } + + return syncResult; + } + /** * Executes a process synchronously and returns the result as the response. * @@ -823,6 +865,9 @@ class Connection { budget: budget }) ); + if (this.isOgcProcess(requestBody.process)) { + return this.executeOgcProcess(process, abortController); + } let response = await this._post('/result', requestBody, Environment.getResponseType(), abortController); let syncResult = { data: response.data, @@ -1131,16 +1176,18 @@ class Connection { * @param {*} body * @param {string} responseType - Response type according to axios, defaults to `json`. * @param {?AbortController} [abortController=null] - An AbortController object that can be used to cancel the request. + * @param {Array.>} [headers={}] - Headers * @returns {Promise} * @throws {Error} * @see https://github.com/axios/axios#request-config */ - async _post(path, body, responseType, abortController = null) { + async _post(path, body, responseType, abortController = null, headers = {}) { let options = { method: 'post', - responseType: responseType, + responseType, url: path, - data: body + data: body, + headers }; return await this._send(options, abortController); } diff --git a/src/gdc/capabilities.js b/src/gdc/capabilities.js new file mode 100644 index 0000000..8abbdf5 --- /dev/null +++ b/src/gdc/capabilities.js @@ -0,0 +1,219 @@ +const Capabilities = require("../capabilities"); +const Utils = require('@openeo/js-commons/src/utils'); +const Migrate = require('./migrate'); + +class GdcCapabilities extends Capabilities { + + constructor(data) { + super(data); + Object.assign(this.featureMap, { + describeCoverage: 'get /collections/{}/coverage', + describeCoverageDomainset: 'get /collections/{}/coverage/domainset', + describeCoverageRangetype: 'get /collections/{}/coverage/rangetype', + describeCoverageRangeset: 'get /collections/{}/coverage/rangeset', + describeCoverageMetadata: 'get /collections/{}/coverage/metadata', + executeOgcProcess: 'post /processes/{}/execution', + }); + this.checkConformance(); + } + + getConformanceClasses() { + if(!Array.isArray(this.data.conformsTo)) { + return []; + } + return this.data.conformsTo; + } + + hasConformance(uri) { + if(!Array.isArray(this.data.conformsTo)) { + return false; + } + return this.data.conformsTo.includes(uri); + } + + _getLink(rel) { + if (!Array.isArray(this.data.links)) { + return null; + } + return this.data.links.find(link => link.rel === rel) || null; + } + + checkConformance() { + if (!Array.isArray(this.data.endpoints)) { + this.data.endpoints = []; + } + const isCoverage = this.hasConformance('http://www.opengis.net/spec/ogcapi-coverages-1/0.0/conf/geodata-coverage'); + const isFeatures = this.hasConformance('http://www.opengis.net/spec/ogcapi-features-1/1.0/conf/core'); + if (isCoverage || isFeatures) { + this.data.endpoints.push({ + "path": "/collections", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}", + "methods": ["GET"] + }); + } + // if (isFeatures) { + // this.data.endpoints.push({ + // "path": "/collections/{collection_id}/items", + // "methods": ["GET"] + // }); + // this.data.endpoints.push({ + // "path": "/collections/{collection_id}/items/{item_id}", + // "methods": ["GET"] + // }); + // } + if (isCoverage) { + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage/domainset", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage/rangetype", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage/rangeset", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/collections/{collection_id}/coverage/metadata", + "methods": ["GET"] + }); + } + const isProcessApi = this.hasConformance('http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/core'); + const processDismiss = this.hasConformance('http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/dismiss'); + const processJobList = this.hasConformance('http://www.opengis.net/spec/ogcapi-processes-1/1.0/req/job-list'); + const processLink = this._getLink('https://www.opengis.net/def/rel/ogc/1.0/processes'); + if (isProcessApi || processLink) { + this.data.endpoints.push({ + "path": "/processes", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/processes/{processId}", + "methods": ["GET"] + }); + this.data.endpoints.push({ + "path": "/processes/{processId}/execution", + "methods": ["POST"] + }); + let jobMethods = ["GET"]; + if (processDismiss) { // @todo Is dismiss equivalent to openEO job cancellation or deletion? + jobMethods.push("DELETE"); + } + this.data.endpoints.push({ + "path": "/jobs/{job_id}", + "methods": jobMethods + }); + this.data.endpoints.push({ + "path": "/jobs/{job_id}/results", + "methods": ["GET"] + }); + } + const jobLink = this._getLink('https://www.opengis.net/def/rel/ogc/1.0/job-list'); + if (processJobList || jobLink) { + this.data.endpoints.push({ + "path": "/jobs", + "methods": ["GET"] + }); + } + this.init(); + } + + /** + * Initializes the class. + * + * @protected + */ + init() { + if (Array.isArray(this.data.endpoints)) { + super.init(); + } + } + + /** + * Validates the capabilities. + * + * Throws an error in case of an issue, otherwise just passes. + * + * @protected + * @throws {Error} + */ + validate() { + if(!Utils.isObject(this.data)) { + throw new Error("No capabilities retrieved."); + } + } + + /** + * Returns the openEO API version implemented by the back-end. + * + * @returns {string} openEO API version number.F + */ + apiVersion() { + return this.data.api_version; + } + + /** + * Returns the GDC API version implemented by the back-end. + * + * @returns {string} GDC API version number. + */ + gdcVersion() { + return this.data.gdc_version || "1.0.0-beta"; + } + + isEndpoint(response, method, endpoint) { + if (response.config.method !== method) { + return false; + } + if (endpoint.includes('{}')) { + let pattern = '^' + endpoint.replace('{}', '[^/]+') + '$'; + let regex = new RegExp(pattern); + return regex.test(response.config.url); + } + return endpoint === response.config.url; + } + + /** + * Migrates a response, if required. + * + * @param {AxiosResponse} response + * @protected + * @returns {AxiosResponse} + */ + migrate(response) { + if (this.isEndpoint(response, 'get', '/collections')) { + response.data.collections = response.data.collections.map(collection => Migrate.collection(collection, response)); + } + else if (this.isEndpoint(response, 'get', '/collections/{}')) { + response.data = Migrate.collection(response.data, response); + } + else if (this.isEndpoint(response, 'get', '/processes')) { + response.data.processes = response.data.processes.map(process => Migrate.process(process, response)); + } + else if (this.isEndpoint(response, 'get', '/jobs')) { + response.data.jobs = response.data.jobs.map(job => Migrate.job(job, response)); + } + else if (this.isEndpoint(response, 'get', '/jobs/{}')) { + response.data = Migrate.job(response.data, response); + } + + response = Migrate.all(response); + + return response; + } +} + + +module.exports = GdcCapabilities; \ No newline at end of file diff --git a/src/gdc/migrate.js b/src/gdc/migrate.js new file mode 100644 index 0000000..41c6392 --- /dev/null +++ b/src/gdc/migrate.js @@ -0,0 +1,148 @@ +const Utils = require('@openeo/js-commons/src/utils'); +const StacMigrate = require('@radiantearth/stac-migrate'); +const PgParser = require('./parser'); + +const JobStatusMap = { + accepted: 'created', + running: 'running', + successful: 'finished', + failed: 'error', + dismissed: 'canceled' +}; + +const Migrate = { + + connection: null, + + all(response) { + if (Array.isArray(response.data.links)) { + response.data.links = this.connection.makeLinksAbsolute(response.data.links, response); + } + return response; + }, + + collection(collection, response) { + if (collection.stac_version) { + return collection; + } + + // Make sure the required properties are present + collection = StacMigrate.collection(collection); + collection.ogcapi = true; + // Make links absolute + if (Array.isArray(collection.links)) { + collection.links = this.connection.makeLinksAbsolute(collection.links, response); + } + + return collection; + }, + + process(process, response) { + if (process.parameters || process.returns) { + return process; + } + + process.ogcapi = true; + process.summary = process.title; + + process.parameters = []; + for(let name in process.inputs) { + let input = process.inputs[name]; + process.parameters.push({ + name, + description: [input.title, input.description].filter(v => Boolean(v)).join("\n\n"), + schema: input.schema, + optional: typeof input.schema.default !== 'undefined' || input.minOccurs === 0 + }); + } + + let addOutputParam = (p, name, output) => { + output = Object.assign({}, output); + if (Array.isArray(output.schema.oneOf) && output.schema.oneOf.every(s => s.type === 'string' && Boolean(s.contentMediaType))) { + output.schema = { + type: 'string', + enum: output.schema.oneOf.map(s => s.contentMediaType) + }; + } + p.parameters.push(Object.assign({name: `output:${name}`}, output)); + }; + + if (Utils.size(process.outputs) === 1) { + let [name, output] = Object.entries(process.outputs)[0]; + process.returns = { + description: [output.title, output.description].filter(v => Boolean(v)).join("\n\n"), + schema: output.schema + }; + // @todo workaround for now + addOutputParam(process, name, output); + } + else { + process.returns = { + description: 'see process description', + schema: [] + }; + for(let name in process.outputs) { + let output = process.outputs[name]; + let schema = Object.assign({}, output.schema, {title: output.title, description: output.description}); + process.returns.schema.push(schema); + // @todo workaround for now + addOutputParam(process, name, output); + } + } + + // Make links absolute + if (Array.isArray(process.links)) { + process.links = this.connection.makeLinksAbsolute(process.links, response); + } + + return process; + }, + + job(job, response) { + if (!job.jobID) { + return job; + } + + job.ogcapi = true; + job.id = job.jobID; + if (job.processID) { + job.process = { + process_graph: { + [job.processID]: { + process_id: job.processID, + arguments: {}, + description: "Process description incomplete as the information is missing in OGC API responses.", + result: true + } + } + }; + } + job.status = JobStatusMap[job.status]; + job.created = job.created || job.started; + job.updated = job.updated || job.finished; + job.description = job.message; + + if (Array.isArray(job.links)) { + job.links = this.connection.makeLinksAbsolute(job.links, response); + } + + return job; + }, + + execute(requestBody) { + const graph = Object.values(requestBody.process.process_graph); + const valid = graph.every(node => { + let spec = this.connection.processes.get(node.process_id); + return Boolean(spec && (spec.ogcapi || spec.id === 'load_collection')); + }); + if (!valid) { + throw new Error('Process must consist only of OGC Processes and Collections'); + } + + const parser = new PgParser(requestBody.process, this.connection.getBaseUrl()); + return parser.parse(); + }, + +}; + +module.exports = Migrate; \ No newline at end of file diff --git a/src/gdc/parser.js b/src/gdc/parser.js new file mode 100644 index 0000000..a1dc418 --- /dev/null +++ b/src/gdc/parser.js @@ -0,0 +1,96 @@ +const Utils = require('@openeo/js-commons/src/utils'); + +class PgParser { + + constructor(process, baseUrl) { + this.process = process; + this.url = baseUrl; + } + + parse(process = null) { + if (process === null) { + process = Utils.deepClone(this.process); + } + + for(const key in process.process_graph) { + const node = process.process_graph[key]; + node.arguments = this.parseArgs(node.arguments, process.process_graph); + } + + const resultNode = Object.values(process.process_graph) + .find(node => node.result); + return this.toOgcProcess(resultNode); + } + + toOgcProcess(node) { + // Get output parameters from arguments + // @todo This is just a workaround for now + let outputs; + for(let key in node.arguments) { + if (key.startsWith('output:')) { + const mediaType = node.arguments[key]; + delete node.arguments[key]; + if (node.result) { + if (!outputs) { + outputs = {}; + } + let name = key.substring(7); + outputs[name] = { + format: { mediaType } + }; + } + } + } + + return { + "process": `${this.url}/processes/${node.process_id}`, + inputs: node.arguments, + outputs + } + } + + parseArgs(args, graph) { + for(let key in args) { + args[key] = this.parseDeep(args[key], args, graph); + } + return args; + } + + parseDeep(value, parent, graph) { + const isObject = Utils.isObject(value); + if (isObject && typeof value.from_node === 'string') { + const refNode = graph[value.from_node]; + if (refNode.process_id === 'load_collection') { + const c = refNode.arguments; + const url = new URL(`${this.url}/collections/${c.id}/coverage`); + if (Utils.isObject(c.spatial_extent) && typeof c.spatial_extent.west !== 'undefined') { + const bbox = c.spatial_extent; + const subset = `Lat(${bbox.south}:${bbox.north}),Lon(${bbox.west}:${bbox.east})`; + url.searchParams.append('subset', subset); + } + value = { + href: url.toString() + }; + // @todo: Just needed for GNOSIS? + if (!Array.isArray(parent)) { + value = [value]; + } + } + else { + value = this.toOgcProcess(graph[value.from_node]); + } + } + else if (isObject && Utils.isObject(value.process_graph)) { + value = this.parse(value); + } + else if (isObject || Array.isArray(value)) { + for(let key in value) { + value[key] = this.parseDeep(value[key], value, graph); + } + } + return value; + } + +} + +module.exports = PgParser; \ No newline at end of file diff --git a/src/openeo.js b/src/openeo.js index 9ef90fe..6f17b23 100644 --- a/src/openeo.js +++ b/src/openeo.js @@ -27,6 +27,7 @@ const Formula = require('./builder/formula'); const MIN_API_VERSION = '1.0.0-rc.2'; const MAX_API_VERSION = '1.x.x'; +const GDC_VERSION = '1.0.0-beta'; /** * Main class to start with openEO. Allows to connect to a server. @@ -94,9 +95,12 @@ class OpenEO { // Check whether back-end is accessible and supports a compatible version. let capabilities = await connection.init(); - if (Versions.compare(capabilities.apiVersion(), MIN_API_VERSION, "<") || Versions.compare(capabilities.apiVersion(), MAX_API_VERSION, ">")) { + if (capabilities.apiVersion() && (Versions.compare(capabilities.apiVersion(), MIN_API_VERSION, "<") || Versions.compare(capabilities.apiVersion(), MAX_API_VERSION, ">"))) { throw new Error("Client only supports the API versions between " + MIN_API_VERSION + " and " + MAX_API_VERSION); } + if (!Versions.compare(capabilities.gdcVersion(), GDC_VERSION, "=")) { + throw new Error("Client only supports the GDC version " + GDC_VERSION); + } return connection; }