mirrored from https://chromium.googlesource.com/infra/luci/luci-py
-
Notifications
You must be signed in to change notification settings - Fork 36
/
config.py
602 lines (480 loc) · 19.4 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Adapter between config service client and the rest of auth_service.
Basically a cron job that each minute refetches config files from config service
and modifies auth service datastore state if anything changed.
Following files are fetched:
imports.cfg - configuration for group importer cron job.
ip_allowlist.cfg - IP allowlists.
oauth.cfg - OAuth client_id allowlist.
Configs are ASCII serialized protocol buffer messages. The schema is defined in
proto/config.proto.
Storing infrequently changing configuration in the config service (implemented
on top of source control) allows to use code review workflow for configuration
changes as well as removes a need to write some UI for them.
"""
import collections
import logging
import posixpath
import re
from google import protobuf
from google.appengine.ext import ndb
from components import config
from components import datastore_utils
from components import gitiles
from components import utils
from components.auth import ipaddr
from components.auth import model
from components.auth.proto import security_config_pb2
from components.config import validation
from proto import config_pb2
import importer
# Config file revision number and where it came from.
# 'revision' is the revision identifier of the imported config (presumably a
# git SHA1 — an all-zeros placeholder is used for missing-with-default configs,
# see _fetch_configs), 'url' points at the config file at that revision.
Revision = collections.namedtuple('Revision', ['revision', 'url'])
class CannotLoadConfigError(Exception):
  """Raised while fetching configs if they are missing or fail validation."""
def is_remote_configured():
  """True if config service backend URL is defined.

  If config service backend URL is not set auth_service will use datastore
  as source of truth for configuration (with some simple web UI to change it).

  If config service backend URL is set, UI for config management will be read
  only and all config changes must be performed through the config service.
  """
  return get_remote_url() is not None
def get_remote_url():
  """Returns URL of a config service if configured, to display in UI."""
  settings = config.ConfigSettings.cached()
  if not settings or not settings.service_hostname:
    return None
  return 'https://%s' % settings.service_hostname
def get_revisions():
  """Returns a mapping {config file name => Revision instance or None}."""
  pairs = utils.async_apply(_CONFIG_SCHEMAS, _get_config_revision_async)
  return {path: rev for path, rev in pairs}
def _get_config_revision_async(path):
  """Returns a future with the last imported Revision of the given config.

  Args:
    path: config file name, must be a key of _CONFIG_SCHEMAS.

  Returns:
    ndb.Future resolving to a Revision tuple (or None if never imported).
  """
  assert path in _CONFIG_SCHEMAS, path
  # The assert above guarantees presence; index directly instead of the
  # redundant .get() the original used.
  return _CONFIG_SCHEMAS[path]['revision_getter']()
@utils.cache_with_expiration(expiration_sec=60)
def get_settings():
  """Returns auth service own settings (from settings.cfg) as SettingsCfg proto.

  Returns default settings if the ones in the datastore are no longer valid.
  """
  body = _get_service_config('settings.cfg')
  if not body:
    return config_pb2.SettingsCfg()
  # The config MUST be valid, since we do validation before storing it. If it
  # doesn't, better to revert to default setting rather than fail all requests.
  cfg = config_pb2.SettingsCfg()
  try:
    protobuf.text_format.Merge(body, cfg)
  except protobuf.text_format.ParseError as ex:
    logging.error('Invalid settings.cfg: %s', ex)
    return config_pb2.SettingsCfg()
  return cfg
def refetch_config(force=False):
  """Refetches all configs from luci-config (if enabled).

  Called as a cron job.

  Args:
    force: if True, reapply configs even if their revisions didn't change.
  """
  # Grab and validate all new configs in parallel.
  try:
    configs = _fetch_configs(_CONFIG_SCHEMAS)
  except CannotLoadConfigError as exc:
    logging.error('Failed to fetch configs\n%s', exc)
    return

  # Figure out what needs to be updated.
  dirty = {}
  dirty_in_authdb = {}
  cur_revs = dict(utils.async_apply(configs, _get_config_revision_async))
  for path, (new_rev, conf) in sorted(configs.items()):
    assert path in _CONFIG_SCHEMAS, path
    cur_rev = cur_revs[path]
    if cur_rev != new_rev or force:
      if _CONFIG_SCHEMAS[path]['use_authdb_transaction']:
        dirty_in_authdb[path] = (new_rev, conf)
      else:
        dirty[path] = (new_rev, conf)
    else:
      logging.info('Config %s is up-to-date at rev %s', path, cur_rev.revision)

  # First update configs that do not touch AuthDB, one by one. Note: the
  # original code clobbered the 'dirty' dict with the updater's boolean result
  # inside this loop (harmless only because sorted() materializes a list);
  # use a distinct name for the result instead.
  for path, (rev, conf) in sorted(dirty.items()):
    applied = _CONFIG_SCHEMAS[path]['updater'](None, rev, conf)
    logging.info(
        'Processed %s at rev %s: %s', path, rev.revision,
        'updated' if applied else 'up-to-date')

  # Configs that touch AuthDB are updated in a single transaction so that config
  # update generates single AuthDB replication task instead of a bunch of them.
  if dirty_in_authdb:
    _update_authdb_configs(dirty_in_authdb)
### Integration with config validation framework.


@validation.self_rule('settings.cfg', config_pb2.SettingsCfg)
def validate_settings_cfg(conf, ctx):
  """Validates settings.cfg, reporting problems via the validation context."""
  assert isinstance(conf, config_pb2.SettingsCfg)
  if not conf.auth_db_gs_path:
    return
  parts = conf.auth_db_gs_path.split('/')
  if len(parts) < 2 or not all(parts):
    ctx.error('auth_db_gs_path: must have form <bucket>/<path>')
# TODO(vadimsh): Below use validation context for real (e.g. emit multiple
# errors at once instead of aborting on the first one).


@validation.self_rule('imports.cfg')
def validate_imports_config(conf, ctx):
  """Validates imports.cfg by delegating to the importer module's validator."""
  try:
    importer.validate_config(conf)
  except ValueError as err:
    ctx.error(str(err))
