handle_k8s.py (forked from micado-scale/component-policy-keeper)
import kubernetes.client
import kubernetes.config
import logging
import pk_config
import time
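
# Kubernetes handler for Policy Keeper: node queries, deployment scaling and
# down-node maintenance. The unused 'endpoint' parameters and the 'pk_docker'
# logger name look carried over from the Docker/Swarm handler interface this
# module mirrors (inferred from the commented-out docker code kept below).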

def query_list_of_nodes(endpoint, status='ready'):
    """Return the k8s nodes matching the given status, or None on failure."""
    log = logging.getLogger('pk_docker')
    list_of_nodes = []
    if pk_config.simulate():
        return None
    kubernetes.config.load_kube_config()
    client = kubernetes.client.CoreV1Api()
    try:
        nodes = []
        if status == 'ready':
            # Untainted nodes are treated as ready workers.
            nodes = [x for x in client.list_node().items if not x.spec.taints]
        elif status == 'down':
            # Tainted non-master nodes are treated as down.
            nodes = [x for x in client.list_node().items
                     if x.spec.taints and 'master' not in x.spec.taints[0].key]
        for n in nodes:
            list_of_nodes.append({'ID': n.metadata.name,
                                  'Addr': n.status.addresses[0].address})
        return list_of_nodes
    except Exception:
        log.exception('(Q) Query of k8s nodes failed.')
        return None
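
# Shape of the returned list (illustrative values, not from the source):
#   query_list_of_nodes(None) -> [{'ID': 'worker-1', 'Addr': '10.0.0.5'}, ...]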

def scale_docker_service(endpoint, service_name, replicas):
    """Scale the deployment behind the given service to the requested replica count."""
    # Names arrive as '<stack>_<service>'; drop the stack prefix and join the
    # remaining parts with dashes to obtain the deployment name.
    service_name = '-'.join(service_name.split('_')[1:])
    log = logging.getLogger('pk_docker')
    log.info('(S) => m_container_count: {0}'.format(replicas))
    if pk_config.simulate():
        return
    kubernetes.config.load_kube_config()
    # ExtensionsV1beta1Api is deprecated in newer kubernetes clients; kept to
    # match the client version this module was written against.
    client = kubernetes.client.ExtensionsV1beta1Api()
    try:
        dep = client.read_namespaced_deployment(service_name, "default")
        dep.spec.replicas = replicas
        client.patch_namespaced_deployment_scale(service_name, "default", dep)
    except Exception as e:
        log.warning('(S) Scaling of k8s service "{0}" failed: {1}'.format(service_name, str(e)))
    return
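
# Illustrative call (stack and service names are hypothetical):
#   scale_docker_service(None, 'mystack_my_app', 3)   # scales deployment 'my-app' to 3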

def query_docker_service_replicas(endpoint, service_name):
    """Return the current replica count of the deployment behind the given service."""
    service_name = '-'.join(service_name.split('_')[1:])
    log = logging.getLogger('pk_docker')
    instance = 1
    if pk_config.simulate():
        return instance
    kubernetes.config.load_kube_config()
    client = kubernetes.client.ExtensionsV1beta1Api()
    try:
        dep = client.read_namespaced_deployment(service_name, "default")
        # Store the live count so it is returned; otherwise the default of 1
        # would always be reported.
        instance = dep.spec.replicas
        log.debug('(C) => m_container_count for {0}: {1}'.format(service_name, instance))
    except Exception as e:
        log.warning('(C) Querying k8s service "{0}" replicas failed: {1}'.format(service_name, str(e)))
    return instance

def query_service_network(endpoint, stack_name, service_name):
    """Docker network lookup; unsupported on kubernetes, always returns None."""
    id = None
    log = logging.getLogger('pk_docker')
    # Legacy Docker/Swarm implementation, kept for reference:
    # client = docker.DockerClient(base_url=endpoint)
    # full_service_name = stack_name + "_" + service_name
    # if pk_config.simulate():
    #     return None
    # service_list = client.services.list()
    # i = 0
    # while i < len(service_list) and service_list[i].name != full_service_name:
    #     i += 1
    # if i < len(service_list) and service_list[i].name == full_service_name:
    #     if len(service_list[i].attrs.get("Spec").get("TaskTemplate").get("Networks")) == 1:
    #         id = service_list[i].attrs.get("Spec").get("TaskTemplate").get("Networks")[0].get("Target")
    #         log.debug('Docker service "{0}" in stack "{1}" is connected to network "{2}" with id "{3}".'
    #                   .format(service_name, stack_name, client.networks.get(id).name, str(id)))
    #     else:
    #         log.warning('Docker service "{0}" is connected to more than one network.'.format(full_service_name))
    # else:
    log.warning('Docker networking not supported in kubernetes, cannot query')
    return id

def attach_container_to_network(endpoint, container, network_id):
    """Docker network attach; unsupported on kubernetes, logs a warning."""
    log = logging.getLogger('pk_docker')
    # Legacy Docker/Swarm implementation, kept for reference:
    # client = docker.DockerClient(base_url=endpoint)
    # network = client.networks.get(network_id)
    # if client.containers.get(container).status == "running":
    #     network.connect(container)
    #     log.info('Container "{0}" is connected to network "{1}".'.format(container, network.name))
    # else:
    log.warning('Docker networking not supported in kubernetes, attach container failed')
    return

def detach_container_from_network(endpoint, container, network_id):
    """Docker network detach; unsupported on kubernetes, logs a warning."""
    log = logging.getLogger('pk_docker')
    # Legacy Docker/Swarm implementation, kept for reference:
    # client = docker.DockerClient(base_url=client_address, version=client_version)
    # network = client.networks.get(network_id)
    # if client.containers.get(container).status == "running":
    #     network.disconnect(container)
    #     log.info('Container "{0}" is disconnected from network "{1}".'.format(container, network.name))
    # else:
    log.warning('Docker networking not supported in kubernetes, detach container failed')
    return

# Module-level bookkeeping of nodes seen as down, keyed by node ID.
down_nodes_stored = {}

def remove_node(endpoint, id):
    """Delete the node with the given name from the k8s cluster."""
    log = logging.getLogger('pk_docker')
    if pk_config.simulate():
        return
    kubernetes.config.load_kube_config()
    client = kubernetes.client.CoreV1Api()
    try:
        client.delete_node(id, {})
    except Exception:
        log.error('(M) => Removing k8s node failed.')
    return

def down_nodes_cleanup_by_list(stored, actual):
    """Forget stored down nodes that no longer appear in the actual down list."""
    set_stored = {v['ID'] for v in stored.values()}
    set_actual = {x['ID'] for x in actual}
    for node_id in set_stored - set_actual:
        del stored[node_id]

def down_nodes_add_from_list(stored, actual):
    """Record newly observed down nodes with the time they were first seen."""
    for node in actual:
        if 'ID' in node and node['ID'] not in stored:
            stored[node['ID']] = node
            stored[node['ID']]['micado_timestamp'] = int(time.time())

def down_nodes_cleanup_by_timeout(endpoint, stored, timeout):
    """Remove nodes that have been down for longer than 'timeout' seconds."""
    log = logging.getLogger('pk_docker')
    current_time = int(time.time())
    # Iterate over a snapshot since entries are deleted from 'stored' in the loop.
    for id, node in list(stored.items()):
        if node['micado_timestamp'] + timeout < current_time:
            log.info('(M) => Node {0} is down for more than {1} seconds, removing.'.format(id, timeout))
            remove_node(endpoint, id)
            del stored[id]

def down_nodes_maintenance(endpoint, down_nodes_timeout=120):
    """Track nodes reported as down and delete those down for longer than the timeout."""
    down_nodes_actual = query_list_of_nodes(endpoint, status='down')
    if down_nodes_actual is None:
        # Query failed or simulation mode is on; skip this maintenance round.
        return
    down_nodes_cleanup_by_list(down_nodes_stored, down_nodes_actual)
    down_nodes_add_from_list(down_nodes_stored, down_nodes_actual)
    down_nodes_cleanup_by_timeout(endpoint, down_nodes_stored, down_nodes_timeout)
    return
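
# Minimal usage sketch, assuming a cluster reachable through the local kubeconfig
# and simulation mode off; the 60-second period is illustrative, not from the source.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    while True:
        down_nodes_maintenance(None, down_nodes_timeout=120)
        time.sleep(60)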