Skip to content

Commit

Permalink
Fix #561: Synchronize waitlists to Acoustic (#708)
Browse files Browse the repository at this point in the history
* Add setting and parameter for waitlist table id

* Handle waitlist rows

* Remove inappropriate assertions

These assertions are off topic for this test. Remove them to avoid having to adjust them to the waitlist additions.

* Lint

* Read optional waitlist fields from DB

* Add test about waitlist rows

* Rework existing tests and assertions

* Clean timestamp code

* Add ADR to support chosen approach

* Use unsubscribed/unsub_reason fields in Acoustic sync

* Fix order of migration after rebase

* Add notes about Acoustic tables in docs

* Lint
  • Loading branch information
leplatrem authored Jul 13, 2023
1 parent d4d3e38 commit ee49a1d
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 73 deletions.
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
"filename": "tests/unit/test_acoustic_service.py",
"hashed_secret": "3c3b274d119ff5a5ec6c1e215c1cb794d9973ac1",
"is_verified": false,
"line_number": 21
"line_number": 22
}
],
"tests/unit/test_auth.py": [
Expand Down Expand Up @@ -185,5 +185,5 @@
}
]
},
"generated_at": "2023-07-11T10:47:25Z"
"generated_at": "2023-07-13T13:27:23Z"
}
56 changes: 47 additions & 9 deletions ctms/acoustic_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ctms.background_metrics import BackgroundMetricService
from ctms.config import re_trace_email
from ctms.schemas import ContactSchema, NewsletterSchema
from ctms.schemas.waitlist import WaitlistBase

# Start cherry-picked from django.utils.encoding
_PROTECTED_TYPES = (
Expand Down Expand Up @@ -121,6 +122,7 @@ def __init__(
self,
acoustic_main_table_id,
acoustic_newsletter_table_id,
acoustic_waitlist_table_id,
acoustic_product_table_id,
acoustic_client: Acoustic,
metric_service: BackgroundMetricService = None,
Expand All @@ -133,6 +135,7 @@ def __init__(
self.relational_tables = {
"newsletter": str(int(acoustic_newsletter_table_id)),
"product": str(int(acoustic_product_table_id)),
"waitlist": str(int(acoustic_waitlist_table_id)),
}
self.logger = structlog.get_logger(__name__)
self.context: Dict[str, Union[str, int, List[str]]] = {}
Expand All @@ -142,19 +145,21 @@ def convert_ctms_to_acoustic(
self,
contact: ContactSchema,
main_fields: set[str],
waitlist_fields: set[str],
newsletters_mapping: dict[str, str],
):
acoustic_main_table = self._main_table_converter(contact, main_fields)
newsletter_rows, acoustic_main_table = self._newsletter_converter(
acoustic_main_table, contact, newsletters_mapping
)
acoustic_main_table = self._waitlist_converter(
waitlist_rows, acoustic_main_table = self._waitlist_converter(
acoustic_main_table,
contact,
main_fields,
waitlist_fields,
)
product_rows = self._product_converter(contact)
return acoustic_main_table, newsletter_rows, product_rows
return acoustic_main_table, newsletter_rows, waitlist_rows, product_rows

def _main_table_converter(self, contact, main_fields):
acoustic_main_table = {}
Expand Down Expand Up @@ -263,16 +268,20 @@ def _newsletter_converter(self, acoustic_main_table, contact, newsletters_mappin
self.context["newsletters_skipped"] = sorted(skipped)
return newsletter_rows, acoustic_main_table

def _waitlist_converter(self, acoustic_main_table, contact, main_fields):
def _waitlist_converter(
self, acoustic_main_table, contact, main_fields, waitlist_fields
):
"""Turns waitlists into flat fields on the main table.
If the field `{name}_waitlist_{field}` is not present in the `main_fields`
list, then it is ignored.
See `bin/acoustic_fields.py` to manage them (eg. add ``vpn_waitlist_source``).
Note: In the future, a dedicated relation/table for waitlists can be considered.
"""
waitlists_by_name = {wl.name: wl for wl in contact.waitlists}
waitlist_rows = []
contact_waitlists: List[WaitlistBase] = contact.waitlists
contact_email_id = str(contact.email.email_id)

waitlists_by_name = {wl.name: wl for wl in contact_waitlists}
for acoustic_field_name in main_fields:
if "_waitlist_" not in acoustic_field_name:
continue
Expand All @@ -284,7 +293,29 @@ def _waitlist_converter(self, acoustic_main_table, contact, main_fields):
acoustic_main_table[
acoustic_field_name
] = self.transform_field_for_acoustic(value)
return acoustic_main_table

# Waitlist relational table
for waitlist in contact_waitlists:
waitlist_row = {
"email_id": contact_email_id,
"waitlist_name": waitlist.name,
"waitlist_source": str(waitlist.source or ""),
"subscribed": waitlist.subscribed,
"unsub_reason": waitlist.unsub_reason or "",
# Timestamps
"create_timestamp": waitlist.create_timestamp.date().isoformat(),
"update_timestamp": waitlist.update_timestamp.date().isoformat(),
}
# Extra optional fields (eg. "geo", "platform", ...)
for field in waitlist_fields:
waitlist_row[f"waitlist_{field}"] = str(
waitlist.fields.get(field) or ""
)

# TODO: manage sync of unsubscribed waitlists (currently not possible)
waitlist_rows.append(waitlist_row)

return waitlist_rows, acoustic_main_table

@staticmethod
def transform_field_for_acoustic(data):
Expand Down Expand Up @@ -430,6 +461,7 @@ def attempt_to_upload_ctms_contact(
self,
contact: ContactSchema,
main_fields: set[str],
waitlist_fields: set[str],
newsletters_mapping: dict[str, str],
): # raises AcousticUploadError
"""
Expand All @@ -439,8 +471,13 @@ def attempt_to_upload_ctms_contact(
"""
self.context = {}
try:
main_table_data, nl_data, prod_data = self.convert_ctms_to_acoustic(
contact, main_fields, newsletters_mapping
(
main_table_data,
nl_data,
wl_data,
prod_data,
) = self.convert_ctms_to_acoustic(
contact, main_fields, waitlist_fields, newsletters_mapping
)
main_table_id = str(self.acoustic_main_table_id)
email_id = main_table_data["email_id"]
Expand All @@ -453,6 +490,7 @@ def attempt_to_upload_ctms_contact(
columns=main_table_data,
)
self._insert_update_relational_table("newsletter", nl_data)
self._insert_update_relational_table("waitlist", wl_data)
self._insert_update_relational_table("product", prod_data)

# success
Expand Down
6 changes: 4 additions & 2 deletions ctms/bin/acoustic.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,13 @@ def do_dump(dbsession, contacts, output: TextIO):
acoustic_client=None,
acoustic_main_table_id=-1,
acoustic_newsletter_table_id=-1,
acoustic_waitlist_table_id=-1,
acoustic_product_table_id=-1,
)
main_fields = {
f.field for f in get_all_acoustic_fields(dbsession, tablename="main")
}
waitlist_fields: set[str] = set() # Unused in main table dump.
newsletters_mapping = {
m.source: m.destination for m in get_all_acoustic_newsletters_mapping(dbsession)
}
Expand All @@ -263,8 +265,8 @@ def do_dump(dbsession, contacts, output: TextIO):
with tempfile.NamedTemporaryFile(mode="w") as tmpfile:
for email in contacts:
contact = ContactSchema.from_email(email)
main_table_row, _, _ = service.convert_ctms_to_acoustic(
contact, main_fields, newsletters_mapping
main_table_row, _, _, _ = service.convert_ctms_to_acoustic(
contact, main_fields, waitlist_fields, newsletters_mapping
)
columns.update(set(main_table_row.keys()))
tmpfile.write(json.dumps(main_table_row) + "\n")
Expand Down
1 change: 1 addition & 0 deletions ctms/bin/acoustic_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def main(db, settings):
refresh_token=settings.acoustic_refresh_token,
acoustic_main_table_id=settings.acoustic_main_table_id,
acoustic_newsletter_table_id=settings.acoustic_newsletter_table_id,
acoustic_waitlist_table_id=settings.acoustic_waitlist_table_id,
acoustic_product_table_id=settings.acoustic_product_subscriptions_id,
server_number=settings.acoustic_server_number,
retry_limit=settings.acoustic_retry_limit,
Expand Down
2 changes: 2 additions & 0 deletions ctms/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Settings(BaseSettings):
acoustic_refresh_token: Optional[str] = None
acoustic_main_table_id: Optional[int] = None
acoustic_newsletter_table_id: Optional[int] = None
acoustic_waitlist_table_id: Optional[int] = None
acoustic_product_subscriptions_id: Optional[int] = None
prometheus_pushgateway_url: Optional[str] = None
background_healthcheck_path: Optional[str] = None
Expand All @@ -81,6 +82,7 @@ class BackgroundSettings(Settings):
acoustic_refresh_token: str
acoustic_main_table_id: int
acoustic_newsletter_table_id: int
acoustic_waitlist_table_id: int
acoustic_product_subscriptions_id: int
prometheus_pushgateway_url: str

Expand Down
11 changes: 9 additions & 2 deletions ctms/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
refresh_token,
acoustic_main_table_id,
acoustic_newsletter_table_id,
acoustic_waitlist_table_id,
acoustic_product_table_id,
server_number,
retry_limit=5,
Expand All @@ -44,6 +45,7 @@ def __init__(
acoustic_client=acoustic_client,
acoustic_main_table_id=acoustic_main_table_id,
acoustic_newsletter_table_id=acoustic_newsletter_table_id,
acoustic_waitlist_table_id=acoustic_waitlist_table_id,
acoustic_product_table_id=acoustic_product_table_id,
metric_service=metric_service,
)
Expand All @@ -58,6 +60,7 @@ def _sync_pending_record(
db,
pending_record: PendingAcousticRecord,
main_fields: set[str],
waitlist_fields: set[str],
newsletters_mapping: dict[str, str],
) -> str:
state = "unknown"
Expand All @@ -67,7 +70,7 @@ def _sync_pending_record(
contact = get_contact_by_email_id(db, pending_record.email_id)
try:
self.ctms_to_acoustic.attempt_to_upload_ctms_contact(
contact, main_fields, newsletters_mapping
contact, main_fields, waitlist_fields, newsletters_mapping
)
except AcousticUploadError as exc:
email_domain = (
Expand Down Expand Up @@ -134,6 +137,10 @@ def sync_records(self, db, end_time=None) -> Dict[str, Union[int, str]]:
db.query(AcousticField).filter(AcousticField.tablename == "main").all()
)
main_fields = {entry.field for entry in main_acoustic_fields}
waitlist_acoustic_fields: List[AcousticField] = (
db.query(AcousticField).filter(AcousticField.tablename == "waitlist").all()
)
waitlist_fields = {entry.field for entry in waitlist_acoustic_fields}

newsletters_mapping_entries: List[AcousticNewsletterMapping] = db.query(
AcousticNewsletterMapping
Expand All @@ -158,7 +165,7 @@ def sync_records(self, db, end_time=None) -> Dict[str, Union[int, str]]:
record_created = None
for acoustic_record in all_acoustic_records_before_now:
state = self._sync_pending_record(
db, acoustic_record, main_fields, newsletters_mapping
db, acoustic_record, main_fields, waitlist_fields, newsletters_mapping
)
total += 1

Expand Down
131 changes: 131 additions & 0 deletions docs/adrs/004-waitlists_acoustic.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Waitlists in Acoustic

* Status: proposed
* Deciders: <CTMS stakeholders> + <Basket stakeholders>
* Date: June 30, 2023

## Context and Problem Statement

Users join product waiting lists. Marketing wants to reach out when the product becomes available in a certain country or on a certain platform.

There needs to be a convenient way to target users waiting for a certain product from a certain country or platform (or other additional fields).

CTMS holds this waitlist data, but currently does not synchronize it to Acoustic.

## Decision Drivers

In order to choose our solution we considered the following criteria:

- **Complexity**: Low → High: how complex is the solution
- **Cost**: Low → High: how much engineering effort is required
- **Level of self-service**: Low → High: how little manual effort is necessary to synchronize a new waitlist

## Considered Options

1. [Option 1 - Fields on the main table](#option-1---fields-on-the-main-table)
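2. [Option 2 - One relational table](#option-2---one-relational-table)
3. [Option 3 - Several relational tables](#option-3---several-relational-tables)
4. [Option 4 - Rely on contact lists](#option-4---rely-on-contact-lists)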

## Decision Outcome

Chosen option: Option 2 because it has the highest level of self-service and the lowest cost and complexity. Plus, it follows what we already have in place for newsletters.

## Pros and Cons of the Options

### Option 1 - Fields on the main table

With this solution, we add columns on the Acoustic main table, following the same approach as newsletters and VPN/Relay waitlists.

For example, currently we have the following columns:

* `vpn_waitlist_geo`
* `vpn_waitlist_platform`
* `relay_waitlist_geo`

For example, the marketing team uses the `*_geo` columns to target users waiting for the product from a specific country. Obviously, it will be empty for most contacts in the main table.

**Complexity**: Mid

This does not increase the complexity of the code base, since it follows an existing pattern.

However, technical debt increases over time: if we had to onboard many waitlists with many additional fields, the number of columns on the main table could become very large and impractical.

**Cost**: Low

We already have the code in place for VPN and Relay where we sync columns with this pattern `{name}_waitlist_{field}`.

**Level of self-service**: Low

In order to synchronize a new waitlist on Acoustic, we would have to:

1. Create the column(s) in Acoustic
2. Declare the column(s) in CTMS using `acoustic.py fields add main:{name}_waitlist_{field}`


### Option 2 - One relational table

A new relational table, `waitlist`, almost equivalent to the PostgreSQL table, but with the extra fields flattened into columns.

Since we have a single table for all waitlists, its columns will be the union of all the waitlists' different fields. If the waitlists all have very different types of extra fields, these columns will mostly have empty cells.

**Complexity**: Low

This does not increase the complexity of the code base, since it follows an existing pattern.

**Cost**: Low

The code was trivial to implement since it follows exactly the same approach as for the newsletter relational table.

**Level of self-service**: Mid-High

In order to synchronize a new waitlist on Acoustic that has the same fields as others, no action would be required.

Otherwise, if the waitlist has a specific field that does not exist yet, then we would have to:

1. Create the column(s) in Acoustic
2. Declare the column(s) in CTMS using `acoustic.py fields add waitlist:{field}`

### Option 3 - Several relational tables

With this solution, we have a dedicated relational table for each waitlist. Each table contains the appropriate columns.

**Complexity**: Mid

The Acoustic service code currently relies on a finite list of tables. With this solution, it would become slightly more abstract and manage an arbitrary number of tables.

**Cost**: Mid

Some refactoring would be necessary, but globally this solution would follow what we currently do for newsletters.

**Level of self-service**: Low

In order to synchronize a new waitlist on Acoustic, we would have to:

1. Create the relational table in Acoustic
2. Declare the column(s) in CTMS using `acoustic.py fields add {name}:{field}`

### Option 4 - Rely on contact lists

With this solution, CTMS would process the waitlist records and create contact lists based on predefined segmentation.

For example, CTMS would create contact lists per product and country and per platform, and keep them in sync.

**Complexity**: Mid

The concept is relatively simple. Since this is accomplished in background tasks, performance is not a major concern. And if implemented simply using queries, the complexity would be limited to the timestamp based synchronization.

**Cost**: Mid

The approach would be new in the Acoustic service code base, and we would have to leverage Acoustic APIs that we haven't used until now.

**Level of self-service**: Mid

We could imagine having default segmentation criteria.

In order to synchronize a new waitlist on Acoustic that has the same predefined fields as others, no action would be required.

Otherwise, if the waitlist has a specific field to distinguish contacts, then we would have to:

1. Modify the code to add the distinction field or query
2. Redeploy
Loading

0 comments on commit ee49a1d

Please sign in to comment.