Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1 #15

Merged
merged 9 commits into from
Sep 17, 2024
Merged

V1 #15

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# DHT Prometheus

A bridge to scrape Prometheus metrics from self-registering services, all using direct, end-to-end encrypted peer-to-peer connections.
A bridge to scrape Prometheus metrics from self-registering services, all using direct, end-to-end encrypted peer-to-peer connections (not http).

Its main advantage is that it does not use http: service discovery is done with a decentralised hash table ([HyperDHT](https://github.com/holepunchto/hyperdht)). This means that both this service and the clients it scrapes can live behind a firewall and need no reverse proy nor DNS entries.
Service discovery is done with a decentralised hash table ([HyperDHT](https://github.com/holepunchto/hyperdht)). This means that both this service and the clients it scrapes can live behind a firewall and need no reverse proy nor DNS entries.

Another advantage is the small amount of configuration required. [Clients](https://gitlab.com/dcent-tech/dht-prom-client) register themselves with the DHT-Prometheus service, so no manual list of targets needs to be maintained. All a client needs to register itself, is the DHT-Prometheus service's public key, and a shared secret.
An advantage is the small amount of configuration required. [Clients](https://gitlab.com/dcent-tech/dht-prom-client) register themselves with the DHT-Prometheus service, so no manual list of targets needs to be maintained. All a client needs to register itself, is the DHT-Prometheus service's public key, and a shared secret.

## Deployment

Expand All @@ -16,6 +16,14 @@ The DHT-prometheus service fulfils two complementary roles:

### Run

Configuration is done through environment variables:

- `DHT_PROM_KEY_PAIR_SEED`: 32-byte seed passed to `HyperDHT.keyPair()`, set as hex or z32. Set this to have a consistent public key (otherwise random, which is only useful for tests).
- `DHT_PROM_SHARED_SECRET`: 32-byte secret key, set as hex or z32.
- `DHT_PROM_LOG_LEVEL`: defaults to info
- `DHT_PROM_HTTP_PORT`: port where the http server listens. Defaults to a random port.
- `DHT_PROM_HTTP_HOST`: host where the http server listens. Defaults to 127.0.0.1

#### Docker

```
Expand All @@ -26,6 +34,8 @@ The intent is for the prometheus service to read its config from a read-only bin

Note: `/etc/prometheus/config/prometheus-dht-targets` should be writable by the container's user.

Note: `--network=host` is optional, but HyperDHT holepunching can struggle using the default bridge network, particularly for LAN and localhost connections.

#### CLI

Install:
Expand Down
136 changes: 90 additions & 46 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ class PrometheusDhtBridge extends ReadyResource {
}

async _open () {
await this._loadAliases()

// It is important that the aliases are first loaded
// otherwise the old aliases might get overwritten
await this.aliasRpcServer.ready()
await this._loadAliases()
await this.swarm.listen()

this._checkExpiredsInterval = setInterval(
() => this.cleanupExpireds(),
Expand All @@ -94,13 +93,9 @@ class PrometheusDhtBridge extends ReadyResource {
clearInterval(this._checkExpiredsInterval)
}

await this.aliasRpcServer.close()

await Promise.all([
[...this.aliases.values()].map(a => {
return a.close().catch(safetyCatch)
})]
)
for (const entry of this.aliases.values()) {
entry.close()
}

await this.swarm.destroy()

Expand All @@ -120,7 +115,7 @@ class PrometheusDhtBridge extends ReadyResource {
return updated
}

current.close().catch(safetyCatch)
current.close()
}

const entry = new AliasesEntry(
Expand All @@ -131,7 +126,6 @@ class PrometheusDhtBridge extends ReadyResource {
)

this.aliases.set(alias, entry)
// TODO: just emit entry?
this.emit('set-alias', { alias, entry })
const updated = true

Expand All @@ -142,25 +136,6 @@ class PrometheusDhtBridge extends ReadyResource {
return updated
}

// Should be kept sync (or think hard)
cleanupExpireds () {
const toRemove = []
for (const [alias, entry] of this.aliases) {
if (entry.isExpired) toRemove.push(alias)
}

for (const alias of toRemove) {
const entry = this.aliases.get(alias)
this.aliases.delete(alias)
entry.close().catch(safetyCatch)
this.emit('alias-expired', { publicKey: entry.targetKey, alias })
}

if (toRemove.length > 0) {
this._writeAliases().catch(safetyCatch)
}
}

async _handleGet (req, reply) {
const alias = req.params.alias

Expand All @@ -172,16 +147,16 @@ class PrometheusDhtBridge extends ReadyResource {
return
}

if (!entry.opened) {
await entry.ready()
if (this._forceFlushOnClientReady) await entry.scrapeClient.swarm.flush()
if (this._forceFlushOnClientReady && !entry.hasHandledGet) {
await entry.scrapeClient.swarm.flush()
}
entry.hasHandledGet = true

const scrapeClient = entry.scrapeClient

let res
try {
res = await scrapeClient.lookup()
res = await scrapeClient.requestMetrics()
} catch (e) {
this.emit('upstream-error', e)
reply.code(502)
Expand Down Expand Up @@ -217,19 +192,94 @@ class PrometheusDhtBridge extends ReadyResource {
this.putAlias(alias, z32PubKey, hostname, service, { write: false })
}
} catch (e) {
// An error is expected if the file does not yet exist
// (typically first run only)
this.emit('load-aliases-error', e)
}
}

// Should be kept sync (or think hard)
cleanupExpireds () {
const toRemove = []
for (const [alias, entry] of this.aliases) {
if (entry.isExpired) toRemove.push(alias)
}

for (const alias of toRemove) {
const entry = this.aliases.get(alias)
this.aliases.delete(alias)
entry.close()
this.emit('alias-expired', { publicKey: entry.targetKey, alias })
}

if (toRemove.length > 0) {
this._writeAliases().catch(safetyCatch)
}
}

registerLogger (logger) {
this.on('set-alias', ({ alias, entry }) => {
const scrapeClient = entry.scrapeClient
const publicKey = scrapeClient.targetKey
const { service, hostname } = entry

logger.info(`Registered alias: ${alias} -> ${idEnc.normalize(publicKey)} (${service} on host ${hostname})`)

scrapeClient.on('connection-open', ({ uid, remotePublicKey, remoteAddress }) => {
logger.info(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} opened connection to ${remoteAddress} (uid: ${uid})`)
})
scrapeClient.on('connection-close', ({ uid, remotePublicKey, remoteAddress }) => {
logger.info(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} closed connection to ${remoteAddress} (uid: ${uid})`)
})
scrapeClient.on('connection-error', ({ error, uid, remotePublicKey, remoteAddress }) => {
logger.info(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} at ${remoteAddress} connection error (uid: ${uid}): ${error.stack}`)
})

if (logger.level === 'debug') {
scrapeClient.on('connection-ignore', ({ uid, remotePublicKey, remoteAddress }) => {
logger.debug(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} at ${remoteAddress} ignored connection (uid: ${uid})`)
})
}
})

this.on('aliases-updated', (loc) => {
logger.info(`Updated the aliases file at ${loc}`)
})

this.on('alias-expired', ({ alias, publicKey }) => {
logger.info(`Alias entry expired: ${alias} -> ${idEnc.normalize(publicKey)}`)
})

this.on('load-aliases-error', e => {
// Expected first time the service starts (creates it then)
logger.error(`failed to load aliases file: ${e.stack}`)
})

this.on('upstream-error', e => {
logger.info(`upstream error: ${e.stack}`)
})

this.on('write-aliases-error', e => {
logger.error(`Failed to write aliases file ${e.stack}`)
})

this.aliasRpcServer.registerLogger(logger)
}
}

class AliasesEntry extends ReadyResource {
class AliasesEntry {
constructor (scrapeClient, hostname, service, expiry) {
super()

this.scrapeClient = scrapeClient
this.hostname = hostname
this.service = service
this.expiry = expiry
this.hasHandledGet = false

this.scrapeClient.ready()
}

get closed () {
return this.scrapeClient.closed
}

get targetKey () {
Expand All @@ -244,14 +294,8 @@ class AliasesEntry extends ReadyResource {
this.expiry = expiry
}

async _open () {
await this.scrapeClient.ready()
}

async _close () {
if (this.scrapeClient.opening) {
await this.scrapeClient.close()
}
close () {
this.scrapeClient.close()
}
}

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
"dependencies": {
"b4a": "^1.6.6",
"debounceify": "^1.1.0",
"dht-prom-alias-rpc": "^0.0.1-alpha.1",
"dht-prom-client": "^0.0.1-alpha.10",
"dht-prom-alias-rpc": "^1.0.0",
"dht-prom-client": "^1.0.1",
"fastify": "^4.28.0",
"graceful-goodbye": "^1.3.0",
"hypercore-id-encoding": "^1.3.0",
Expand Down
100 changes: 2 additions & 98 deletions run.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ function loadConfig () {
prometheusTargetsLoc: process.env.DHT_PROM_PROMETHEUS_TARGETS_LOC || './prometheus/targets.json',
logLevel: (process.env.DHT_PROM_LOG_LEVEL || 'info').toLowerCase(),
httpPort: process.env.DHT_PROM_HTTP_PORT || 0,
httpHost: '127.0.0.1',
httpHost: process.env.DHT_PROM_HTTP_HOST || '127.0.0.1',
_forceFlushOnClientReady: process.env._DHT_PROM_FORCE_FLUSH || 'false' // Tests only
}

Expand Down Expand Up @@ -75,7 +75,7 @@ async function main () {
serverLogLevel
})

setupLogging(bridge, logger)
bridge.registerLogger(logger)

goodbye(async () => {
logger.info('Shutting down')
Expand All @@ -90,100 +90,4 @@ async function main () {
logger.info(`DHT RPC ready at public key ${idEnc.normalize(bridge.publicKey)}`)
}

function setupLogging (bridge, logger) {
bridge.on('set-alias', ({ alias, entry }) => {
const scrapeClient = entry.scrapeClient
const publicKey = scrapeClient.targetKey
const { service, hostname } = entry

logger.info(`Registered alias: ${alias} -> ${idEnc.normalize(publicKey)} (${service} on host ${hostname})`)

scrapeClient.on('connection-open', ({ uid, targetKey, peerInfo }) => {
logger.info(`Scraper for ${alias}->${idEnc.normalize(targetKey)} opened connection from ${idEnc.normalize(peerInfo.publicKey)} (uid: ${uid})`)
})
scrapeClient.on('connection-close', ({ uid }) => {
logger.info(`Scraper for ${alias} closed connection (uid: ${uid})`)
})
scrapeClient.on('connection-error', ({ error, uid }) => {
logger.info(`Scraper for ${alias} connection error (uid: ${uid})`)
logger.info(error)
})

if (logger.level === 'debug') {
scrapeClient.on('connection-ignore', ({ uid }) => {
logger.debug(`Scraper for ${alias} ignored connection (uid: ${uid})`)
})
}
})

bridge.on('aliases-updated', (loc) => {
logger.info(`Updated the aliases file at ${loc}`)
})

bridge.on('alias-expired', ({ alias, publicKey }) => {
logger.info(`Alias entry expired: ${alias} -> ${idEnc.normalize(publicKey)}`)
})

bridge.on('load-aliases-error', e => { // TODO: test
// Expected first time the service starts (creates it then)
logger.error('failed to load aliases file')
logger.error(e)
})

bridge.on('upstream-error', e => { // TODO: test
logger.info('upstream error:')
logger.info(e)
})

bridge.on('write-aliases-error', e => {
logger.error('Failed to write aliases file')
logger.error(e)
})

bridge.aliasRpcServer.on(
'alias-request',
({ uid, remotePublicKey, targetPublicKey, alias }) => {
logger.info(`Alias request from ${idEnc.normalize(remotePublicKey)} to set ${alias}->${idEnc.normalize(targetPublicKey)} (uid ${uid})`)
}
)
bridge.aliasRpcServer.on(
'register-success', ({ uid, alias, targetPublicKey, updated }) => {
logger.info(`Alias success for ${alias}->${idEnc.normalize(targetPublicKey)}--updated: ${updated} (uid: ${uid})`)
}
)
// TODO: log IP address + rate limit
bridge.aliasRpcServer.on(
'alias-unauthorised', ({ uid, remotePublicKey, targetPublicKey, alias }) => {
logger.info(`Unauthorised alias request from ${idEnc.normalize(remotePublicKey)} to set alias ${alias}->${idEnc.normalize(targetPublicKey)} (uid: ${uid})`)
}
)
bridge.aliasRpcServer.on(
'register-error', ({ uid, error }) => {
logger.info(`Alias error: ${error} (${uid})`)
}
)

bridge.aliasRpcServer.on(
'connection-open',
({ uid, peerInfo }) => {
const remotePublicKey = idEnc.normalize(peerInfo.publicKey)
logger.info(`Alias server opened connection to ${idEnc.normalize(remotePublicKey)} (uid ${uid})`)
}
)
bridge.aliasRpcServer.on(
'connection-close',
({ uid, peerInfo }) => {
const remotePublicKey = idEnc.normalize(peerInfo.publicKey)
logger.info(`Alias server closed connection to ${idEnc.normalize(remotePublicKey)} (uid ${uid})`)
}
)
bridge.aliasRpcServer.on(
'connection-error',
({ uid, error, peerInfo }) => {
const remotePublicKey = idEnc.normalize(peerInfo.publicKey)
logger.info(`Alias server socket error: ${error.stack} on connection to ${idEnc.normalize(remotePublicKey)} (uid ${uid})`)
}
)
}

main()
Loading
Loading