Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: evaluation v2 #41

Merged
merged 13 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/amplitude_experiment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@
from .cookie import AmplitudeCookie
from .local.client import LocalEvaluationClient
from .local.config import LocalEvaluationConfig
from .flagresult import FlagResult
from .assignment import AssignmentConfig
14 changes: 8 additions & 6 deletions src/amplitude_experiment/assignment/assignment.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import time
from typing import Dict

from ..flagresult import FlagResult
from .. import Variant
from ..user import User

DAY_MILLIS = 24 * 60 * 60 * 1000


class Assignment:

def __init__(self, user: User, results: Dict[str, FlagResult]):
def __init__(self, user: User, results: Dict[str, Variant]):
self.user = user
self.results = results
self.timestamp = time.time() * 1000
Expand All @@ -18,8 +18,10 @@ def canonicalize(self) -> str:
user = self.user.user_id.strip() if self.user.user_id else 'None'
device = self.user.device_id.strip() if self.user.device_id else 'None'
canonical = user + ' ' + device + ' '
for key in sorted(self.results):
value = self.results[key].variant['key'].strip() if self.results[key] and self.results[key].variant and \
self.results[key].variant.get('key') else 'None'
canonical += key.strip() + ' ' + value + ' '
for flag_key in sorted(self.results):
variant = self.results[flag_key]
if variant.key is None:
continue
value = self.results[flag_key].key.strip()
canonical += flag_key.strip() + ' ' + value + ' '
return canonical
30 changes: 21 additions & 9 deletions src/amplitude_experiment/assignment/assignment_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,35 @@
def to_event(assignment: Assignment) -> BaseEvent:
event = BaseEvent(event_type='[Experiment] Assignment', user_id=assignment.user.user_id,
device_id=assignment.user.device_id, event_properties={}, user_properties={})
for key in sorted(assignment.results):
event.event_properties[key + '.variant'] = assignment.results[key].variant['key']

set_props = {}
unset_props = {}

for key in sorted(assignment.results):
if assignment.results[key].type == FLAG_TYPE_MUTUAL_EXCLUSION_GROUP:
for flag_key in sorted(assignment.results):
variant = assignment.results[flag_key]
if variant.key is None:
continue
# Get variant metadata
version: int = variant.metadata.get('flagVersion') if variant.metadata is not None else None
segment_name: str = variant.metadata.get('segmentName') if variant.metadata is not None else None
flag_type: str = variant.metadata.get('flagType') if variant.metadata is not None else None
default: bool = False
if variant.metadata is not None and variant.metadata.get('default') is not None:
default = variant.metadata.get('default')
# Set event properties
event.event_properties[flag_key + '.variant'] = variant.key
if version is not None and segment_name is not None:
event.event_properties[flag_key + '.details'] = f"v{version} rule:{segment_name}"
bgiori marked this conversation as resolved.
Show resolved Hide resolved
# Build user properties
if flag_type == FLAG_TYPE_MUTUAL_EXCLUSION_GROUP:
continue
elif assignment.results[key].is_default_variant:
unset_props[f'[Experiment] {key}'] = '-'
elif default:
unset_props[f'[Experiment] {flag_key}'] = '-'
else:
set_props[f'[Experiment] {key}'] = assignment.results[key].variant['key']
set_props[f'[Experiment] {flag_key}'] = variant.key

# Set user properties and insert id
event.user_properties['$set'] = set_props
event.user_properties['$unset'] = unset_props

event.insert_id = f'{event.user_id} {event.device_id} {hash_code(assignment.canonicalize())} {int(assignment.timestamp / DAY_MILLIS)}'

return event
Expand Down
8 changes: 0 additions & 8 deletions src/amplitude_experiment/flagresult.py

This file was deleted.

94 changes: 70 additions & 24 deletions src/amplitude_experiment/local/client.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import json
import logging
from threading import Lock
from typing import Any, List, Dict
from typing import Any, List, Dict, Set

from amplitude import Amplitude

from .config import LocalEvaluationConfig
from ..flagresult import FlagResult
from .topological_sort import topological_sort
from ..assignment import Assignment, AssignmentFilter, AssignmentService
from ..assignment.assignment_service import FLAG_TYPE_MUTUAL_EXCLUSION_GROUP, FLAG_TYPE_HOLDOUT_GROUP
from ..user import User
from ..connection_pool import HTTPConnectionPool
from .poller import Poller
from .evaluation.evaluation import evaluate
from ..util import deprecated
from ..util.user import user_to_evaluation_context
from ..variant import Variant
from ..version import __version__