@validation.self_rule('ip_allowlist.cfg', config_pb2.IPAllowlistConfig)
def validate_ip_allowlist_config(conf, ctx):
  """Adapts _validate_ip_allowlist_config to the validation framework."""
  try:
    _validate_ip_allowlist_config(conf)
  except ValueError as err:
    ctx.error(str(err))
@validation.self_rule('oauth.cfg', config_pb2.OAuthConfig)
def validate_oauth_config(conf, ctx):
  """Adapts _validate_oauth_config to the validation framework."""
  try:
    _validate_oauth_config(conf)
  except ValueError as err:
    ctx.error(str(err))
# Simple auth_service own configs stored in the datastore as plain text.
# They are different from imports.cfg (which has no GUI to update it here), and
# from ip_allowlist.cfg and oauth.cfg (which are tied to AuthDB changes).


class _AuthServiceConfig(ndb.Model):
  """Text config file imported from luci-config.

  Root entity. Key ID is config file name.
  """
  # The body of the config itself.
  config = ndb.TextProperty()
  # Last imported SHA1 revision of the config.
  revision = ndb.StringProperty(indexed=False)
  # URL the config was imported from.
  url = ndb.StringProperty(indexed=False)
@ndb.tasklet
def _get_service_config_rev_async(cfg_name):
  """Returns last processed Revision of given config (None if never seen)."""
  ent = yield _AuthServiceConfig.get_by_id_async(cfg_name)
  if not ent:
    raise ndb.Return(None)
  raise ndb.Return(Revision(ent.revision, ent.url))
def _get_service_config(cfg_name):
  """Returns text of given config file or None if missing."""
  ent = _AuthServiceConfig.get_by_id(cfg_name)
  if not ent:
    return None
  return ent.config
