Skip to content

Commit

Permalink
subscription: making exception free
Browse files Browse the repository at this point in the history
Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk committed Jun 4, 2024
1 parent a094d28 commit fed356e
Show file tree
Hide file tree
Showing 18 changed files with 101 additions and 65 deletions.
5 changes: 3 additions & 2 deletions envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ class SubscriptionFactory {
* CollectionSubscription object.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
*
* @return SubscriptionPtr subscription object corresponding for collection_locator.
* @return SubscriptionPtr subscription object corresponding for collection_locator or error
* status.
*/
virtual SubscriptionPtr
virtual absl::StatusOr<SubscriptionPtr>
collectionSubscriptionFromUrl(const xds::core::v3::ResourceLocator& collection_locator,
const envoy::config::core::v3::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
Expand Down
3 changes: 2 additions & 1 deletion envoy/runtime/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,9 @@ class Loader {
* the constructor is finished, with the exception of dynamic RTDS layers,
* which require ClusterManager.
* @param cm cluster manager reference.
* @return a status indicating if initialization was successful.
*/
virtual void initialize(Upstream::ClusterManager& cm) PURE;
virtual absl::Status initialize(Upstream::ClusterManager& cm) PURE;

/**
* @return const Snapshot& the current snapshot. This reference is safe to use for the duration of
Expand Down
39 changes: 24 additions & 15 deletions source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,19 @@ absl::StatusOr<SubscriptionPtr> SubscriptionFactoryImpl::subscriptionFromConfigS
return factory->create(data);
}

SubscriptionPtr createFromFactoryOrThrow(ConfigSubscriptionFactory::SubscriptionData& data,
absl::string_view subscription_type) {
absl::StatusOr<SubscriptionPtr> createFromFactory(ConfigSubscriptionFactory::SubscriptionData& data,
absl::string_view subscription_type) {
ConfigSubscriptionFactory* factory =
Registry::FactoryRegistry<ConfigSubscriptionFactory>::getFactory(subscription_type);
if (factory == nullptr) {
throwEnvoyExceptionOrPanic(fmt::format(
return absl::InvalidArgumentError(fmt::format(
"Didn't find a registered config subscription factory implementation for name: '{}'",
subscription_type));
}
return factory->create(data);
}

SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
absl::StatusOr<SubscriptionPtr> SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
const xds::core::v3::ResourceLocator& collection_locator,
const envoy::config::core::v3::ConfigSource& config, absl::string_view resource_type,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
Expand All @@ -148,21 +148,23 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
switch (collection_locator.scheme()) {
case xds::core::v3::ResourceLocator::FILE: {
const std::string path = Http::Utility::localPathFromFilePath(collection_locator.id());
THROW_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(path, api_));
RETURN_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(path, api_));
factory_config.set_path(path);
return createFromFactoryOrThrow(data, "envoy.config_subscription.filesystem_collection");
auto ptr_or_error = createFromFactory(data, "envoy.config_subscription.filesystem_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
case xds::core::v3::ResourceLocator::XDSTP: {
if (resource_type != collection_locator.resource_type()) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("xdstp:// type does not match {} in {}", resource_type,
Config::XdsResourceIdentifier::encodeUrl(collection_locator)));
}
switch (config.config_source_specifier_case()) {
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kApiConfigSource: {
const envoy::config::core::v3::ApiConfigSource& api_config_source =
config.api_config_source();
THROW_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(
RETURN_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(
cm_.primaryClusters(), api_config_source));
// All Envoy collections currently are xDS resource graph roots and require node context
// parameters.
Expand All @@ -171,16 +173,21 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
std::string type_url = TypeUtil::descriptorFullNameToTypeUrl(resource_type);
data.type_url_ = type_url;
return createFromFactoryOrThrow(data, "envoy.config_subscription.delta_grpc_collection");
auto ptr_or_error =
createFromFactory(data, "envoy.config_subscription.delta_grpc_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_GRPC:
FALLTHRU;
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC: {
return createFromFactoryOrThrow(data,
"envoy.config_subscription.aggregated_grpc_collection");
auto ptr_or_error =
createFromFactory(data, "envoy.config_subscription.aggregated_grpc_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
default:
throwEnvoyExceptionOrPanic(fmt::format("Unknown xdstp:// transport API type in {}",
return absl::InvalidArgumentError(fmt::format("Unknown xdstp:// transport API type in {}",
api_config_source.DebugString()));
}
}
Expand All @@ -189,18 +196,20 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
// All Envoy collections currently are xDS resource graph roots and require node context
// parameters.
options.add_xdstp_node_context_params_ = true;
return createFromFactoryOrThrow(data, "envoy.config_subscription.ads_collection");
auto ptr_or_error = createFromFactory(data, "envoy.config_subscription.ads_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
default:
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
"Missing or not supported config source specifier in "
"envoy::config::core::v3::ConfigSource for a collection. Only ADS and "
"gRPC in delta-xDS mode are supported.");
}
}
default:
// TODO(htuch): Implement HTTP semantics for collection ResourceLocators.
throwEnvoyExceptionOrPanic("Unsupported code path");
return absl::InvalidArgumentError("Unsupported code path");
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/config/subscription_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class SubscriptionFactoryImpl : public SubscriptionFactory, Logger::Loggable<Log
const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) override;
SubscriptionPtr
absl::StatusOr<SubscriptionPtr>
collectionSubscriptionFromUrl(const xds::core::v3::ResourceLocator& collection_locator,
const envoy::config::core::v3::ConfigSource& config,
absl::string_view resource_type, Stats::Scope& scope,
Expand Down
6 changes: 4 additions & 2 deletions source/common/listener_manager/lds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ LdsApiImpl::LdsApiImpl(const envoy::config::core::v3::ConfigSource& lds_config,
*scope_, *this, resource_decoder_, {}),
Config::SubscriptionPtr);
} else {
subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl(
*lds_resources_locator, lds_config, resource_name, *scope_, *this, resource_decoder_);
subscription_ = THROW_OR_RETURN_VALUE(
cm.subscriptionFactory().collectionSubscriptionFromUrl(
*lds_resources_locator, lds_config, resource_name, *scope_, *this, resource_decoder_),
Config::SubscriptionPtr);
}
init_manager.add(init_target_);
}
Expand Down
5 changes: 3 additions & 2 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,11 @@ ScopedRdsConfigSubscription::ScopedRdsConfigSubscription(
const auto srds_resources_locator = THROW_OR_RETURN_VALUE(
Envoy::Config::XdsResourceIdentifier::decodeUrl(scoped_rds.srds_resources_locator()),
xds::core::v3::ResourceLocator);
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().collectionSubscriptionFromUrl(
srds_resources_locator, scoped_rds.scoped_rds_config_source(), resource_name, *scope_,
*this, resource_decoder_);
*this, resource_decoder_),
Envoy::Config::SubscriptionPtr);
}

// TODO(tony612): consider not using the callback here.
Expand Down
14 changes: 9 additions & 5 deletions source/common/runtime/runtime_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,13 @@ absl::Status LoaderImpl::initLayers(Event::Dispatcher& dispatcher,
return loadNewSnapshot();
}

void LoaderImpl::initialize(Upstream::ClusterManager& cm) {
absl::Status LoaderImpl::initialize(Upstream::ClusterManager& cm) {
cm_ = &cm;

for (const auto& s : subscriptions_) {
s->createSubscription();
RETURN_IF_NOT_OK(s->createSubscription());
}
return absl::OkStatus();
}

void LoaderImpl::startRtdsSubscriptions(ReadyCallback on_done) {
Expand All @@ -632,11 +633,14 @@ RtdsSubscription::RtdsSubscription(
stats_scope_(store_.createScope("runtime")), resource_name_(rtds_layer.name()),
init_target_("RTDS " + resource_name_, [this]() { start(); }) {}

void RtdsSubscription::createSubscription() {
absl::Status RtdsSubscription::createSubscription() {
const auto resource_name = getResourceName();
subscription_ = THROW_OR_RETURN_VALUE(parent_.cm_->subscriptionFactory().subscriptionFromConfigSource(
auto subscription_or_error = parent_.cm_->subscriptionFactory().subscriptionFromConfigSource(
config_source_, Grpc::Common::typeUrl(resource_name), *stats_scope_, *this, resource_decoder_,
{}), Config::SubscriptionPtr);
{});
RETURN_IF_NOT_OK(subscription_or_error.status());
subscription_ = std::move(*subscription_or_error);
return absl::OkStatus();
}

absl::Status
Expand Down
4 changes: 2 additions & 2 deletions source/common/runtime/runtime_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ struct RtdsSubscription : Envoy::Config::SubscriptionBase<envoy::service::runtim
void start();
absl::Status validateUpdateSize(uint32_t added_resources_num, uint32_t removed_resources_num);
absl::Status onConfigRemoved(const Protobuf::RepeatedPtrField<std::string>& removed_resources);
void createSubscription();
absl::Status createSubscription();

LoaderImpl& parent_;
const envoy::config::core::v3::ConfigSource config_source_;
Expand Down Expand Up @@ -223,7 +223,7 @@ class LoaderImpl : public Loader, Logger::Loggable<Logger::Id::runtime> {
Api::Api& api);

// Runtime::Loader
void initialize(Upstream::ClusterManager& cm) override;
absl::Status initialize(Upstream::ClusterManager& cm) override;
const Snapshot& snapshot() override;
SnapshotConstSharedPtr threadsafeSnapshot() override;
absl::Status mergeValues(const absl::node_hash_map<std::string, std::string>& values) override;
Expand Down
6 changes: 4 additions & 2 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config,
*scope_, *this, resource_decoder_, {}),
Config::SubscriptionPtr);
} else {
subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl(
*cds_resources_locator, cds_config, resource_name, *scope_, *this, resource_decoder_);
subscription_ = THROW_OR_RETURN_VALUE(
cm.subscriptionFactory().collectionSubscriptionFromUrl(
*cds_resources_locator, cds_config, resource_name, *scope_, *this, resource_decoder_),
Config::SubscriptionPtr);
}
}

Expand Down
6 changes: 4 additions & 2 deletions source/common/upstream/od_cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ OdCdsApiImpl::OdCdsApiImpl(const envoy::config::core::v3::ConfigSource& odcds_co
*scope_, *this, resource_decoder_, {}),
Config::SubscriptionPtr);
} else {
subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl(
*odcds_resources_locator, odcds_config, resource_name, *scope_, *this, resource_decoder_);
subscription_ = THROW_OR_RETURN_VALUE(cm.subscriptionFactory().collectionSubscriptionFromUrl(
*odcds_resources_locator, odcds_config, resource_name,
*scope_, *this, resource_decoder_),
Config::SubscriptionPtr);
}
}

Expand Down
5 changes: 3 additions & 2 deletions source/extensions/clusters/eds/leds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ LedsSubscription::LedsSubscription(
Config::XdsResourceIdentifier::decodeUrl(leds_config.leds_collection_name()),
xds::core::v3::ResourceLocator);
const auto resource_name = getResourceName();
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().collectionSubscriptionFromUrl(
leds_resource_locator, leds_config.leds_config(), resource_name, *stats_scope_, *this,
resource_decoder_);
resource_decoder_),
Config::SubscriptionPtr);
subscription_->start({});
}

Expand Down
2 changes: 1 addition & 1 deletion source/server/config_validation/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void ValidationInstance::initialize(const Options& options,
[this]() -> Network::DnsResolverSharedPtr { return this->dnsResolver(); },
sslContextManager(), *secret_manager_, quic_stat_names_, *this);
THROW_IF_NOT_OK(config_.initialize(bootstrap_, *this, *cluster_manager_factory_));
runtime().initialize(clusterManager());
THROW_IF_NOT_OK(runtime().initialize(clusterManager()));
clusterManager().setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); });
}

Expand Down
2 changes: 1 addition & 1 deletion source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstShar

// We have to defer RTDS initialization until after the cluster manager is
// instantiated (which in turn relies on runtime...).
runtime().initialize(clusterManager());
RETURN_IF_NOT_OK(runtime().initialize(clusterManager()));

clusterManager().setPrimaryClustersInitializedCb(
[this]() { onClusterManagerPrimaryInitializationComplete(); });
Expand Down
5 changes: 3 additions & 2 deletions test/common/runtime/runtime_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ class RtdsLoaderImplTest : public LoaderImplTest {
dispatcher_, tls_, config, local_info_, store_, generator_, validation_visitor_, *api_);
THROW_IF_NOT_OK(loader.status());
loader_ = std::move(loader.value());
loader_->initialize(cm_);
THROW_IF_NOT_OK(loader_->initialize(cm_));
for (auto* sub : rtds_subscriptions_) {
EXPECT_CALL(*sub, start(_));
}
Expand Down Expand Up @@ -1298,7 +1298,8 @@ TEST_F(RtdsLoaderImplTest, BadConfigSource) {
absl::StatusOr<std::unique_ptr<Runtime::LoaderImpl>> loader = Runtime::LoaderImpl::create(
dispatcher_, tls_, config, local_info_, store_, generator_, validation_visitor_, *api_);

EXPECT_THROW_WITH_MESSAGE(loader.value()->initialize(cm_), EnvoyException, "bad config");
EXPECT_THROW_WITH_MESSAGE(loader.value()->initialize(cm_).IgnoreError(), EnvoyException,
"bad config");
}

} // namespace
Expand Down
Loading

0 comments on commit fed356e

Please sign in to comment.