Skip to content

Commit

Permalink
Add counter of unique visitors
Browse files Browse the repository at this point in the history
Done without storing any personal information such as IP address.
Uses Redis HyperLogLog to keep count.
Daily count and total count.

Signed-off-by: Brian Evans <ebrian101@gmail.com>
  • Loading branch information
mrbrianevans committed Sep 9, 2023
1 parent fcf943c commit 2dd6361
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 33 deletions.
96 changes: 63 additions & 33 deletions server/src/redis/streamFromRedis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,53 @@ import express from "express"
import { WebSocketServer } from "ws"
import { EventEmitter } from "events"
import { listenRedisStream } from "./listenRedisStream.js"
import {streamFromRedisLogger as logger} from '../utils/loggers.js'
import {setTimeout} from "node:timers/promises"
import { streamFromRedisLogger as logger } from "../utils/loggers.js"
import { setTimeout } from "node:timers/promises"
import { saveCompanyNumber } from "./saveCompanyNumber.js"
import { streamPaths } from "../streams/streamPaths.js"
import { updateSchemaForEvent } from "../schemas/maintainSchemas.js"
import { VisitorCounterService } from "./visitorCounter.js"

const eventEmitter = new EventEmitter({})
eventEmitter.setMaxListeners(1_000_000) // increase max listeners (this is clients x num of streams)

const app = express()

let clients = 0
app.get("/health", async (req, res) => {
const commandClient = await getRedisClient()
const health = { currentWsConnections: 0 }
const health = { currentWsConnections: 0, connections: clients }
for (const streamPath of streamPaths) {
const lastHeartbeat = await commandClient.hGet('heartbeats', streamPath).then(t => new Date(parseInt(t || "0")))
const lastHeartbeat = await commandClient.hGet("heartbeats", streamPath).then(t => new Date(parseInt(t || "0")))
health[streamPath] = Date.now() - lastHeartbeat.getTime() < 60_000 // more than 60 seconds indicates stream offline
}
health.currentWsConnections = await commandClient.get('currentWsConnections').then(value => value ? parseInt(value) : 0)
health.currentWsConnections = await commandClient.get("currentWsConnections").then(value => value ? parseInt(value) : 0)
await commandClient.quit()
res.json(health)
})

