Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 54 additions & 23 deletions ddtrace/internal/openfeature/_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from collections import OrderedDict
from collections.abc import MutableMapping
from importlib.metadata import version
import threading
import typing

from openfeature.evaluation_context import EvaluationContext
Expand Down Expand Up @@ -87,11 +88,20 @@ class DataDogProvider(AbstractProvider):
Feature Flags and Experimentation (FFE) product.
"""

def __init__(self, *args: typing.Any, **kwargs: typing.Any):
def __init__(self, *args: typing.Any, initialization_timeout: typing.Optional[float] = None, **kwargs: typing.Any):
super().__init__(*args, **kwargs)
self._metadata = Metadata(name="Datadog")
self._status = ProviderStatus.NOT_READY
self._config_received = False

# Initialization timeout: constructor arg takes priority, then env var (default 30s)
if initialization_timeout is not None:
self._initialization_timeout = initialization_timeout
else:
self._initialization_timeout = ffe_config.initialization_timeout_ms / 1000.0

# Event used to block initialize() until config arrives.
# Also serves as the "config received" flag via is_set().
self._config_received = threading.Event()

# Cache for reported exposures to prevent duplicates
# Stores mapping of (flag_key, subject_id) -> (allocation_key, variant_key)
Expand All @@ -108,9 +118,6 @@ def __init__(self, *args: typing.Any, **kwargs: typing.Any):
"please set DD_EXPERIMENTAL_FLAGGING_PROVIDER_ENABLED=true to enable it",
)

# Register this provider instance for status updates
_register_provider(self)

def get_metadata(self) -> Metadata:
"""Returns provider metadata."""
return self._metadata
Expand All @@ -119,32 +126,52 @@ def initialize(self, evaluation_context: EvaluationContext) -> None:
"""
Initialize the provider.

Called by the OpenFeature SDK when the provider is set.
Provider Creation → NOT_READY
First Remote Config Payload
READY (emits PROVIDER_READY event)
Shutdown
NOT_READY
Blocks until Remote Config delivers the first FFE configuration or
the initialization timeout expires.

The timeout is configurable via:
- Constructor: DataDogProvider(initialization_timeout=10.0) # seconds
- Env var: DD_EXPERIMENTAL_FLAGGING_PROVIDER_INITIALIZATION_TIMEOUT_MS=10000

Provider lifecycle:
NOT_READY -> initialize() blocks -> config arrives -> READY
NOT_READY -> initialize() blocks -> timeout -> raises ProviderNotReadyError
"""
if not self._enabled:
return

# Register for RC config callbacks (in initialize, not __init__, so
# re-initialization after shutdown re-registers the provider)
_register_provider(self)

try:
# Start the exposure writer for reporting
start_exposure_writer()
except ServiceStatusError:
logger.debug("Exposure writer is already running", exc_info=True)

# If configuration was already received before initialization, emit ready now
# Fast path: config already available (RC delivered before set_provider)
config = _get_ffe_config()
if config is not None and not self._config_received:
self._config_received = True
if config is not None:
logger.debug("FFE configuration already available, provider is READY")
self._config_received.set()
self._status = ProviderStatus.READY
self._emit_ready_event()
return # SDK will dispatch PROVIDER_READY

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


# Block until config arrives or timeout expires
logger.debug(
"Waiting up to %.1fs for initial FFE configuration from Remote Config", self._initialization_timeout
)
if not self._config_received.wait(timeout=self._initialization_timeout):
# Timeout expired without receiving config
from openfeature.exception import ProviderNotReadyError

raise ProviderNotReadyError(
f"Provider timed out after {self._initialization_timeout:.1f}s waiting for "
"initial configuration from Remote Config"
)

# Config received during wait -- on_configuration_received() already set status

def shutdown(self) -> None:
"""
Expand All @@ -167,7 +194,7 @@ def shutdown(self) -> None:
# Unregister provider
_unregister_provider(self)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside(not caused by this PR): registering provider in __init__ and unregistering it in shutdown may cause an issue. If the provider is reused after shutdown, it's not going to be re-registered. We should move registration to initialize

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 to this. The provider shouldn't be managing its own registration, right?

self._status = ProviderStatus.NOT_READY
self._config_received = False
self._config_received.clear()

