Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions onboarding-assistant/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
---
problem: migrate onboarding assistant persistence from Airtable to Google Sheets
backends:
- airtable
- google sheets
flags:
DATA_BACKEND:
description: primary persistence backend
values: [airtable, sheets]
WRITE_BACKENDS:
description: comma-separated list of backends to write to
values: [airtable, sheets, "airtable,sheets"]
gaps:
- does not emulate Airtable RECORD_ID() values
- does not preserve Airtable formula/lookup/rollup fields
- does not preserve Airtable auto-number fields
- does not preserve Airtable linked-record UI semantics
---

# Onboarding Assistant: Airtable → Google Sheets context

## Goal
Migrate the onboarding assistant data layer from Airtable to Google Sheets using a backend adapter and feature flags.

## Current code structure
- `onboarding-assistant/code/modules/database/__init__.py`
- selects the active backend using `DATA_BACKEND`
- supports dual writes using `WRITE_BACKENDS`
- `onboarding-assistant/code/modules/database/airtable.py`
- current Airtable provider
- handles `Volunteers`, `Email Addresses`, and `Events`/engagement tables
- `onboarding-assistant/code/modules/database/sheets.py`
- new Google Sheets provider implementation
- stores `Volunteers`, `Email Addresses`, `Events`, and normalized tag sheets
- `onboarding-assistant/code/modules/analytics/__init__.py`
- sends analytics events through the same database facade

## Data model and schemas
### Primary records
- `Volunteers`
- `Email Addresses`
- `Events`

### Tag/lookup sheets
- `Skills`
- `Languages`
- `Industries`

### Google Sheets workbook layout
One spreadsheet with tabs:
- `Volunteers`
- `Email Addresses`
- `Events`
- `Skills`
- `Languages`
- `Industries`

## Migration strategy
1. Keep Airtable as the authoritative read source:
- `DATA_BACKEND=airtable`
- `WRITE_BACKENDS=airtable,sheets`
2. Seed Sheets with all writes while reads remain on Airtable
3. Validate parity and fix any data mapping issues
4. Cut over reads to Sheets:
- `DATA_BACKEND=sheets`
5. Optionally keep dual writes while Sheets proves stable
6. Retire Airtable writes once cutover is complete

## Known limitations
- Sheets provider creates its own stable row IDs, not Airtable `RECORD_ID()` values
- Airtable auto-generated and formula-derived fields are not mirrored
- lookup/rollup fields are not automatically reproduced in Sheets
- Google Sheets only emulates relationships via serialized tag IDs and normalized tag tables
- Data model is intentionally denormalized compared to Airtable’s linked records

## Important notes for future agents
- use `DATA_BACKEND` to choose the active backend
- use `WRITE_BACKENDS` to write to both backends during migration
- the Sheets provider requires `google_sheets_spreadsheet_id` and `google_service_account_key`
- `amazon.configuration` loads AWS Secrets Manager values for runtime config

## Files touched
- `onboarding-assistant/code/modules/database/__init__.py`
- `onboarding-assistant/code/modules/database/airtable.py`
- `onboarding-assistant/code/modules/database/sheets.py`
- `onboarding-assistant/code/airtable_schema_discovery.py`
- `onboarding-assistant/code/requirements.txt`
- `onboarding-assistant/code/pyproject.toml`
138 changes: 138 additions & 0 deletions onboarding-assistant/code/airtable_schema_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""Discover Airtable schema for a base and dump it to disk.

Run this script with a read-only Airtable token and a base ID.
It writes these files into the current directory:
- airtable_schema.json
- airtable_schema_summary.md

Expected environment variables:
- AIRTABLE_TOKEN_RO: Airtable read-only personal access token
- AIRTABLE_BASE_ID: Airtable base ID