@ndb.transactional
def _update_service_config(cfg_name, rev, conf):
  """Stores new config (and its revision).

  This function is called only if config has already been validated.

  Args:
    cfg_name: name of the config file, used as the entity key ID.
    rev: Revision tuple describing where the config came from.
    conf: config body as text.

  Returns:
    True if the stored config body actually changed.
  """
  assert isinstance(conf, basestring)
  e = _AuthServiceConfig.get_by_id(cfg_name) or _AuthServiceConfig(id=cfg_name)
  old = e.config
  e.populate(config=conf, revision=rev.revision, url=rev.url)
  e.put()
  # The revision/url may change even when the body didn't; only body changes
  # are reported to the caller.
  return old != conf
### Group importer config implementation details.


@ndb.tasklet
def _get_imports_config_revision_async():
  """Returns Revision of last processed imports.cfg config (or None)."""
  ent = yield importer.config_key().get_async()
  if ent and isinstance(ent.config_revision, dict):
    desc = ent.config_revision
    raise ndb.Return(Revision(desc.get('rev'), desc.get('url')))
  raise ndb.Return(None)
def _update_imports_config(_root, rev, conf):
  """Applies imports.cfg config, returns True if the body changed."""
  # Rewrite existing config even if it is the same (to update 'rev').
  previous = importer.read_config()
  importer.write_config(conf, {'rev': rev.revision, 'url': rev.url})
  return previous != conf
### Implementation of configs expanded to AuthDB entities.


class _ImportedConfigRevisions(ndb.Model):
  """Stores mapping config path -> {'rev': SHA1, 'url': URL}.

  Parent entity is AuthDB root (auth.model.root_key()). Updated in a transaction
  when importing configs.
  """
  # Disable memcache and always fetch from Datastore. This means we can see
  # changes from the Go version of the app.
  _use_cache = False
  _use_memcache = False

  # JSON dict {config path -> {'rev': SHA1, 'url': URL}}.
  revisions = ndb.JsonProperty()
def _imported_config_revisions_key():
  """NDB key of the singleton entity holding imported config revisions."""
  root = model.root_key()
  return ndb.Key(_ImportedConfigRevisions, 'self', parent=root)
@ndb.tasklet
def _get_authdb_config_rev_async(path):
  """Returns Revision of last processed config given its name (or None)."""
  ent = yield _imported_config_revisions_key().get_async()
  revisions = ent.revisions if ent else None
  if not isinstance(revisions, dict):
    raise ndb.Return(None)
  desc = revisions.get(path)
  if not isinstance(desc, dict):
    raise ndb.Return(None)
  raise ndb.Return(Revision(desc.get('rev'), desc.get('url')))
@datastore_utils.transactional
def _update_authdb_configs(configs):
  """Pushes new configs to AuthDB entity group.

  Runs all per-config updaters in one transaction so a batch of config changes
  produces a single AuthDB replication task.

  Args:
    configs: dict {config path -> (Revision tuple, <config>)}.

  Returns:
    True if anything has changed since last import.
  """
  # Get model.AuthGlobalConfig entity, to potentially update it.
  root = model.root_key().get()
  orig = root.to_dict()
  revs = _imported_config_revisions_key().get()
  if not revs:
    revs = _ImportedConfigRevisions(
        key=_imported_config_revisions_key(),
        revisions={})
  ingested_revs = {}  # path -> Revision of configs that actually changed state
  for path, (rev, conf) in sorted(configs.items()):
    dirty = _CONFIG_SCHEMAS[path]['updater'](root, rev, conf)
    revs.revisions[path] = {'rev': rev.revision, 'url': rev.url}
    logging.info(
        'Processed %s at rev %s: %s', path, rev.revision,
        'updated' if dirty else 'up-to-date')
    if dirty:
      ingested_revs[path] = rev
  if root.to_dict() != orig:
    # Only the updaters above mutate 'root', so if it changed, at least one of
    # them must have reported itself dirty.
    assert ingested_revs
    report = ', '.join(
        '%s@%s' % (p, rev.revision) for p, rev in sorted(ingested_revs.items())
    )
    logging.info('Global config has been updated: %s', report)
    root.record_revision(
        modified_by=model.get_service_self_identity(),
        modified_ts=utils.utcnow(),
        comment='Importing configs: %s' % report)
    root.put()
  revs.put()
  if ingested_revs:
    # Kick off a single replication for the whole batch of changes.
    model.replicate_auth_db()
  return bool(ingested_revs)