Expand Down Expand Up @@ -57,37 +58,68 @@ def start(self):
self.__do_flags()
self.poller.start()

def evaluate(self, user: User, flag_keys: List[str] = None) -> Dict[str, Variant]:
def evaluate_v2(self, user: User, flag_keys: Set[str] = None) -> Dict[str, Variant]:
tyiuhc marked this conversation as resolved.
Show resolved Hide resolved
"""
Locally evaluates flag variants for a user.
Parameters:
Locally evaluates flag variants for a user.

This function will only evaluate flags for the keys specified in the flag_keys argument. If flag_keys is
missing or None, all flags are evaluated.

Parameters:
user (User): The user to evaluate
flag_keys (List[str]): The flags to evaluate with the user. If empty, all flags from the flag cache are evaluated.
flag_keys (List[str]): The flags to evaluate with the user. If empty, all flags are evaluated.

Returns:
The evaluated variants.
"""
variants = {}
if self.flags is None or len(self.flags) == 0:
return variants
user_json = str(user)
self.logger.debug(f"[Experiment] Evaluate: User: {user_json} - Flags: {self.flags}")
result_json = evaluate(self.flags, user_json)
self.logger.debug(f"[Experiment] Evaluate: user={user} - Flags: {self.flags}")
context = user_to_evaluation_context(user)
sorted_flags = topological_sort(self.flags, flag_keys)
flags_json = json.dumps(sorted_flags)
context_json = json.dumps(context)
result_json = evaluate(flags_json, context_json)
self.logger.debug(f"[Experiment] Evaluate Result: {result_json}")
evaluation_result = json.loads(result_json)
filter_result = flag_keys is not None
assignment_result = {}
for key, value in evaluation_result.items():
included = not filter_result or key in flag_keys
tyiuhc marked this conversation as resolved.
Show resolved Hide resolved
if not value.get('isDefaultVariant') and included:
variants[key] = Variant(value['variant'].get('key'), value['variant'].get('payload'))
if included or evaluation_result[key]['type'] == FLAG_TYPE_MUTUAL_EXCLUSION_GROUP or \
evaluation_result[key]['type'] == FLAG_TYPE_HOLDOUT_GROUP:
assignment_result[key] = FlagResult(value)
if self.assignment_service:
self.assignment_service.track(Assignment(user, assignment_result))
error = evaluation_result.get('error')
if error is not None:
self.logger.error(f"[Experiment] Evaluation failed: {error}")
return variants
result = evaluation_result.get('result')
if result is None:
return variants
for flag_key, variant in result.items():
variants[flag_key] = Variant(
key=variant.get('key'),
value=variant.get('value'),
payload=variant.get('payload'),
metadata=variant.get('metadata')
)
if self.assignment_service is not None:
self.assignment_service.track(Assignment(user, variants))
return variants

@deprecated("Use evaluate_v2")
def evaluate(self, user: User, flag_keys: List[str] = None) -> Dict[str, Variant]:
"""
Locally evaluates flag variants for a user.

This function will only evaluate flags for the keys specified in the flag_keys argument. If flag_keys is
missing, all flags are evaluated.

Parameters:
user (User): The user to evaluate
flag_keys (List[str]): The flags to evaluate with the user. If empty, all flags are evaluated.

Returns:
The evaluated variants.
"""
flag_keys = set(flag_keys) if flag_keys is not None else None
variants = self.evaluate_v2(user, flag_keys)
return self.__filter_default_variants(variants)

