Skip to content

Commit

Permalink
update implementations and tests for local eval v2
Browse files Browse the repository at this point in the history
  • Loading branch information
tyiuhc committed Jul 23, 2024
1 parent a762a3f commit df56478
Show file tree
Hide file tree
Showing 16 changed files with 135 additions and 138 deletions.
4 changes: 1 addition & 3 deletions lib/experiment/cohort/cohort_storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ def get_cohorts_for_group(group_type, group_name, cohort_ids)
group_type_cohorts = @group_to_cohort_store[group_type] || Set.new
group_type_cohorts.each do |cohort_id|
members = @cohort_store[cohort_id]&.member_ids || Set.new
if cohort_ids.include?(cohort_id) && members.include?(group_name)
result.add(cohort_id)
end
result.add(cohort_id) if cohort_ids.include?(cohort_id) && members.include?(group_name)
end
end
result
Expand Down
2 changes: 1 addition & 1 deletion lib/experiment/cohort/cohort_sync_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class CohortSyncConfig

attr_accessor :api_key, :secret_key, :max_cohort_size, :cohort_request_delay_millis, :cohort_server_url

def initialize(api_key:, secret_key:, max_cohort_size: 2_147_483_647, cohort_request_delay_millis: 5000, cohort_server_url: DEFAULT_COHORT_SYNC_URL)
def initialize(api_key, secret_key, max_cohort_size: 2_147_483_647, cohort_request_delay_millis: 5000, cohort_server_url: DEFAULT_COHORT_SYNC_URL)
@api_key = api_key
@secret_key = secret_key
@max_cohort_size = max_cohort_size
Expand Down
3 changes: 2 additions & 1 deletion lib/experiment/deployment/deployment_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def periodic_flag_update
end

def update_flag_configs
flag_configs = @flag_config_fetcher.fetch_v2
flags = @flag_config_fetcher.fetch_v2
flag_configs = flags.each_with_object({}) { |flag, hash| hash[flag['key']] = flag }
flag_keys = flag_configs.values.map { |flag| flag['key'] }.to_set
@flag_config_storage.remove_if { |f| !flag_keys.include?(f['key']) }

Expand Down
2 changes: 1 addition & 1 deletion lib/experiment/flag/flag_config_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def fetch_v2
raise "flagConfigs - received error response: #{response.code}: #{response.body}" unless response.is_a?(Net::HTTPOK)

@logger.debug("[Experiment] Fetch flag configs: #{response.body}")
parse(response.body)
JSON.parse(response.body)
end

# Fetch local evaluation mode flag configs from the Experiment API server.
Expand Down
2 changes: 1 addition & 1 deletion lib/experiment/flag/flag_config_storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def put_flag_config(flag_config)