def _validate_ip_allowlist_config(conf):
  """Checks IPAllowlistConfig proto, raising ValueError if invalid."""
  if not isinstance(conf, config_pb2.IPAllowlistConfig):
    raise ValueError('Wrong message type: %s' % conf.__class__.__name__)
  seen = set()
  for wl in conf.ip_allowlists:
    if not model.IP_WHITELIST_NAME_RE.match(wl.name):
      raise ValueError('Invalid IP allowlist name: %s' % wl.name)
    if wl.name in seen:
      raise ValueError('IP allowlist %s is defined twice' % wl.name)
    seen.add(wl.name)
    for net in wl.subnets:
      # Raises ValueError if subnet is not valid.
      ipaddr.subnet_from_string(net)
  if conf.assignments:
    raise ValueError('assignments are not supported anymore')
  # Raises ValueError on include cycles or references to unknown allowlists.
  _resolve_ip_allowlist_includes(conf.ip_allowlists)
def _resolve_ip_allowlist_includes(allowlists):
"""Takes a list of IPAllowlist, returns map {name -> [subnets]}.
Subnets are returned as sorted list of strings.
"""
by_name = {m.name: m for m in allowlists}
def resolve_one(wl, visiting):
if wl.name in visiting:
raise ValueError(
'IP allowlist %s is part of an include cycle %s' %
(wl.name, visiting + [wl.name]))
visiting.append(wl.name)
subnets = set(wl.subnets)
for inc in wl.includes:
if inc not in by_name:
raise ValueError(
'IP allowlist %s includes unknown allowlist %s' % (wl.name, inc))
subnets |= resolve_one(by_name[inc], visiting)
visiting.pop()
return subnets
return {m.name: sorted(resolve_one(m, [])) for m in allowlists}
def _update_ip_allowlist_config(root, rev, conf):
  """Applies ip_allowlist.cfg, syncing AuthIPWhitelist entities with it.

  Args:
    root: mutable AuthGlobalConfig entity (unused here beyond the type check).
    rev: Revision tuple the config came from, used in change comments.
    conf: validated IPAllowlistConfig proto.

  Returns:
    True if any allowlist entity was created, updated or deleted.
  """
  assert ndb.in_transaction(), 'Must be called in AuthDB transaction'
  assert isinstance(root, model.AuthGlobalConfig), root
  now = utils.utcnow()

  # Existing allowlist entities.
  existing_ip_allowlists = {
      e.key.id(): e
      for e in model.AuthIPWhitelist.query(ancestor=model.root_key())
  }

  # Allowlists being imported (name => [list of subnets]).
  imported_ip_allowlists = _resolve_ip_allowlist_includes(conf.ip_allowlists)

  to_put = []
  to_delete = []

  # New or modified IP allowlists.
  for name, subnets in imported_ip_allowlists.items():
    # An existing allowlist and it hasn't changed?
    wl = existing_ip_allowlists.get(name)
    if wl and wl.subnets == subnets:
      continue
    # Update the existing (to preserve auth_db_prev_rev) or create a new one.
    if not wl:
      wl = model.AuthIPWhitelist(
          key=model.ip_whitelist_key(name),
          created_ts=now,
          created_by=model.get_service_self_identity())
    wl.subnets = subnets
    wl.description = 'Imported from ip_allowlist.cfg'
    to_put.append(wl)

  # Removed IP allowlists.
  for wl in existing_ip_allowlists.values():
    if wl.key.id() not in imported_ip_allowlists:
      to_delete.append(wl)

  # Defensive check; validation already rejects configs with assignments.
  if conf.assignments:
    raise ValueError('assignments are not supported anymore')

  if not to_put and not to_delete:
    return False

  comment = 'Importing ip_allowlist.cfg at rev %s' % rev.revision
  for e in to_put:
    e.record_revision(
        modified_by=model.get_service_self_identity(),
        modified_ts=now,
        comment=comment)
  for e in to_delete:
    e.record_deletion(
        modified_by=model.get_service_self_identity(),
        modified_ts=now,
        comment=comment)

  # Issue all datastore RPCs in parallel, then wait for completion.
  futures = []
  futures.extend(ndb.put_multi_async(to_put))
  futures.extend(ndb.delete_multi_async(e.key for e in to_delete))
  for f in futures:
    f.check_success()
  return True