If the metadata endpoint is unavailable, the script will fail with an
explanation and leave the repository untouched.
"""

import json
import os
import re
import sys
from pathlib import Path

import requests


OUTPUT_DIR = Path(__file__).resolve().parent


def get_env(name, default=None):
    """Return the environment variable *name*, or *default* if set.

    Exits the process with status 1 (after printing to stderr) when the
    variable is absent and no default was supplied.
    """
    value = os.environ.get(name, default)
    if value is not None:
        return value
    print(f"Missing required environment variable: {name}", file=sys.stderr)
    sys.exit(1)


def get_base_ids():
    """Return the list of Airtable base IDs to process.

    Prefers the comma-separated ``AIRTABLE_BASE_IDS`` variable; falls back
    to the single required ``AIRTABLE_BASE_ID``.
    """
    raw = os.environ.get("AIRTABLE_BASE_IDS")
    if not raw:
        return [get_env("AIRTABLE_BASE_ID")]
    stripped = (piece.strip() for piece in raw.split(","))
    return [piece for piece in stripped if piece]


def normalize_base_id(base_id):
    """Return *base_id* with every character outside [A-Za-z0-9_-] turned into '_'.

    Used to build filesystem-safe output filenames from a base ID.
    """
    unsafe = re.compile(r"[^A-Za-z0-9_-]")
    return unsafe.sub("_", base_id)


def fetch_base_metadata(base_id, token):
    """Fetch table metadata for *base_id* from the Airtable metadata API.

    Args:
        base_id: Airtable base ID (``app...``).
        token: personal access token with metadata read access.

    Returns:
        Parsed JSON payload (a dict containing a ``tables`` list).

    Raises:
        RuntimeError: on 403/404 responses, with an actionable message.
        requests.HTTPError: on any other non-2xx response.
        requests.Timeout: if the API does not respond within 30 seconds.
    """
    url = f"https://api.airtable.com/v0/meta/bases/{base_id}/tables"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    # Always pass a timeout: without one, requests waits indefinitely on a
    # stalled connection and hangs the whole discovery run.
    resp = requests.get(url, headers=headers, timeout=30)
    if resp.status_code == 403:
        raise RuntimeError(
            "Airtable metadata API returned 403. Ensure the token has metadata or "
            "read access to the base."
        )
    if resp.status_code == 404:
        raise RuntimeError(
            "Airtable metadata endpoint not available for this token/base. "
            "Use a token that can access the metadata API."
        )
    resp.raise_for_status()
    return resp.json()


def table_id_to_name_map(tables):
    """Map each table's ID to its display name."""
    mapping = {}
    for table in tables:
        mapping[table["id"]] = table["name"]
    return mapping


def summarize_tables(metadata):
    """Render a markdown summary (one section per table) of base metadata."""
    table_names = table_id_to_name_map(metadata["tables"])

    out = [
        "# Airtable schema summary",
        "",
        f"Base ID: {metadata.get('baseId', 'unknown')}",
        "",
        f"Generated by: {Path(__file__).name}",
        "",
    ]

    for table in metadata["tables"]:
        out += [
            f"## {table['name']}",
            "",
            f"- table_id: `{table['id']}`",
            f"- record_name_field: `{table.get('primaryFieldId', '')}`",
            "",
            "| field name | type | linked table | options |",
            "|---|---|---|---|",
        ]

        for field in table.get("fields", []):
            field_name = field.get("name", "")
            field_type = field.get("type", "")
            options = field.get("options")
            linked_table = ""
            # Resolve a linked-record field's target table to a human name
            # when possible, falling back to the raw table ID.
            if isinstance(options, dict) and options:
                target_id = options.get("linkedTableId")
                if target_id:
                    linked_table = table_names.get(target_id, target_id)
            rendered_options = json.dumps(options, ensure_ascii=False) if options else ""
            out.append(
                f"| {field_name} | {field_type} | {linked_table} | {rendered_options} |"
            )

        out.append("")

    return "\n".join(out)


def dump_schema(metadata, base_id):
    """Write JSON metadata and a markdown summary for *base_id* next to this script."""
    slug = normalize_base_id(base_id)
    output_json = OUTPUT_DIR / f"airtable_schema_{slug}.json"
    output_md = OUTPUT_DIR / f"airtable_schema_{slug}.md"

    output_json.write_text(
        json.dumps(metadata, indent=2, sort_keys=True, ensure_ascii=False),
        encoding="utf-8",
    )
    output_md.write_text(summarize_tables(metadata), encoding="utf-8")

    print(f"Wrote schema metadata to {output_json}")
    print(f"Wrote schema summary to {output_md}")


def main():
    """Entry point: fetch and dump schema metadata for each configured base."""
    # Resolve base IDs first so a missing AIRTABLE_BASE_ID is reported
    # before the token check, matching the original error ordering.
    targets = get_base_ids()
    token = get_env("AIRTABLE_TOKEN_RO")

    for target in targets:
        print(f"Discovering Airtable schema for base {target}")
        dump_schema(fetch_base_metadata(target, token), target)


if __name__ == "__main__":
main()
125 changes: 45 additions & 80 deletions onboarding-assistant/code/modules/database/__init__.py
Original file line number Diff line number Diff line change
@@ -1,100 +1,65 @@
"""Airtable database driver and volunteer record ingestor.
This module exposes high-level functions for storing volunteer records.
>>>
"""Database facade for the onboarding assistant.