def resolve_boolean_details(
self,
Expand Down Expand Up @@ -423,14 +450,18 @@ def on_configuration_received(self) -> None:
"""
Called when a Remote Configuration payload is received and processed.

Emits PROVIDER_READY event on first configuration.
Updates status first, then signals the event to unblock initialize().
Emits PROVIDER_READY for late arrivals (config received after initialize() timed out).
"""
if not self._config_received:
self._config_received = True
if not self._config_received.is_set():
self._status = ProviderStatus.READY
logger.debug("First FFE configuration received, provider is now READY")
# Emit READY for late recovery: config arrived after init timed out
self._emit_ready_event()

# Signal the event last to unblock initialize() after status is updated
self._config_received.set()

def _emit_ready_event(self) -> None:
"""
Safely emit PROVIDER_READY event.
Expand Down
10 changes: 10 additions & 0 deletions ddtrace/internal/settings/openfeature.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,20 @@ class OpenFeatureConfig(DDConfig):
default=1.0,
)

# Provider initialization timeout in milliseconds.
# Controls how long initialize() blocks waiting for the first Remote Config payload.
# Default is 30000ms (30 seconds), matching Java, Go, and Node.js SDKs.
initialization_timeout_ms = DDConfig.var(
int,
"DD_EXPERIMENTAL_FLAGGING_PROVIDER_INITIALIZATION_TIMEOUT_MS",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the EXPERIMENTAL here? I know we had it for the flag that turns the flagging feature on and off (since the feature was experimental), but not sure we need to prepend all the params thusly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not needed per se, but all other env vars across the SDK fleet are still prefixed with EXPERIMENTAL, so I'm going to keep it consistent and then remove them all at once.

default=30000,
)

_openfeature_config_keys = [
"experimental_flagging_provider_enabled",
"ffe_intake_enabled",
"ffe_intake_heartbeat_interval",
"initialization_timeout_ms",
]


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
fixes:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: exiting before initialization is complete was a bug. But the timeout settings (the initialization_timeout_ms config and the initialization_timeout constructor option) are technically a new feature, so maybe we should have both fix and feature entries?

- |
openfeature: This fix resolves an issue where ``DataDogProvider.initialize()`` returned immediately
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should simplify this release note a ton and focus on customer facing wording, feels like too much internal implementation details.

openfeature: This fix resolves an issue where Flag evaluations would return default values until up to 30s after initialize() calls completed.

obviously no clue if my summary is correct/accurate at all, but things like... does mentioning "remote config" matter much to understand the bug/impact? same for PROVIDER_READY? etc?

without waiting for Remote Configuration data, causing the OpenFeature SDK to emit ``PROVIDER_READY``
before flag configuration was available. Flag evaluations in this window silently returned default
values. The provider now blocks in ``initialize()`` until the first configuration arrives or a
configurable timeout expires (default 30s), matching the behavior of the Java, Go, and Node.js
providers. The timeout is configurable via the ``DD_EXPERIMENTAL_FLAGGING_PROVIDER_INITIALIZATION_TIMEOUT_MS``
environment variable or the ``initialization_timeout`` constructor parameter.
111 changes: 106 additions & 5 deletions tests/openfeature/test_provider_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
- NOT_READY by default
- READY when first Remote Config payload is received
- Event emission on status change
- Blocking initialization until config arrives or timeout
"""

import threading
import time

from openfeature import api
from openfeature.provider import ProviderStatus
import pytest
Expand Down Expand Up @@ -43,7 +47,7 @@ def test_provider_starts_not_ready(self):
provider = DataDogProvider()

assert provider._status == ProviderStatus.NOT_READY
assert provider._config_received is False
assert not provider._config_received.is_set()

def test_provider_becomes_ready_after_first_config(self):
"""Test that provider becomes READY after receiving first configuration."""
Expand All @@ -61,7 +65,7 @@ def test_provider_becomes_ready_after_first_config(self):

# Verify becomes READY
assert provider._status == ProviderStatus.READY
assert provider._config_received is True
assert provider._config_received.is_set()
finally:
api.clear_providers()

Expand All @@ -73,14 +77,14 @@ def test_provider_ready_event_emitted(self):

try:
# Provider should not have received config yet
assert not provider._config_received
assert not provider._config_received.is_set()

# Process a configuration
config = create_config(create_boolean_flag("test-flag", enabled=True))
process_ffe_configuration(config)

# Provider should now have received config and be READY
assert provider._config_received
assert provider._config_received.is_set()
assert provider._status == ProviderStatus.READY
finally:
api.clear_providers()
Expand Down Expand Up @@ -140,7 +144,7 @@ def test_provider_status_after_shutdown(self):

# Verify back to NOT_READY
assert provider._status == ProviderStatus.NOT_READY
assert provider._config_received is False
assert not provider._config_received.is_set()
finally:
api.clear_providers()

Expand Down Expand Up @@ -194,3 +198,100 @@ def on_provider_ready(event_details):
finally:
api.remove_handler(ProviderEvent.PROVIDER_READY, on_provider_ready)
api.clear_providers()


class TestProviderInitializationBlocking:
    """Test that initialize() blocks until config arrives or timeout expires.

    Every test registers the provider through ``api.set_provider`` and
    measures how long that call takes, relying on it to run the provider's
    ``initialize()`` before returning.
    """

    def test_initialize_blocks_until_config_arrives(self):
        """initialize() should block and return once config is delivered mid-wait."""
        with override_global_config({"experimental_flagging_provider_enabled": True}):
            provider = DataDogProvider(initialization_timeout=5.0)

            # Deliver config from a background thread after 0.5s
            def deliver_config():
                time.sleep(0.5)
                config = create_config(create_boolean_flag("test-flag", enabled=True))
                process_ffe_configuration(config)

            timer = threading.Thread(target=deliver_config, daemon=True)
            timer.start()

            try:
                start = time.monotonic()
                api.set_provider(provider)
                elapsed = time.monotonic() - start

                # Should have blocked for ~0.5s (not instant, not full timeout)
                assert elapsed >= 0.3, f"initialize() returned too fast ({elapsed:.2f}s)"
                assert elapsed < 4.0, f"initialize() took too long ({elapsed:.2f}s), should have unblocked at ~0.5s"
                assert provider._status == ProviderStatus.READY
                assert provider._config_received.is_set()
            finally:
                # Wait for the delivery thread so it cannot push configuration
                # into a later test after this one has cleaned up.
                timer.join(timeout=5.0)
                api.clear_providers()

    def test_initialize_fast_path_when_config_exists(self):
        """initialize() should return immediately if config already exists."""
        with override_global_config({"experimental_flagging_provider_enabled": True}):
            # Deliver config BEFORE creating provider
            config = create_config(create_boolean_flag("test-flag", enabled=True))
            process_ffe_configuration(config)

            provider = DataDogProvider(initialization_timeout=5.0)

            try:
                start = time.monotonic()
                api.set_provider(provider)
                elapsed = time.monotonic() - start

                # Should be near-instant (config already available)
                assert elapsed < 1.0, f"initialize() took {elapsed:.2f}s, should be instant with pre-loaded config"
                assert provider._status == ProviderStatus.READY
            finally:
                api.clear_providers()

    def test_initialize_timeout_raises(self):
        """A timed-out initialize() should leave the provider in ERROR state.

        The timeout error is not observed directly here: the test only checks
        the status reported by the client after ``api.set_provider`` returns.
        """
        with override_global_config({"experimental_flagging_provider_enabled": True}):
            provider = DataDogProvider(initialization_timeout=0.5)

            try:
                start = time.monotonic()
                # set_provider catches the exception and dispatches PROVIDER_ERROR
                api.set_provider(provider)
                elapsed = time.monotonic() - start

                # Should have blocked for ~0.5s (the timeout)
                assert elapsed >= 0.3, f"initialize() returned too fast ({elapsed:.2f}s)"
                assert elapsed < 2.0, f"initialize() took too long ({elapsed:.2f}s)"

                # Provider should be in ERROR state (SDK caught ProviderNotReadyError)
                client = api.get_client()
                assert client.get_provider_status() == ProviderStatus.ERROR
            finally:
                api.clear_providers()

    def test_late_recovery_after_timeout(self):
        """Config arriving after timeout should transition provider to READY."""
        with override_global_config({"experimental_flagging_provider_enabled": True}):
            provider = DataDogProvider(initialization_timeout=0.5)

            try:
                # Let it timeout
                api.set_provider(provider)

                # Provider should be in ERROR state
                client = api.get_client()
                assert client.get_provider_status() == ProviderStatus.ERROR

                # Now deliver config (late recovery)
                config = create_config(create_boolean_flag("test-flag", enabled=True))
                process_ffe_configuration(config)

                # Provider should recover to READY
                assert provider._status == ProviderStatus.READY
                assert provider._config_received.is_set()
            finally:
                api.clear_providers()
Loading