Skip to content

Commit

Permalink
fixes to service-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
grs committed Sep 4, 2019
1 parent 69e995f commit 3280396
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 38 deletions.
97 changes: 65 additions & 32 deletions bin/deployer.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
var kubernetes = require('../lib/kubernetes.js').client();
var labels = require("../lib/labels.js");
var log = require("../lib/log.js").logger();
var service_utils = require("../lib/service_utils.js");
var service_sync = require("../lib/service_sync.js");

const DEPLOYMENT = {
Expand All @@ -34,6 +35,14 @@ function Deployer(service_account, service_sync_origin) {
this.service_sync = service_sync(service_sync_origin);
this.service_watcher.on('updated', this.service_sync.updated.bind(this.service_sync));
}
if (process.env.OWNER_NAME && process.env.OWNER_UID) {
this.owner_references = [{
apiVersion: 'apps/v1',
kind: 'Deployment',
name: process.env.OWNER_NAME,
uid: process.env.OWNER_UID
}];
}
}

function is_success_code(code) {
Expand Down Expand Up @@ -66,7 +75,7 @@ Deployer.prototype.deployments_updated = function (deployments) {

Deployer.prototype.verify_deployment = function (service) {
var self = this;
log.info('Verifying proxy deployment for %s', service.metadata.name);
log.info('Verifying proxy deployment for service %s', service.metadata.name);
//ensure a proxy is deployed for the service and the service is
//correctly configured for it
var deployment_name = get_deployment_name(service);
Expand Down Expand Up @@ -94,8 +103,14 @@ Deployer.prototype.reconcile = function (service, deployment) {
this.undeploy(service, deployment);
} else {
var update = false;
update = !this.verify_network(get_network_name(), deployment);
update = !this.verify_proxy_config(service, deployment);
if (!this.verify_proxy_config(service, deployment)) {
log.info('proxy config changed for deployment %s', deployment.metadata.name);
update = true;
}
if (!this.verify_proxy_pod_selector(service, deployment)) {
log.info('proxy pod selector changed for deployment %s', deployment.metadata.name);
update = true;
}

if (update) {
log.info('Updating proxy deployment %s', deployment.metadata.name);
Expand All @@ -104,12 +119,15 @@ Deployer.prototype.reconcile = function (service, deployment) {
}).catch(function (code, error) {
log.error('Failed to update proxy deployment for %s: %s %s', deployment.metadata.name, code, error);
});
} else {
log.info('No update required for proxy deployment %s', deployment.metadata.name);
}
}

//check selector on service is as expected
if (!this.verify_proxy_selector(service, deployment)) {
log.info('Updating service selector for %s', service.metadata.name);
service_utils.set_last_applied(service);
kubernetes.put('services', service).then(function () {
log.info('Updated service selector for %s', service.metadata.name);
}).catch(function (code, error) {
Expand All @@ -120,6 +138,7 @@ Deployer.prototype.reconcile = function (service, deployment) {
}

Deployer.prototype.verify_matches_service = function (deployment) {
log.info('Verifying that service matches deployment %s', deployment.metadata.name);
var self = this;
var service_name = deployment.metadata.annotations[labels.SERVICE];
if (service_name === undefined) {
Expand Down Expand Up @@ -152,15 +171,37 @@ function equivalent_selector (a, b) {
return true;
}

//verify that the service selector matches the deployment (which will be the proxy deployment)
Deployer.prototype.verify_proxy_selector = function (service, deployment) {
if (equivalent_selector(service.spec.selector, deployment.spec.selector.matchLabels)) {
return true;
} else {
service.metadata.annotations[labels.ORIGINAL_SELECTOR] = stringify_selector(service.spec.selector);
service.spec.selector = deployment.spec.selector.matchLabels;
return false;
}
};

//verify that the proxy is targetting the right set of pods
Deployer.prototype.verify_proxy_pod_selector = function (service, deployment) {
var desired = service.metadata.annotations[labels.ORIGINAL_SELECTOR];
if (desired === undefined) {
if (!equivalent_selector(service.spec.selector, deployment.spec.selector.matchLabels)) {
desired = stringify_selector(service.spec.selector);
} else {
log.error('Cannot determine correct pod selector for proxy from service %s, deployment %s', service.metadata.name, deployment.metadata.name);
return true;//can't update deployment
}
}
var actual = get_container_env_value(deployment, 'proxy', 'ICPROXY_POD_SELECTOR');
if (actual === desired) {
return true;
} else {
set_container_env_value(deployment, 'proxy', 'ICPROXY_POD_SELECTOR', desired)
return false;
}
};

function equivalent_proxy_config (a, b) {
return a.sort().join(',') === b.sort().join(',');
}
Expand All @@ -187,32 +228,11 @@ Deployer.prototype.verify_proxy_config = function (service, deployment) {
return true;
} else {
set_container_env_value(deployment, 'proxy', 'ICPROXY_CONFIG', desired.join(','))
log.info('updated proxy config for %s: %j', deployment.metadata.name, desired);
return false;
}
};

Deployer.prototype.verify_network = function (network, deployment) {
function match_network (volume) {
return volume.secret === network;
}
//does the deployment have a connect.json mounted in for the
//correct network?
if (deployment.spec.template.spec.volumes && deployment.spec.template.spec.volumes.connect) {
if (deployment.spec.template.spec.volumes.connect.secret === network) {
return true;
} else {
deployment.spec.template.spec.volumes.connect.secret = network;
return false;
}
} else {
//TODO: need to mount in any credentials required for connect as well
deployment.spec.template.spec.volumes = [{name: 'connect', secret: { secretName: network }}];
// now also need to verify the volumeMounts in deployment.spec.template.spec.containers.proxy
return false;
}
};


function stringify_selector (selector) {
var elements = [];
for (var k in selector) {
Expand Down Expand Up @@ -272,6 +292,8 @@ function set_container_env_value(deployment, container_name, key, value) {
var container = get_container(deployment, container_name);
if (container) {
set_env_value(container.env, key, value);
} else {
log.error('Could not set env var %s on container %s: no such container in %s', key, container_name, deployment.metadata.name);
}
}

Expand All @@ -287,6 +309,7 @@ Deployer.prototype.undeploy = function (service, deployment) {
log.info('Restoring selector for %s', service.metadata.name);
kubernetes.update('services', service.metadata.name, function (original) {
original.spec.selector = selector;
service_utils.set_last_applied(original);
return original;
}).then(function (result) {
var code = result.code;
Expand All @@ -310,10 +333,14 @@ Deployer.prototype.undeploy = function (service, deployment) {
});
};

function get_proxy_selector(service) {
var o = {};
o[labels.SERVICE] = service.metadata.name;
return o;
}

Deployer.prototype.deploy = function (service) {
var proxy_selector = {};
proxy_selector[labels.SERVICE] = service.metadata.name;
var original_selector = service.spec.selector || {'implements':service.metadata.name};
var original_selector = stringify_selector(service.spec.selector);
// deploy the proxy
var deployment = {
metadata: {
Expand All @@ -322,7 +349,7 @@ Deployer.prototype.deploy = function (service) {
},
spec: {
selector: {
matchLabels: proxy_selector,
matchLabels: get_proxy_selector(service),
},
template : {
metadata: {
Expand All @@ -339,7 +366,7 @@ Deployer.prototype.deploy = function (service) {
},
{
name: 'ICPROXY_POD_SELECTOR',
value: stringify_selector(original_selector)
value: original_selector
}
],
image: 'quay.io/skupper/icproxy',
Expand All @@ -358,6 +385,9 @@ Deployer.prototype.deploy = function (service) {
}
}
};
if (this.owner_references) {
deployment.metadata.ownerReferences = this.owner_references;
}
deployment.metadata.annotations[labels.SERVICE] = service.metadata.name;
deployment.spec.template.metadata.labels[labels.SERVICE] = service.metadata.name;
log.info('Deploying proxy for %s', service.metadata.name);
Expand All @@ -367,7 +397,9 @@ Deployer.prototype.deploy = function (service) {
// change the selector
log.info('Updating selector for %s', service.metadata.name);
kubernetes.update('services', service.metadata.name, function (original) {
original.spec.selector = proxy_selector;
original.spec.selector = get_proxy_selector(service);
original.metadata.annotations[labels.ORIGINAL_SELECTOR] = original_selector;
service_utils.set_last_applied(original);
return original;
}).then(function (result) {
var code = result.code;
Expand All @@ -384,8 +416,9 @@ Deployer.prototype.deploy = function (service) {
}
}).catch(function (code, error) {
log.error('Failed to deploy proxy for %s: %s %s', service.metadata.name, code, error);
console.error(error);
});

};

var deployer = new Deployer(process.env.ICPROXY_SERVICE_ACCOUNT || 'icproxy', process.env.SKUPPER_SERVICE_SYNC_ORIGIN);
var deployer = new Deployer(process.env.ICPROXY_SERVICE_ACCOUNT || 'icproxy', process.env.SKUPPER_SERVICE_SYNC_ORIGIN);
2 changes: 2 additions & 0 deletions lib/labels.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const ADDRESS = BASE_QUALIFIER + "/address";
const PROXY = BASE_QUALIFIER + "/proxy";
const SERVICE = INTERNAL_QUALIFIER + "/service";
const ORIGIN = INTERNAL_QUALIFIER + "/origin";
const ORIGINAL_SELECTOR = INTERNAL_QUALIFIER + "/original-selector";
const TYPE = INTERNAL_QUALIFIER + "/type";
const TYPE_PROXY = TYPE + '=proxy';

Expand All @@ -27,6 +28,7 @@ module.exports = {
PROXY: PROXY,
SERVICE: SERVICE,
ORIGIN: ORIGIN,
ORIGINAL_SELECTOR: ORIGINAL_SELECTOR,
TYPE: TYPE,
TYPE_PROXY: TYPE_PROXY
};
24 changes: 18 additions & 6 deletions lib/service_sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var kubernetes = require('./kubernetes.js').client();
var labels = require('./labels.js');
var log = require('./log.js').logger();
var myutils = require('./utils.js');
var service_utils = require('./service_utils.js');

const SERVICE_SYNC_ADDRESS="mc/$skupper-service-sync"

Expand All @@ -26,12 +27,15 @@ function is_local_proxied (service) {
}

function to_service_record (service) {
return {
var record = {
name: service.metadata.name,
proxy: service.metadata.annotations[labels.PROXY],
address: service.metadata.annotations[labels.ADDRESS],
ports: service.spec.ports
};
if (service.metadata.annotations[labels.ADDRESS]) {
record.address = service.metadata.annotations[labels.ADDRESS];
}
return record;
}

function equivalent_ports(a, b) {
Expand Down Expand Up @@ -75,6 +79,7 @@ ServiceSync.prototype.send_local = function () {
ServiceSync.prototype.on_message = function (context) {
if (context.message.subject === 'service-sync-update') {
if (context.message.application_properties.origin !== this.origin) {
log.info('Received service-sync-update from %s: %j', context.message.application_properties.origin, context.message.body);
this.reconcile(context.message.application_properties.origin, context.message.body);
}
} else if (context.message.subject === 'service-sync-request') {
Expand Down Expand Up @@ -109,7 +114,11 @@ function ensure_annotation(service, name, value) {
if (service.metadata.annotations[name] === value) {
return false; //no update needed
} else {
service.metadata.annotations[name] = value;
if (value) {
service.metadata.annotations[name] = value;
} else {
delete service.metadata.annotations[name];
}
return true;
}
} else if (value) {
Expand All @@ -130,7 +139,7 @@ function ensure_ports(service, ports) {
function service_update_function (origin, def) {
return function (service) {
if (service === undefined) {
log.info('creating service for %j', def);
log.info('service-sync creating service for %j', def);
service = {
apiVersion: 'v1',
kind: 'Service',
Expand All @@ -139,21 +148,24 @@ function service_update_function (origin, def) {
annotations: {}
},
spec: {
ports: def.ports
ports: def.ports,
selector: {'skupper.io/implements':def.name}
}
};
service.metadata.annotations[labels.PROXY] = def.proxy;
service.metadata.annotations[labels.ORIGIN] = origin;
if (def.address) {
service.metadata.annotations[labels.ADDRESS] = def.address;
}
service_utils.set_last_applied(service);
return service;
} else {
var changed = ensure_annotation(service, labels.PROXY, def.proxy);
changed = ensure_annotation(service, labels.ADDRESS, def.address) || changed;
changed = ensure_ports(service, def.ports) || changed;
if (changed) {
log.info('updating service for %j', def);
service_utils.set_last_applied(service);
log.info('service-sync updating service for %j', def);
return service;
} else {
log.debug('Service has not changed %j', def);
Expand Down
31 changes: 31 additions & 0 deletions lib/service_utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';

module.exports.set_last_applied = function (service) {
var subset = {
apiVersion:"v1",
kind:"Service",
metadata: {
annotations: service.metadata.annotations || {},
name: service.metadata.name
},
spec:{
ports: service.spec.ports,
selector: service.spec.selector
}
};
delete subset.metadata.annotations["kubectl.kubernetes.io/last-applied-configuration"];
service.metadata.annotations["kubectl.kubernetes.io/last-applied-configuration"] = JSON.stringify(subset);
};

0 comments on commit 3280396

Please sign in to comment.