def _validate_oauth_config(conf):
  """Checks OAuthConfig proto, raising ValueError if invalid."""
  if not isinstance(conf, config_pb2.OAuthConfig):
    # Include the offending type in the message, for consistency with
    # _validate_ip_allowlist_config and easier debugging.
    raise ValueError('Wrong message type: %s' % conf.__class__.__name__)
  if conf.token_server_url:
    # Raises ValueError if the URL is not a valid root service URL.
    utils.validate_root_service_url(conf.token_server_url)
def _update_oauth_config(root, _rev, conf):
  """Applies oauth.cfg to the AuthGlobalConfig entity, if anything changed."""
  assert ndb.in_transaction(), 'Must be called in AuthDB transaction'
  assert isinstance(root, model.AuthGlobalConfig), root
  # Snapshot current and incoming values as plain dicts to compare them.
  current = {
      'oauth_client_id': root.oauth_client_id,
      'oauth_client_secret': root.oauth_client_secret,
      'oauth_additional_client_ids': list(root.oauth_additional_client_ids),
      'token_server_url': root.token_server_url,
  }
  incoming = {
      'oauth_client_id': conf.primary_client_id,
      'oauth_client_secret': conf.primary_client_secret,
      'oauth_additional_client_ids': list(conf.client_ids),
      'token_server_url': conf.token_server_url,
  }
  if incoming == current:
    return False
  root.populate(**incoming)
  return True
### SecurityConfig ingestion.


@validation.self_rule('security.cfg', security_config_pb2.SecurityConfig)
def validate_security_config(conf, ctx):
  """Validates security.cfg: each internal_service_regexp must compile."""
  with ctx.prefix('internal_service_regexp: '):
    for rx in conf.internal_service_regexp:
      try:
        re.compile('^%s$' % rx)
      except re.error as exc:
        ctx.error('bad regexp %r - %s', str(rx), exc)
def _update_security_config(root, _rev, conf):
  """Applies security.cfg, storing its serialized form on AuthGlobalConfig."""
  assert ndb.in_transaction(), 'Must be called in AuthDB transaction'
  assert isinstance(root, model.AuthGlobalConfig), root
  # Any changes? Compare semantically, not as byte blobs, since it is not
  # guaranteed that the byte blob serialization is stable.
  current = security_config_pb2.SecurityConfig()
  if root.security_config:
    current.MergeFromString(root.security_config)
  if current == conf:
    return False
  # Note: this byte blob will be pushed to all service as is.
  root.security_config = conf.SerializeToString()
  return True
### Description of all known config files: how to validate and import them.