This module provides a provider-agnostic interface for the app. It
routes reads and writes to the configured backend and supports feature
flag-style write-through for migration.
"""
from airtable import airtable
import amazon

import os

from . import airtable, sheets


BACKENDS = {
"airtable": airtable,
"sheets": sheets
}


DATA_BACKEND = os.environ.get("DATA_BACKEND", "airtable").lower()
WRITE_BACKENDS = [
backend.strip().lower()
for backend in os.environ.get("WRITE_BACKENDS", DATA_BACKEND).split(",")
if backend.strip()
]


volunteers = airtable.Airtable(
amazon.configuration["airtable_volunteers_base"],
api_key=amazon.configuration["airtable_token"]
)
mails = airtable.Airtable(
amazon.configuration["airtable_mails_base"],
api_key=amazon.configuration["airtable_token"]
)
engagement = airtable.Airtable(
amazon.configuration["airtable_engagement_base"],
api_key=amazon.configuration["airtable_token"]
)
def _backend(name):
    """Look up the backend module registered under *name*.

    Raises:
        ValueError: if *name* is not a key in BACKENDS (chained from KeyError).
    """
    try:
        return BACKENDS[name]
    except KeyError as exc:
        known = ", ".join(BACKENDS)
        message = (
            f"Unsupported database backend: {name}. "
            f"Available backends: {known}"
        )
        raise ValueError(message) from exc


def set_field(base, table, field, value):
return base.create(table, {field: value})["id"]
def _primary():
    """Return the backend module handling reads, selected by DATA_BACKEND."""
    active = _backend(DATA_BACKEND)
    return active


def get_records(base, table, field, value):
filter = {"filter_by_formula": filter_formula(field, value)}
return base.get(table, **filter)["records"]
def _write_backends():
    """Resolve every backend named in WRITE_BACKENDS, preserving order."""
    modules = []
    for name in WRITE_BACKENDS:
        modules.append(_backend(name))
    return modules


def tags(base, table, field, values):
response=[]
for value in values:
response += [
records[0]["id"]
if (records := get_records(base, table, field, value)) else
set_field(base, table, field, value)
]
return list(set(response))
def _read(method, *args, **kwargs):
    """Invoke *method* on the primary (read) backend and return its result."""
    handler = getattr(_primary(), method)
    return handler(*args, **kwargs)


def filter_formula(field, value):
"""Creates an Airtable filter formula that returns fields == value."""
field = field.replace("{", r"\{").replace("}", r"\}")
value = value.replace("'", r"\'").replace("\\", "\\\\")
return f"{{{field}}} = '{value}'"
def _write(method, *args, **kwargs):
    """Invoke *method* on every configured write backend.

    During migration WRITE_BACKENDS may list several backends; each receives
    the same call, and the return value of the last one listed (e.g. a
    record ID) is what callers see. Returns None when no backends are set.
    """
    result = None
    for module in _write_backends():
        handler = getattr(module, method)
        result = handler(*args, **kwargs)
    return result


def insert_volunteer_record(user, address, form):
volunteer = volunteers.create("Volunteers", {
"Slack Handle": (
user["profile"]["display_name_normalized"] or
user["profile"]["real_name_normalized"]
),
"Slack User ID": user["id"],
"Profession": form["profession"],
"External Organization": form.get("organization", ""),
"LinkedIn Profile": form.get("linkedin", ""),
"Weekly Capacity (new)": int(form["availability"].pop()),
"Skills": tags(volunteers, "Skills", "Name", form["skills"]),
"Languages": tags(volunteers, "Languages", "Language", form["languages"]),
"Industry": tags(volunteers, "Industries", "Name", form["industries"]),
"City": (
address.get("locality") or
address.get("administrative_area_level_2", "")
),
"Country (new)": address.get("country", ""),
"State/Province": address.get("administrative_area_level_1", ""),
"Zip Code": address.get("postal_code", ""),
"Geolocation": address.get("address", ""),
"Geocode": ", ".join(map(str, address.get("location", []))),
"Volunteer Interest": True,
"Timezone": user["tz_label"],
"Experience": form["experience"],
"Management Interest": "leadership" in form["options"],
"Privacy Policy": "privacy" in form["options"]
})["id"]
mail = mails.create("Email Addresses", {
"Email Address": user["profile"]["email"],
"Volunteer Record": volunteer
})
return volunteer
return _write("insert_volunteer_record", user, address, form)


def insert_event_record(user, category, action, label=""):
return engagement.create("Events", {
"User ID": user,
"Category": tags(engagement, "Categories", "Category", [category]),
"Action": tags(engagement, "Actions", "Action", [action]),
"Label": label
})["id"]
return _write("insert_event_record", user, category, action, label)


def check_volunteer(identifier):
"""Checks if there is any volunteer with the given Slack identifier."""
return volunteers.get(
"Volunteers",
filter_by_formula=filter_formula("Slack User ID", identifier)
)["records"]
return _read("check_volunteer", identifier)
Loading