def remove_if
@flag_configs_lock.synchronize do
@flag_configs.delete_if { |key, value| yield(value) }
@flag_configs.delete_if { |_key, value| yield(value) }
end
end
end
Expand Down
58 changes: 18 additions & 40 deletions lib/experiment/local/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def initialize(api_key, config = nil)
@flag_config_storage = InMemoryFlagConfigStorage.new
@flag_config_fetcher = LocalEvaluationFetcher.new(@api_key, @logger, @config.server_url)
@cohort_loader = nil
if @config.cohort_sync_config != nil
unless @config.cohort_sync_config.nil?
@cohort_download_api = DirectCohortDownloadApi.new(@config.cohort_sync_config.api_key,
@config.cohort_sync_config.secret_key,
@config.cohort_sync_config.max_cohort_size,
Expand All @@ -42,7 +42,6 @@ def initialize(api_key, config = nil)
@cohort_loader = CohortLoader.new(@cohort_download_api, @cohort_storage)
end
@deployment_runner = DeploymentRunner.new(@config, @flag_config_fetcher, @flag_config_storage, @cohort_storage, @logger, @cohort_loader)

end

# Locally evaluates flag variants for a user.
Expand All @@ -66,19 +65,17 @@ def evaluate(user, flag_keys = [])
# @param [String[]] flag_keys The flags to evaluate with the user, if empty all flags are evaluated
# @return [Hash[String, Variant]] The evaluated variants
def evaluate_v2(user, flag_keys = [])
flags = @flags_mutex.synchronize do
@flags
end
flags = @flag_config_storage.flag_configs
return {} if flags.nil?

sorted_flags = AmplitudeExperiment.topological_sort(flags, flag_keys.to_set)
flags_json = sorted_flags.to_json

enriched_user = AmplitudeExperiment.user_to_evaluation_context(user)
user_str = enriched_user.to_json
context = AmplitudeExperiment.user_to_evaluation_context(enrich_user(user, flags))
context_json = context.to_json

@logger.debug("[Experiment] Evaluate: User: #{user_str} - Rules: #{flags}") if @config.debug
result = evaluation(flags, user_str)
@logger.debug("[Experiment] Evaluate: User: #{context_json} - Rules: #{flags}") if @config.debug
result = evaluation(flags_json, context_json)
@logger.debug("[Experiment] evaluate - result: #{result}") if @config.debug
variants = AmplitudeExperiment.evaluation_variants_json_to_variants(result)
@assignment_service&.track(Assignment.new(user, variants))
Expand All @@ -96,52 +93,33 @@ def start

# Stop polling for flag configurations. Close resource like connection pool with client
def stop
@poller_thread&.exit
@is_running = false
@poller_thread = nil
@deployment_runner.stop
end

private

def run
@is_running = true
begin
flags = @fetcher.fetch_v2
flags_obj = JSON.parse(flags)
flags_map = flags_obj.each_with_object({}) { |flag, hash| hash[flag['key']] = flag }
@flags_mutex.synchronize do
@flags = flags_map
end
end
end

def enrich_user(user, flag_configs)
v = flag_configs.values
grouped_cohort_ids = AmplitudeExperiment.get_grouped_cohort_ids_from_flags(flag_configs)

if grouped_cohort_ids.key?(USER_GROUP_TYPE)
user_cohort_ids = grouped_cohort_ids[USER_GROUP_TYPE]
if user_cohort_ids && user.user_id
user.cohort_ids = @cohort_storage.get_cohorts_for_user(user.user_id, user_cohort_ids)
end
user.cohort_ids = Array(@cohort_storage.get_cohorts_for_user(user.user_id, user_cohort_ids)) if user_cohort_ids && user.user_id
end

if user.groups
user.groups.each do |group_type, group_names|
group_name = group_names.first if group_names
next unless group_name
user.groups&.each do |group_type, group_names|
group_name = group_names.first if group_names
next unless group_name

cohort_ids = grouped_cohort_ids[group_type] || []
next if cohort_ids.empty?
cohort_ids = grouped_cohort_ids[group_type] || []
next if cohort_ids.empty?

user.add_group_cohort_ids(
group_type,
group_name,
@cohort_storage.get_cohorts_for_group(group_type, group_name, cohort_ids)
)
end
user.add_group_cohort_ids(
group_type,
group_name,
Array(@cohort_storage.get_cohorts_for_group(group_type, group_name, cohort_ids))
)
end

user
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/experiment/local/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class LocalEvaluationConfig
# @param [long] flag_config_polling_interval_millis The value of flag config polling interval in million seconds.
def initialize(server_url: DEFAULT_SERVER_URL, bootstrap: {},
flag_config_polling_interval_millis: 30_000, debug: false, assignment_config: nil,
cohort_sync_config: nil)
cohort_sync_config: nil)
@debug = debug || false
@server_url = server_url
@bootstrap = bootstrap
Expand Down
60 changes: 40 additions & 20 deletions lib/experiment/user.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ class User
# @return [Hash, nil] the value of group properties
attr_accessor :group_properties

# Cohort IDs for the user
# @return [Hash, nil] the value of cohort_ids
attr_accessor :cohort_ids

# Cohort IDs for the user's groups
# @return [Hash, nil] the value of group_cohort_ids
attr_accessor :group_cohort_ids