# Config file name -> {
#   'proto_class': protobuf class of the config or None to keep it as text,
#   'revision_getter': lambda: ndb.Future with <latest imported Revision>
#   'validator': lambda config: <raises ValueError on invalid format>
#   'updater': lambda root, rev, config: True if applied, False if not.
#   'use_authdb_transaction': True to call 'updater' in AuthDB transaction.
#       Transactional updaters receive mutable AuthGlobalConfig entity as
#       'root'. Non-transactional updaters receive None instead.
#   'default': Default config value to use if the config file is missing.
#       If absent, a missing config file is a hard error (CannotLoadConfigError).
# }
_CONFIG_SCHEMAS = {
    'imports.cfg': {
        'proto_class': None,  # importer configs are stored as text
        'revision_getter': _get_imports_config_revision_async,
        'updater': _update_imports_config,
        'use_authdb_transaction': False,
    },
    'ip_allowlist.cfg': {
        'proto_class': config_pb2.IPAllowlistConfig,
        'revision_getter':
            lambda: _get_authdb_config_rev_async('ip_allowlist.cfg'),
        'updater': _update_ip_allowlist_config,
        'use_authdb_transaction': True,
    },
    'oauth.cfg': {
        'proto_class': config_pb2.OAuthConfig,
        'revision_getter': lambda: _get_authdb_config_rev_async('oauth.cfg'),
        'updater': _update_oauth_config,
        'use_authdb_transaction': True,
    },
    'settings.cfg': {
        'proto_class': None,  # settings are stored as text in datastore
        'default': '',  # it's fine if config file is not there
        'revision_getter':
            lambda: _get_service_config_rev_async('settings.cfg'),
        'updater':
            lambda _, rev, c: _update_service_config('settings.cfg', rev, c),
        'use_authdb_transaction': False,
    },
    'security.cfg': {
        'proto_class': security_config_pb2.SecurityConfig,
        'default': security_config_pb2.SecurityConfig(),
        'revision_getter': lambda: _get_authdb_config_rev_async('security.cfg'),
        'updater': _update_security_config,
        'use_authdb_transaction': True,
    },
}
@utils.memcache('auth_service:get_configs_url', time=300)
def _get_configs_url():
  """Returns URL where luci-config fetches configs from."""
  url = config.get_config_set_location(config.self_config_set())
  if url:
    return url
  return 'about:blank'
def _fetch_configs(paths):
  """Fetches a bunch of config files in parallel and validates them.

  Args:
    paths: iterable of config file names (keys of _CONFIG_SCHEMAS).

  Returns:
    dict {path -> (Revision tuple, <config>)}.

  Raises:
    CannotLoadConfigError if some config is missing or invalid.
  """
  paths = sorted(paths)
  configs_url = _get_configs_url()
  out = {}
  # Fetch all configs concurrently; each is deserialized per its schema's
  # 'proto_class' (or kept as text when it is None).
  configs = utils.async_apply(
      paths,
      lambda p: config.get_self_config_async(
          p, dest_type=_CONFIG_SCHEMAS[p]['proto_class'], store_last_good=False)
  )
  for path, (rev, conf) in configs:
    if conf is None:
      # Missing config file: fall back to the schema's default (if defined),
      # using an all-zeros revision to denote "not in the repo".
      default = _CONFIG_SCHEMAS[path].get('default')
      if default is None:
        raise CannotLoadConfigError('Config %s is missing' % path)
      rev, conf = '0'*40, default
    try:
      validation.validate(config.self_config_set(), path, conf)
    except ValueError as exc:
      raise CannotLoadConfigError(
          'Config %s at rev %s failed to pass validation: %s' %
          (path, rev, exc))
    out[path] = (Revision(rev, _gitiles_url(configs_url, rev, path)), conf)
  return out
def _gitiles_url(configs_url, rev, path):
  """URL to a directory in gitiles -> URL to a file at concrete revision."""
  try:
    loc = gitiles.Location.parse(configs_url)
    return str(gitiles.Location(
        hostname=loc.hostname,
        project=loc.project,
        treeish=rev,
        path=posixpath.join(loc.path, path)))
  except ValueError:
    # Not a gitiles URL, return as is.
    return configs_url