def __do_flags(self):
conn = self._connection_pool.acquire()
headers = {
Expand All @@ -98,14 +130,16 @@ def __do_flags(self):
body = None
self.logger.debug('[Experiment] Get flag configs')
try:
response = conn.request('GET', '/sdk/v1/flags', body, headers)
response = conn.request('GET', '/sdk/v2/flags?v=0', body, headers)
response_body = response.read().decode("utf8")
if response.status != 200:
raise Exception(
f"[Experiment] Get flagConfigs - received error response: ${response.status}: ${response_body}")
self.logger.debug(f"[Experiment] Got flag configs: {response_body}")
flags = json.loads(response_body)
flags_dict = {flag['key']: flag for flag in flags}
self.logger.debug(f"[Experiment] Got flag configs: {flags}")
self.lock.acquire()
self.flags = response_body
self.flags = flags_dict
self.lock.release()
finally:
self._connection_pool.release(conn)
Expand All @@ -128,3 +162,15 @@ def __enter__(self) -> 'LocalEvaluationClient':

def __exit__(self, *exit_info: Any) -> None:
self.stop()

@staticmethod
def __filter_default_variants(variants: Dict[str, Variant]) -> Dict[str, Variant]:
def is_default_variant(variant: Variant) -> bool:
default = False if variant.metadata.get('default') is None else variant.metadata.get('default')
deployed = True if variant.metadata.get('deployed') is None else variant.metadata.get('deployed')
return default or not deployed

return {key: variant for key, variant in variants.items() if not is_default_variant(variant)}



7 changes: 4 additions & 3 deletions src/amplitude_experiment/local/evaluation/evaluation.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from .libevaluation_interop import libevaluation_interop_symbols
from ctypes import cast, c_char_p

def evaluate(rules: str, user: str) -> str:

def evaluate(rules: str, context: str) -> str:
"""
Local evaluation wrapper.
Parameters:
rules (str): rules JSON string
user (str): user JSON string
context (str): context JSON string

Returns:
Evaluation results with variants in JSON
"""
result = libevaluation_interop_symbols().contents.kotlin.root.evaluate(rules, user)
result = libevaluation_interop_symbols().contents.kotlin.root.evaluate(rules, context)
py_result = cast(result, c_char_p).value
libevaluation_interop_symbols().contents.DisposeString(result)
return str(py_result, 'utf-8')
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ typedef struct {
/* User functions. */
struct {
struct {
const char* (*evaluate)(const char* rules, const char* user);
const char* (*evaluate)(const char* flags, const char* context);
} root;
} kotlin;
} libevaluation_interop_ExportedSymbols;
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ typedef struct {
/* User functions. */
struct {
struct {
const char* (*evaluate)(const char* rules, const char* user);
const char* (*evaluate)(const char* flags, const char* context);
} root;
} kotlin;
} libevaluation_interop_ExportedSymbols;
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ typedef struct {
/* User functions. */
struct {
struct {
const char* (*evaluate)(const char* rules, const char* user);
const char* (*evaluate)(const char* flags, const char* context);
} root;
} kotlin;
} libevaluation_interop_ExportedSymbols;
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ typedef struct {
/* User functions. */
struct {
struct {
const char* (*evaluate)(const char* rules, const char* user);
const char* (*evaluate)(const char* flags, const char* context);
} root;
} kotlin;
} libevaluation_interop_ExportedSymbols;
Expand Down
57 changes: 57 additions & 0 deletions src/amplitude_experiment/local/topological_sort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from typing import Dict, Set, Any, List, Optional


class CycleException(Exception):
"""
Raised when topological sorting encounters a cycle between flag dependencies.
"""

def __init__(self, path: Set[str]):
self.path = path

def __str__(self):
return f"Detected a cycle between flags {self.path}"


def topological_sort(
flags: Dict[str, Dict[str, Any]],
keys: List[str] = None,
ordered: bool = False
) -> List[Dict[str, Any]]:
available = flags.copy()
result = []
starting_keys = keys if keys is not None and len(keys) > 0 else list(flags.keys())
# Used for testing to ensure consistency.
if ordered and (keys is None or len(keys) == 0):
starting_keys.sort()
for flag_key in starting_keys:
traversal = __parent_traversal(flag_key, available, set())
if traversal is not None:
result.extend(traversal)
return result


def __parent_traversal(
flag_key: str,
available: Dict[str, Dict[str, Any]],
path: Set[str]
) -> Optional[List[Dict[str, Any]]]:
flag = available.get(flag_key)
if flag is None:
return None
dependencies = flag.get('dependencies')
if dependencies is None or len(dependencies) == 0:
available.pop(flag_key)
return [flag]
path.add(flag_key)
result = []
for parent_key in dependencies:
if parent_key in path:
raise CycleException(path)
traversal = __parent_traversal(parent_key, available, path)
if traversal is not None:
result.extend(traversal)
result.append(flag)
path.remove(flag_key)
available.pop(flag_key)
return result
Loading