app.options('/randomCompanyNumbers', (req, res)=>{
res.setHeader('Access-Control-Allow-Origin', 'https://companiesdb.co.uk')
res.setHeader('Access-Control-Allow-Methods', 'GET')
res.setHeader('Access-Control-Allow-Headers', 'Content-Type')
app.options("/randomCompanyNumbers", (req, res) => {
res.setHeader("Access-Control-Allow-Origin", "https://companiesdb.co.uk")
res.setHeader("Access-Control-Allow-Methods", "GET")
res.setHeader("Access-Control-Allow-Headers", "Content-Type")
res.end()
})
/** Returns an array of random company numbers */
app.get("/randomCompanyNumbers", async (req, res) => {
const qty = 1 // this could be a search query param
const companyNumbers = await counterClient.sRandMemberCount('companyNumbers', qty)
res.setHeader('Access-Control-Allow-Origin', 'https://companiesdb.co.uk')
const companyNumbers = await counterClient.sRandMemberCount("companyNumbers", qty)
res.setHeader("Access-Control-Allow-Origin", "https://companiesdb.co.uk")
res.json(companyNumbers)
})
/** Returns the `qty` most recent events in the `streamPath` stream. Eg last 100 filing events. */
app.get("/downloadHistory/:streamPath", async (req, res) => {
const {streamPath} = req.params
const { streamPath } = req.params
const { qty } = req.query
const COUNT = parseInt(String(qty)) || 100 // default to send 100 events, unless specified
if(COUNT > 10_000){
res.status(400).json({statusCode:400, message: 'Qty exceeds maximum. Must be less than 10,000. Received: '+COUNT})
if (COUNT > 10_000) {
res.status(400).json({
statusCode: 400,
message: "Qty exceeds maximum. Must be less than 10,000. Received: " + COUNT
})
return
}
if (streamPaths.has(streamPath)) {
Expand Down Expand Up @@ -103,6 +107,28 @@ app.get("/schemas", async (req, res) => {
res.json(schemas)
})

const counterClient = await getRedisClient()
const visitorCounter = new VisitorCounterService(counterClient)
/** Reports the number of unique visitors: the all-time total and the count for the current UTC day. */
app.get("/visitors", async (_req, res) => {
  const total = await visitorCounter.getTotalCount()
  // toISOString() is always UTC, so the part before "T" is today's UTC calendar date (YYYY-MM-DD).
  const [isoDay] = new Date().toISOString().split("T")
  const today = await visitorCounter.getCount(isoDay)
  res.json({ total: total, today: today })
})
/**
 * Returns the unique visitor count for a single day as `{ [date]: count }`.
 *
 * Responds with 400 when `:date` is not a real calendar date written as YYYY-MM-DD,
 * and with 416 when the date is earlier than 2023-09-09 (the first day with records).
 */
app.get("/visitors/:date", async (req, res) => {
  const { date } = req.params
  // The pattern is anchored so the WHOLE parameter must be YYYY-MM-DD. An unanchored
  // character class would accept any string that merely contains ten digits/dashes.
  const parsed = /^\d{4}-\d{2}-\d{2}$/.test(date) ? new Date(date) : null
  // Reject unparseable values, and values the engine silently rolls over (e.g. Feb 30),
  // by checking that the parsed date formats back to exactly what was requested.
  const isRealDate = parsed !== null
    && !Number.isNaN(parsed.getTime())
    && parsed.toISOString().slice(0, 10) === date
  if (!isRealDate) {
    res.status(400).json({ statusCode: 400, message: "Bad date format" })
    return
  }
  if (parsed < new Date("2023-09-09")) {
    res.status(416).json({
      statusCode: 416,
      message: "Records only began on 2023-09-09. Request a date after that"
    })
    return
  }
  const count = await visitorCounter.getCount(date)
  res.json({ [date]: count })
})

const server = app.listen(3000, () => console.log("Listening on port 3000"))
server.on("request", (req) => console.log("Request to server", req.url))

Expand All @@ -112,32 +138,34 @@ function getListenerCounts() {
counts[streamPath] = eventEmitter.listenerCount(streamPath)
return counts
}
const counterClient = await getRedisClient()

const totalListeners = () => Object.values(getListenerCounts()).reduce((p, c) => p + c)
let clients = 0

// web socket server for sending events to client
const wss = new WebSocketServer({ noServer: true })
wss.on("connection", async function connection(ws, req) {
const stream = new URL(req.url??'/events', `wss://${req.headers.host}`).searchParams.get("stream")
const stream = new URL(req.url ?? "/events", `wss://${req.headers.host}`).searchParams.get("stream")
const send = event => ws.send(JSON.stringify(event))
const requestedStreams = [...streamPaths].filter(streamPath => stream === streamPath || stream === null || stream === "all")
for (const streamPath of requestedStreams)
for (const streamPath of requestedStreams) {
eventEmitter.addListener(streamPath, send)
}
await visitorCounter.count(req.socket.remoteAddress ?? "unknown")
clients++
const redisCount = await counterClient.incr('currentWsConnections')
const redisCount = await counterClient.incr("currentWsConnections")
console.log("Websocket connected.", totalListeners(), "event listeners", { clients, redisCount })
ws.on("close", async (code, reason) => {
for (const streamPath of requestedStreams)
eventEmitter.removeListener(streamPath, send)
clients--
const redisCount = await counterClient.decr('currentWsConnections')
console.log("Websocket disconnected with code.",code, totalListeners(), "event listeners", { clients, redisCount })
const redisCount = await counterClient.decr("currentWsConnections")
console.log("Websocket disconnected with code.", code, totalListeners(), "event listeners", { clients, redisCount })
})
eventEmitter.on('close', () => ws.terminate()) // ws.close() doesn't seem to work. Code should be 1112
eventEmitter.on("close", () => ws.terminate()) // ws.close() doesn't seem to work. Code should be 1112
})
// handles websocket on /events path of server
server.on("upgrade", function upgrade(request, socket, head) {
const url = new URL(request.url??'/events', `wss://${request.headers.host}`)
const url = new URL(request.url ?? "/events", `wss://${request.headers.host}`)
if (url.pathname === "/events") {
wss.handleUpgrade(request, socket, head, function done(ws) {
wss.emit("connection", ws, request)
Expand All @@ -147,9 +175,10 @@ server.on("upgrade", function upgrade(request, socket, head) {
}
})
const ac = new AbortController()
const {signal} = ac
async function shutdown(){
try{
const { signal } = ac

async function shutdown() {
try {
logger.flush()
console.log("Graceful shutdown", new Date())
eventEmitter.emit("close")
Expand All @@ -159,22 +188,23 @@ async function shutdown(){
await counterClient.quit()
logger.flush()
wss.close()
}finally {
} finally {
process.exit()
}
}
process.on('SIGINT', shutdown) // quit on ctrl-c when running docker in terminal
process.on('SIGTERM', shutdown)// quit properly on docker stop

const eventStream = listenRedisStream({streamKeys: [...streamPaths].map(stream=>({stream})), signal})
process.on("SIGINT", shutdown) // quit on ctrl-c when running docker in terminal
process.on("SIGTERM", shutdown)// quit properly on docker stop

const eventStream = listenRedisStream({ streamKeys: [...streamPaths].map(stream => ({ stream })), signal })

for await(const event of eventStream) {
const streamPath = event.stream.split(":")[1]
let parsedEvent = JSON.parse(event.data.event)
eventEmitter.emit(streamPath, { streamPath, ...parsedEvent })
if(streamPath === 'companies')
if (streamPath === "companies")
await saveCompanyNumber(counterClient, parsedEvent, streamPath)
.catch(e=>logger.error(e, 'Error saving company number'))
.catch(e => logger.error(e, "Error saving company number"))
await updateSchemaForEvent(parsedEvent, counterClient)
}

42 changes: 42 additions & 0 deletions server/src/redis/visitorCounter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import type { RedisClient } from "./getRedisClient"


/**
 * Keeps usage statistics without ever storing the client's IP address.
 * Each value is added to a HyperLogLog in redis (pfAdd) and only the approximate
 * cardinality is read back (pfCount), so items are counted WITHOUT being stored.
 * Preserves privacy and allows basic statistic reporting.
 */
export class VisitorCounterService {

  /** Redis connection used for every pfAdd / pfCount call made by this service. */
  redisClient: RedisClient

  constructor(redisClient: RedisClient) {
    this.redisClient = redisClient
  }

  /**
   * Counts an IP address towards both today's counter and the all-time counter.
   * @param ip - value identifying the visitor; added to the `visitors-<date>` and `visitors-total` keys.
   */
  async count(ip: string) {
    // toISOString() is UTC, so the daily key rolls over at UTC midnight.
    const dailyKey = `visitors-${new Date().toISOString().slice(0, 10)}`
    for (const key of [dailyKey, `visitors-total`]) {
      await this.redisClient.pfAdd(key, ip)
    }
  }

  /**
   * Get the count for a specific day.
   * @param date - ISO date string (YYYY-MM-DD) of the day to retrieve. Defaults to the current UTC date.
   * @returns whatever pfCount reports for that day's `visitors-<date>` key.
   */
  async getCount(date?: string) {
    const day = date ?? new Date().toISOString().slice(0, 10)
    const uniqueVisitors = await this.redisClient.pfCount(`visitors-${day}`)
    return uniqueVisitors
  }

  /**
   * Get the total number counted since records began (9 September 2023).
   * @returns whatever pfCount reports for the `visitors-total` key.
   */
  async getTotalCount() {
    const uniqueVisitors = await this.redisClient.pfCount(`visitors-total`)
    return uniqueVisitors
  }

}

0 comments on commit 2dd6361

Please sign in to comment.