Skip to content

Commit

Permalink
Support for exposing statefulset via a headless service proxy
Browse files Browse the repository at this point in the history
Also restructures controller to operate off an internal definition of desired
services (skupper-services configmap). This can be populated by cli or through
setting annotations on target objects.
  • Loading branch information
grs committed Feb 11, 2020
1 parent f9701ea commit 7995d35
Show file tree
Hide file tree
Showing 9 changed files with 619 additions and 373 deletions.
564 changes: 350 additions & 214 deletions bin/deployer.js

Large diffs are not rendered by default.

116 changes: 61 additions & 55 deletions bin/icproxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,78 +68,84 @@ OutgoingBridgeConfig.prototype.updated = function (pods) {
}
};

function bridge_addr(config) {
var parts = config.split(':');
if (parts.length == 2) {
function get_bridging_functions(protocol) {
if (protocol === 'http') {
return {
protocol: parts[0].toLowerCase(),
addr: parts[1]
ingress: bridges.http_to_amqp,
egress: bridges.amqp_to_http
};
} else if (protocol === 'tcp') {
return {
ingress: bridges.tcp_to_amqp,
egress: bridges.amqp_to_tcp
};
} else if (protocol === 'http2') {
return {
ingress: bridges.http2_to_amqp,
egress: bridges.amqp_to_http2
};
} else {
return undefined;
}
}

function bridge_type(source_protocol, target_protocol) {
return source_protocol + "_to_" + target_protocol;
}

function bridge_config(config) {
var parts = config.split('=>');
if (parts.length == 2) {
var source = bridge_addr(parts[0]);
var target = bridge_addr(parts[1]);
if (source === undefined || target === undefined) {
return undefined;
} else {
return {
type: bridge_type(source.protocol, target.protocol),
source: source.addr,
target: target.addr
};
}
function get_ordinal(hostname) {
var matches = hostname.match(/\d+$/);
if (matches) {
return parseInt(matches[0]);
} else {
return undefined;
log.error('Could not determine ordinal for %s', hostname);
return 0;
}
}

function create_bridge(config_string) {
var config = bridge_config(config_string);
if (config === undefined) {
console.error('Skipping malformed bridge: %s', config_string);
return undefined;
} else {
return bridges.create(config);
}
function get_target_pod_name(hostname, statefulset) {
return statefulset + '-' + get_ordinal(hostname);
}

function Proxy(config, selector) {
var targets = kubernetes.watch('pods', undefined, selector);
var bridgeconfigs = config.split(',').map(bridge_config).filter(function (bridge) { return bridge !== undefined; });
for (var i in bridgeconfigs) {
var bridgeconfig = bridgeconfigs[i];
if (bridgeconfig.type === "amqp_to_http") {
new OutgoingBridgeConfig(bridgeconfig.source, bridgeconfig.target, bridges.amqp_to_http, targets);
} else if (bridgeconfig.type === "amqp_to_http2") {
new OutgoingBridgeConfig(bridgeconfig.source, bridgeconfig.target, bridges.amqp_to_http2, targets);
} else if (bridgeconfig.type === "amqp_to_tcp") {
new OutgoingBridgeConfig(bridgeconfig.source, bridgeconfig.target, bridges.amqp_to_tcp, targets);
} else if (bridgeconfig.type === "http_to_amqp") {
bridges.http_to_amqp(bridgeconfig.source, bridgeconfig.target);
} else if (bridgeconfig.type === "http2_to_amqp") {
bridges.http2_to_amqp(bridgeconfig.source, bridgeconfig.target);
} else if (bridgeconfig.type === "tcp_to_amqp") {
bridges.tcp_to_amqp(bridgeconfig.source, bridgeconfig.target);
function Proxy(config) {
var bridging = get_bridging_functions(config.protocol);
if (bridging) {
if (config.headless === undefined) {
var address = config.address || config.name;
this.ingress = bridging.ingress(config.port, address);

if (config.targets) {
this.egress = config.targets.map(function (target) {
if (target.selector) {
var targets = kubernetes.watch('pods', undefined, target.selector);
return new OutgoingBridgeConfig(address, target.targetPort || config.port, bridging.egress, targets);
} else {
console.error('Ignoring target %s; no selector defined.', target.name);
return {};
}
});
}
} else {
console.error("Unrecognised bridge type: %s", bridgeconfig.type);
//for headless services, only have ingress on remote sites, and only have egress on local site
if (config.origin) {
var address = [config.address || config.name, process.env.HOSTNAME].join('.');
this.ingress = bridging.ingress(config.port, address);
} else {
var podname = get_target_pod_name(process.env.HOSTNAME, config.headless.name);
var address = [config.address || config.name, podname].join('.');
var targetPort = config.headless.targetPort || config.port;
var host = [podname, config.name || config.address, process.env.NAMESPACE, 'svc.cluster.local'].join('.');
this.egress = bridging.egress(address, host, targetPort);
}
}
} else {
console.error('Unrecognised protocol: %s', config.protocol);
}
}

if (process.env.ICPROXY_CONFIG === undefined) {
console.error('ICPROXY_CONFIG must be set');
} else if (process.env.ICPROXY_POD_SELECTOR === undefined) {
console.error('ICPROXY_POD_SELECTOR must be set');
process.on('SIGTERM', function () {
console.log('Exiting due to SIGTERM');
process.exit();
});

if (process.env.SKUPPER_PROXY_CONFIG === undefined) {
console.error('SKUPPER_PROXY_CONFIG must be set');
} else {
var proxy = new Proxy(process.env.ICPROXY_CONFIG, process.env.ICPROXY_POD_SELECTOR);
var proxy = new Proxy(JSON.parse(process.env.SKUPPER_PROXY_CONFIG));
}
2 changes: 1 addition & 1 deletion examples/http/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var http = require('http');
var server = http.createServer(function (request, response) {
console.log('%s %s on %s', request.method, request.url, request.headers.host);
response.statusCode = 200;
response.end(request.method + ' ' + request.url + ' handled by ' + process.env.HOSTNAME);
response.end(request.method + ' ' + request.url + ' from ' + request.socket.remoteAddress + ' handled by ' + process.env.HOSTNAME + '\n');
});
server.listen(process.env.PORT || 8080, function () {
console.log('listening on %s', server.address().port);
Expand Down
2 changes: 1 addition & 1 deletion lib/bridges.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ AmqpToHttpBridge.prototype.incoming = function (context) {
context.connection.send(message_out);
});
});
request.on('error', function () {
request.on('error', function (error) {
console.error(error);
context.delivery.modified();
});
Expand Down
18 changes: 18 additions & 0 deletions lib/kubernetes.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ var log = require("./log.js").logger();

const CLUSTER_SCOPE = {};

const DEPLOYMENT = {
group: 'apps',
version: 'v1',
name: 'deployments',
};

const STATEFULSET = {
group: 'apps',
version: 'v1',
name: 'statefulsets',
};

function watch_handler(collection) {
var partial = undefined;
return function (msg) {
Expand Down Expand Up @@ -58,6 +70,9 @@ function Client(options) {
this.options = options || {};
}

Client.prototype.DEPLOYMENT = DEPLOYMENT;
Client.prototype.STATEFULSET = STATEFULSET;

Client.prototype.host = function () {
return this.options.host || process.env.KUBERNETES_SERVICE_HOST;
};
Expand Down Expand Up @@ -353,3 +368,6 @@ module.exports.CLUSTER_SCOPE = CLUSTER_SCOPE;
module.exports.client = function (options) {
return new Client(options);
};

module.exports.DEPLOYMENT = DEPLOYMENT;
module.exports.STATEFULSET = STATEFULSET;
9 changes: 9 additions & 0 deletions lib/labels.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@

const BASE_QUALIFIER = "skupper.io";
const INTERNAL_QUALIFIER = "internal." + BASE_QUALIFIER;

const ADDRESS = BASE_QUALIFIER + "/address";
const PROXY = BASE_QUALIFIER + "/proxy";
const VERSION = BASE_QUALIFIER + "/version";
const WEIGHT = BASE_QUALIFIER + "/weight";

const CONTROLLED = INTERNAL_QUALIFIER + "/controlled";
const SERVICE = INTERNAL_QUALIFIER + "/service";
const ORIGIN = INTERNAL_QUALIFIER + "/origin";
const ORIGINAL_SELECTOR = INTERNAL_QUALIFIER + "/original-selector";
const TYPE = INTERNAL_QUALIFIER + "/type";

const TYPE_PROXY = TYPE + '=proxy';

module.exports = {
ADDRESS: ADDRESS,
CONTROLLED: CONTROLLED,
PROXY: PROXY,
VERSION: VERSION,
WEIGHT: WEIGHT,
SERVICE: SERVICE,
ORIGIN: ORIGIN,
ORIGINAL_SELECTOR: ORIGINAL_SELECTOR,
Expand Down
33 changes: 33 additions & 0 deletions lib/owner_ref.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,42 @@ if (process.env.OWNER_NAME && process.env.OWNER_UID) {
}];
}

function equivalent_owner_reference(a, b) {
return a.apiVersion === b.apiVersion && a.kind === b.kind && a.name === b.name && a.uid === b.uid;
}

function has_equivalent_owner_reference(list, target) {
for (var i in list) {
if (equivalent_owner_reference(list[i], target)) {
return true;
}
}
return false;
}

function equivalent_owner_references(a, b) {
if (a === undefined) {
return b === undefined;
} else if (a.length === b.length) {
for (var i in a) {
if (!has_equivalent_owner_reference(b, a[i])) {
return false;
}
}
return true;
} else {
return false;
}
}

module.exports.set_owner_references = function (object) {
if (owner_references) {
object.metadata.ownerReferences = owner_references;
}
}

module.exports.is_owner = function (object) {
if (owner_references && object.metadata) {
return equivalent_owner_references(object.metadata.ownerReferences, owner_references);
}
}
Loading

0 comments on commit 7995d35

Please sign in to comment.