# @param [String, nil] device_id Device ID for associating with an identity in Amplitude
# @param [String, nil] user_id User ID for associating with an identity in Amplitude
# @param [String, nil] country Predefined field, must be manually provided
Expand All @@ -101,7 +109,8 @@ class User
# @param [Hash, nil] group_properties Custom properties for groups
def initialize(device_id: nil, user_id: nil, country: nil, city: nil, region: nil, dma: nil, ip_address: nil, language: nil,
platform: nil, version: nil, os: nil, device_manufacturer: nil, device_brand: nil,
device_model: nil, carrier: nil, library: nil, user_properties: nil, groups: nil, group_properties: nil)
device_model: nil, carrier: nil, library: nil, user_properties: nil, groups: nil, group_properties: nil,
cohort_ids: nil, group_cohort_ids: nil)
@device_id = device_id
@user_id = user_id
@country = country
Expand All @@ -121,31 +130,35 @@ def initialize(device_id: nil, user_id: nil, country: nil, city: nil, region: ni
@user_properties = user_properties
@groups = groups
@group_properties = group_properties
@cohort_ids = cohort_ids
@group_cohort_ids = group_cohort_ids
end

# Return User as Hash.
# @return [Hash] Hash object with user values
def as_json(_options = {})
{
device_id: @device_id,
user_id: @user_id,
country: @country,
city: @city,
region: @region,
dma: @dma,
ip_address: @ip_address,
language: @language,
platform: @platform,
version: @version,
os: @os,
device_manufacturer: @device_manufacturer,
device_brand: @device_brand,
device_model: @device_model,
carrier: @carrier,
library: @library,
user_properties: @user_properties,
groups: @groups,
group_properties: @group_properties
'device_id' => @device_id,
'user_id' => @user_id,
'country' => @country,
'city' => @city,
'region' => @region,
'dma' => @dma,
'ip_address' => @ip_address,
'language' => @language,
'platform' => @platform,
'version' => @version,
'os' => @os,
'device_manufacturer' => @device_manufacturer,
'device_brand' => @device_brand,
'device_model' => @device_model,
'carrier' => @carrier,
'library' => @library,
'user_properties' => @user_properties,
'groups' => @groups,
'group_properties' => @group_properties,
'cohort_ids' => @cohort_ids,
'group_cohort_ids' => @group_cohort_ids
}.compact
end

Expand All @@ -154,5 +167,12 @@ def as_json(_options = {})
def to_json(*options)
as_json(*options).to_json(*options)
end

def add_group_cohort_ids(group_type, group_name, cohort_ids)
@group_cohort_ids ||= {}

group_names = @group_cohort_ids[group_type] ||= {}
group_names[group_name] = cohort_ids
end
end
end
2 changes: 1 addition & 1 deletion lib/experiment/util/flag_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def self.get_grouped_cohort_condition_ids(segment)
conditions = segment['conditions'] || []
conditions.each do |condition|
condition = condition[0]
next unless cohort_filter?(condition) && (condition['selector'].length > 2)
next unless cohort_filter?(condition) && (condition['selector'][1].length > 2)

context_subtype = condition['selector'][1]
group_type =
Expand Down
32 changes: 20 additions & 12 deletions lib/experiment/util/user.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,40 @@ module AmplitudeExperiment
def self.user_to_evaluation_context(user)
user_groups = user.groups
user_group_properties = user.group_properties
user_group_cohort_ids = user.group_cohort_ids
user_hash = user.as_json.compact
user_hash.delete(:groups)
user_hash.delete(:group_properties)
user_hash.delete('groups')
user_hash.delete('group_properties')
user_hash.delete('group_cohort_ids')

context = user_hash.empty? ? {} : { user: user_hash }
context = user_hash.empty? ? {} : { 'user' => user_hash }

return context if user_groups.nil?

groups = {}
user_groups.each do |group_type, group_name|
group_name = group_name[0] if group_name.is_a?(Array) && !group_name.empty?

groups[group_type.to_sym] = { group_name: group_name }
groups[group_type] = { 'group_name' => group_name }

next if user_group_properties.nil?
if user_group_properties
group_properties_type = user_group_properties[group_type]
if group_properties_type.is_a?(Hash)
group_properties_name = group_properties_type[group_name]
groups[group_type]['group_properties'] = group_properties_name if group_properties_name.is_a?(Hash)
end
end

group_properties_type = user_group_properties[group_type.to_sym]
next if group_properties_type.nil? || !group_properties_type.is_a?(Hash)
next unless user_group_cohort_ids

group_properties_name = group_properties_type[group_name.to_sym]
next if group_properties_name.nil? || !group_properties_name.is_a?(Hash)

groups[group_type.to_sym][:group_properties] = group_properties_name
group_cohort_ids_type = user_group_cohort_ids[group_type]
if group_cohort_ids_type.is_a?(Hash)
group_cohort_ids_name = group_cohort_ids_type[group_name]
groups[group_type]['cohort_ids'] = group_cohort_ids_name if group_cohort_ids_name.is_a?(Array)
end
end

context[:groups] = groups unless groups.empty?
context['groups'] = groups unless groups.empty?
context
end
end
2 changes: 0 additions & 2 deletions spec/experiment/cohort/cohort_download_api_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ def cohort_to_h(cohort)
expect(result_cohort.group_type).to eq(cohort.group_type)
end


it 'retries on 429s for group cohort request' do
group_name = 'org name'
cohort = Cohort.new(cohort_id, 0, 1, ['group'], group_name)
Expand Down Expand Up @@ -215,7 +214,6 @@ def cohort_to_h(cohort)
expect { api.get_cohort(cohort_id, cohort) }.to raise_error(CohortTooLargeError)
end


it 'raises CohortNotModifiedError for cohort not modified' do
last_modified = 1000
cohort = Cohort.new(cohort_id, last_modified, 1, [])
Expand Down
1 change: 0 additions & 1 deletion spec/experiment/cohort/cohort_loader_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ module AmplitudeExperiment

expect(@storage.get_cohorts_for_user('1', Set.new(%w[a b c]))).to eq(Set.new(%w[a c]))
end

end
end
end
12 changes: 6 additions & 6 deletions spec/experiment/deployment/deployment_runner_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'rspec'
module AmplitudeExperiment
COHORT_ID = '1234'
COHORT_ID = '1234'.freeze

describe DeploymentRunner do
before(:each) do
Expand All @@ -14,7 +14,7 @@ module AmplitudeExperiment
{
'selector' => %w[context user cohort_ids],
'op' => 'set contains any',
'values' => [COHORT_ID],
'values' => [COHORT_ID]
}
]
]
Expand All @@ -30,7 +30,7 @@ module AmplitudeExperiment
flag_config_storage = double('FlagConfigStorage')
cohort_storage = double('CohortStorage')
cohort_loader = CohortLoader.new(cohort_download_api, cohort_storage)
logger = Logger.new(STDOUT)
logger = Logger.new($stdout)
runner = DeploymentRunner.new(
LocalEvaluationConfig.new,
flag_fetcher,
Expand All @@ -40,7 +40,7 @@ module AmplitudeExperiment
cohort_loader
)

allow(flag_fetcher).to receive(:fetch_v1).and_raise(RuntimeError, 'test')
allow(flag_fetcher).to receive(:fetch_v2).and_raise(RuntimeError, 'test')

expect { runner.start }.to raise_error(RuntimeError, 'test')
end
Expand All @@ -51,7 +51,7 @@ module AmplitudeExperiment
flag_config_storage = double('FlagConfigStorage')
cohort_storage = double('CohortStorage')
cohort_loader = CohortLoader.new(cohort_download_api, cohort_storage)
logger = Logger.new(STDOUT)
logger = Logger.new($stdout)
runner = DeploymentRunner.new(
LocalEvaluationConfig.new,
flag_fetcher,
Expand All @@ -61,7 +61,7 @@ module AmplitudeExperiment
cohort_loader
)

allow(flag_fetcher).to receive(:fetch_v1).and_return([@flag])
allow(flag_fetcher).to receive(:fetch_v2).and_return([@flag])
allow(flag_config_storage).to receive(:remove_if).and_return(nil)
allow(flag_config_storage).to receive(:flag_configs).and_return({})
allow(cohort_storage).to receive(:cohort_ids).and_return(Set.new)
Expand Down
Loading

0 comments on commit df56478

Please sign in to comment.