Skip to content

Commit

Permalink
signup metrics (#157)
Browse files Browse the repository at this point in the history
* have a table tracking user signups and activity

* fix set

* rename batched_activity to seen_users, create the table if it doesn't exist, add comments

* wrong sql

* connect_pg

* fix constraint

* rearrange and comment

* use uuid

* rename get_safe_key/get_safe_value and always hash seen uuids

* fix name

* rearrange into cryptography.py

* remove List

* tweak CHANGELOG

* use METRICS_SALT
  • Loading branch information
technillogue authored Mar 31, 2022
1 parent f517040 commit 25aabc3
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- `ask_email_question` (#175)
- improve type hints (#171)
- add values() and items() to pdict (#179)
- new user_activity table holds first seen and last seen per-user per-bot. (#157)

## 1.2.5

Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
FROM registry.gitlab.com/packaging/signal-cli/signal-cli-native:latest as signal
RUN signal-cli --version | tee /signal-version
RUN mv /usr/bin/signal-cli-native /usr/bin/signal-cli

FROM python:3.9 as libbuilder
WORKDIR /app
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ These are the environment variables and flags that the bots read to work. Not al
- `GOOGLE_MAPS_API`: google maps api key
- `PAUTH`: used for PersistDict; see </pdictng_docs/README.md>
- `SALT`: used for PersistDict
- `METRICS_SALT`: salt used to hash user UUIDs when logging when users were first and last seen. Activity is only logged if this is set.

## Binary flags
- `DOWNLOAD`: download/upload datastore from the database instead of using what's in the current working directory.
Expand Down
54 changes: 53 additions & 1 deletion forest/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
from typing import (
Any,
Awaitable,
Coroutine,
Callable,
Coroutine,
Mapping,
Optional,
Tuple,
Expand All @@ -43,6 +43,7 @@
)

import aiohttp
import asyncpg
import termcolor
from aiohttp import web
from phonenumbers import NumberParseException
Expand All @@ -53,6 +54,7 @@
# framework
import mc_util
from forest import autosave, datastore, payments_monitor, pghelp, string_dist, utils
from forest.cryptography import hash_salt
from forest.message import AuxinMessage, Message, StdioMessage

JSON = dict[str, Any]
Expand Down Expand Up @@ -83,6 +85,20 @@ def rpc(
}


# Query strings for the user_activity table, which records when an account
# was first and last seen by a given bot (used for signup metrics).
ActivityQueries = pghelp.PGExpressions(
table="user_activity",
# One row per (account, bot) pair; both timestamps default to the insert time.
create_table="""CREATE TABLE user_activity (
id SERIAL PRIMARY KEY,
account TEXT,
first_seen TIMESTAMP default now(),
last_seen TIMESTAMP default now(),
bot TEXT,
UNIQUE (account, bot));""",
# Upsert: insert a new (account, bot) row, or bump last_seen if it already exists.
# NOTE(review): the constraint name is presumably the one PostgreSQL generates
# for the UNIQUE (account, bot) clause above -- confirm if the schema changes.
log="""INSERT INTO user_activity (account, bot) VALUES ($1, $2)
ON CONFLICT ON CONSTRAINT user_activity_account_bot_key DO UPDATE SET last_seen=now()""",
)


class Signal:
"""
Represents a signal-cli/auxin-cli session.
Expand Down Expand Up @@ -575,6 +591,12 @@ def __init__(self, bot_number: Optional[str] = None) -> None:
if not hasattr(getattr(self, f"do_{name}"), "hide")
]
super().__init__(bot_number)
self.activity = pghelp.PGInterface(
query_strings=ActivityQueries, database=utils.get_secret("DATABASE_URL")
)
# set of users we've received messages from in the last minute
self.seen_users: set[str] = set()
self.log_activity_task = asyncio.create_task(self.log_activity())
self.restart_task = asyncio.create_task(
self.start_process()
) # maybe cancel on sigint?
Expand All @@ -585,14 +607,44 @@ def __init__(self, bot_number: Optional[str] = None) -> None:
self.restart_task_callback(self.handle_messages)
)

async def log_activity(self) -> None:
    """
    Every 60s, update the user_activity table with the users we've seen.

    Runs in the background and writes in batches to avoid a separate db
    query for every message. Used for signup metrics.

    If the user_activity table does not exist yet, it is created and the
    pending batch is kept so it gets written on the next pass.
    """
    if not self.activity.pool:
        await self.activity.connect_pg()
    # mypy can't infer that connect_pg creates pool
    assert self.activity.pool
    while 1:
        await asyncio.sleep(60)
        if not self.seen_users:
            continue
        # Swap in a fresh set *before* awaiting the database: other tasks
        # keep adding to self.seen_users while the query is in flight, and
        # resetting the set afterwards would silently drop those users.
        batch, self.seen_users = self.seen_users, set()
        try:
            async with self.activity.pool.acquire() as conn:
                # executemany batches this into an atomic db query
                await conn.executemany(
                    self.activity.queries["log"],
                    [(name, utils.APP_NAME) for name in batch],
                )
            logging.debug("recorded %s seen users", len(batch))
        except asyncpg.UndefinedTableError:
            # Nothing was written; put the batch back so it is retried
            # once the table exists.
            self.seen_users |= batch
            logging.info("creating user_activity table")
            await self.activity.create_table()

async def handle_messages(self) -> None:
"""
Read messages from the queue. If it matches a pending request to auxin-cli/signal-cli,
set the result for that request. If said result is being rate limited, retry sending it
after pausing. Otherwise, concurrently respond to each message.
"""
metrics_salt = utils.get_secret("METRICS_SALT")
while True:
message = await self.inbox.get()
if metrics_salt and message.uuid:
self.seen_users.add(hash_salt(message.uuid, metrics_salt))
if message.id and message.id in self.pending_requests:
logging.debug("setting result for future %s: %s", message.id, message)
self.pending_requests[message.id].set_result(message)
Expand Down
54 changes: 54 additions & 0 deletions forest/cryptography.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import gzip
import hashlib
import logging
import os
from typing import Union, cast

import base58
from Crypto.Cipher import AES, _mode_eax

# Default salt for hash_salt(); override it with the SALT envvar.
SALT = os.getenv("SALT", "ECmG8HtNNMWb4o2bzyMqCmPA6KTYJPCkd")
# build your AESKEY envvar with this: cat /dev/urandom | head -c 32 | base58
# The decoded key is doubled here; a 64-byte result is cut back to 32 below.
AESKEY = base58.b58decode(os.getenv("AESKEY", "kWKuomB9Ty3GcJ9yA1yED").encode()) * 2

# Only log (rather than raise) on a bad key length, so importing this module
# never fails outright.
if not (AESKEY and len(AESKEY) in (16, 32, 64)):
    logging.error(
        "Need to set 128b or 256b (16 or 32 byte) AESKEY envvar for persistence. It should be base58 encoded."
    )

if len(AESKEY) == 64:
    AESKEY = AESKEY[:32]


def encrypt(data: bytes, key: bytes) -> bytes:
    """Encrypt and authenticate `data` with AES in EAX mode.

    Accepts data of arbitrary length and a 16B or 32B key. Returns a single
    blob laid out as nonce + auth tag + ciphertext.
    """
    eax = cast(_mode_eax.EaxMode, AES.new(key, AES.MODE_EAX))
    sealed, tag = eax.encrypt_and_digest(data)  # pylint: disable
    return b"".join((eax.nonce, tag, sealed))


def decrypt(data: bytes, key: bytes) -> bytes:
    """Decrypt and verify a blob with AES in EAX mode.

    Accepts a blob and a 16B or 32B key. The first 16 bytes are used as the
    nonce, the next 16 as the auth tag, and the remainder as the ciphertext.
    Returns the plaintext bytes.
    """
    nonce, authtag, ciphertext = data[:16], data[16:32], data[32:]
    eax = cast(_mode_eax.EaxMode, AES.new(key, AES.MODE_EAX, nonce))
    return eax.decrypt_and_verify(ciphertext, authtag)  # pylint: disable


def hash_salt(key_: str, salt: str = SALT) -> str:
    """Return the base58-encoded sha256 digest of `salt` followed by `key_`."""
    salted = f"{salt}{key_}".encode()
    digest = hashlib.sha256(salted).digest()
    return base58.b58encode(digest).decode()


def get_ciphertext_value(value_: Union[str, bytes]) -> str:
    """Gzip-compress, encrypt with AESKEY, and base58-encode a value.

    Accepts str (encoded to bytes first) or bytes; anything else raises
    ValueError.
    """
    # Reject unsupported types up front, before any work is done.
    if not isinstance(value_, (str, bytes)):
        raise ValueError
    raw = value_.encode() if isinstance(value_, str) else value_
    sealed = encrypt(gzip.compress(raw), AESKEY)
    return base58.b58encode(sealed).decode()


def get_cleartext_value(value_: str) -> str:
    """Base58-decode, decrypt with AESKEY, gunzip, and decode a blob to cleartext."""
    sealed = base58.b58decode(value_)
    compressed = decrypt(sealed, AESKEY)
    return gzip.decompress(compressed).decode()
1 change: 0 additions & 1 deletion forest/payments_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import ssl
import time
from typing import Any, Optional

import aiohttp
import asyncpg

Expand Down
71 changes: 10 additions & 61 deletions forest/pdictng.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,22 @@
# Copyright (c) 2022 MobileCoin Inc.
# Copyright (c) 2022 Ilia Daniher <i@mobilecoin.com>
# MIT LICENSE

import asyncio
import gzip
import hashlib
import json
import os
import time
from typing import Any, Generic, Optional, TypeVar, Union, cast, overload

from typing import Any, Generic, Optional, TypeVar, overload
import aiohttp
import base58
from Crypto.Cipher import AES, _mode_eax
from forest.cryptography import get_ciphertext_value, get_cleartext_value, hash_salt

NAMESPACE = os.getenv("FLY_APP_NAME") or open("/etc/hostname").read().strip()
SALT = os.getenv("SALT", "ECmG8HtNNMWb4o2bzyMqCmPA6KTYJPCkd")
# build your AESKEY envvar with this: cat /dev/urandom | head -c 32 | base58
AESKEY = base58.b58decode(os.getenv("AESKEY", "kWKuomB9Ty3GcJ9yA1yED").encode()) * 2

if not AESKEY or len(AESKEY) not in [16, 32, 64]:
raise ValueError(
"Need to set 128b or 256b (16 or 32 byte) AESKEY envvar for persistence. It should be base58 encoded."
)

if len(AESKEY) == 64:
AESKEY = AESKEY[:32]

pAUTH = os.getenv("PAUTH", "")
pURL = os.getenv("PURL", "https://gusc1-charming-parrot-31440.upstash.io")

if not pAUTH:
raise ValueError("Need to set PAUTH envvar for persistence")


def encrypt(data: bytes, key: bytes) -> bytes:
"""Accepts data (as arbitrary length bytearray) and key (as 16B or 32B bytearray) and returns authenticated and encrypted blob (as bytearray)"""
cipher = cast(_mode_eax.EaxMode, AES.new(key, AES.MODE_EAX))
ciphertext, authtag = cipher.encrypt_and_digest(data) # pylint: disable
return cipher.nonce + authtag + ciphertext


def decrypt(data: bytes, key: bytes) -> bytes:
"""Accepts ciphertext (as arbitrary length bytearray) and key (as 16B or 32B bytearray) and returns decrypted (plaintext) blob (as bytearray)"""
cipher = cast(_mode_eax.EaxMode, AES.new(key, AES.MODE_EAX, data[:16]))
return cipher.decrypt_and_verify(data[32:], data[16:32]) # pylint: disable


def get_safe_key(key_: str) -> str:
"""returns a base58 encoded sha256sum of a salted key"""
return base58.b58encode(hashlib.sha256(f"{SALT}{key_}".encode()).digest()).decode()


def get_safe_value(value_: Union[str, bytes]) -> str:
"""returns a base58 encoded aes128 AES EAX mode encrypted gzip compressed value"""
if isinstance(value_, str):
value_bytes = value_.encode()
elif isinstance(value_, bytes):
value_bytes = value_
else:
raise ValueError
return base58.b58encode(encrypt(gzip.compress(value_bytes), AESKEY)).decode()


def get_cleartext_value(value_: str) -> str:
"""decrypts, decodes, decompresses a b58 blob returning cleartext"""
return gzip.decompress(decrypt(base58.b58decode(value_), AESKEY)).decode()


class persistentKVStoreClient:
async def post(self, key: str, data: str) -> str:
raise NotImplementedError
Expand All @@ -92,15 +41,15 @@ def __init__(
self.url = base_url
self.conn = aiohttp.ClientSession()
self.auth = auth_str
self.namespace = get_safe_key(namespace)
self.namespace = hash_salt(namespace)
self.exists: dict[str, bool] = {}
self.headers = {
"Authorization": f"Bearer {self.auth}",
}

async def post(self, key: str, data: str) -> str:
key = get_safe_key(f"{self.namespace}_{key}")
data = get_safe_value(data)
key = hash_salt(f"{self.namespace}_{key}")
data = get_ciphertext_value(data)
# try to set
async with self.conn.post(
f"{self.url}/SET/{key}", headers=self.headers, data=data
Expand All @@ -109,7 +58,7 @@ async def post(self, key: str, data: str) -> str:

async def get(self, key: str) -> str:
"""Get and return value of an object with the specified key and namespace"""
key = get_safe_key(f"{self.namespace}_{key}")
key = hash_salt(f"{self.namespace}_{key}")
async with self.conn.get(f"{self.url}/GET/{key}", headers=self.headers) as resp:
res = await resp.json()
if "result" in res:
Expand Down Expand Up @@ -149,7 +98,7 @@ def __init__(
self.url = base_url
self.conn = aiohttp.ClientSession()
self.auth = auth_str
self.namespace = get_safe_key(namespace)
self.namespace = hash_salt(namespace)
self.exists: dict[str, bool] = {}
self.headers = {
"Content-Type": "application/json",
Expand All @@ -159,8 +108,8 @@ def __init__(
}

async def post(self, key: str, data: str) -> str:
key = get_safe_key(key)
data = get_safe_value(data)
key = hash_salt(key)
data = get_ciphertext_value(data)
# try to set
if self.exists.get(key):
async with self.conn.patch(
Expand Down Expand Up @@ -208,7 +157,7 @@ async def post(self, key: str, data: str) -> str:

async def get(self, key: str) -> str:
"""Get and return value of an object with the specified key and namespace"""
key = get_safe_key(key)
key = hash_salt(key)
async with self.conn.get(
f"{self.url}?select=value&key_=eq.{key}&namespace=eq.{self.namespace}",
headers={
Expand Down

0 comments on commit 25aabc3

Please sign in to comment.