[Bug Report]: KafkaFlow.SchemaRegistry does not work with multiple clusters/schema registry URLs #558

Open
1 task done
danielmpetrov opened this issue Apr 16, 2024 · 2 comments
Labels
bug Something isn't working

Comments

@danielmpetrov
Contributor

danielmpetrov commented Apr 16, 2024

Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

We ran into this issue when we tried to connect to two different Kafka clusters, each with its own schema registry endpoint. We noticed that one of the consumers was throwing an unfamiliar exception. I don't have the exact stack trace, but the error was "schema id could not be found". Running the solution with either cluster config on its own works. Running both together, however, makes one cluster's consumers throw constantly.

We debugged KafkaFlow's source code and traced the problem down into Confluent.SchemaRegistry. The Confluent library expects one ISchemaRegistryClient per schema registry endpoint. However, KafkaFlow's WithSchemaRegistry method registers singleton clients, and other classes then resolve the last one registered. Because the last registered client wins (at least with Microsoft's DI container), all clusters and their consumers use that client, which results in a 404 Not Found for every other consumer, since schema IDs naturally differ across schema registries.
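The "last one registered wins" behavior can be seen in isolation with a minimal sketch against Microsoft.Extensions.DependencyInjection. `IRegistryClient` and `RegistryClient` are hypothetical stand-ins for ISchemaRegistryClient, not KafkaFlow or Confluent types, and the sketch does not reproduce KafkaFlow's actual registration code.

```csharp
using System.Linq;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for ISchemaRegistryClient; only the URL matters here.
public interface IRegistryClient
{
    string Url { get; }
}

public sealed class RegistryClient : IRegistryClient
{
    public RegistryClient(string url) => Url = url;
    public string Url { get; }
}

public static class LastRegistrationWinsDemo
{
    public static (string Resolved, string[] All) Run()
    {
        var services = new ServiceCollection();

        // Two singleton registrations of the same service type,
        // one per (assumed) schema registry endpoint.
        services.AddSingleton<IRegistryClient>(new RegistryClient("localhost:8081"));
        services.AddSingleton<IRegistryClient>(new RegistryClient("localhost:8082"));

        using var provider = services.BuildServiceProvider();

        // Resolving a single instance returns the last registration.
        var resolved = provider.GetRequiredService<IRegistryClient>().Url;

        // Both registrations still exist, in registration order.
        var all = provider.GetServices<IRegistryClient>().Select(c => c.Url).ToArray();

        return (resolved, all);
    }
}
```

Any component that asks the container for a single `IRegistryClient` receives the `localhost:8082` instance, regardless of which cluster it belongs to.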

Steps to reproduce

  1. Register multiple clusters with different schema registries, contrived example below:
// the first cluster's consumers will throw,
// since it resolves the schema registry client of the second one
// last one registered wins! 😢
services.AddKafka(kafka =>
{
    kafka.AddCluster(cluster => cluster.WithBrokers(new[] { "localhost:9092" }).WithSchemaRegistry(config => config.Url = "localhost:8081"));
    kafka.AddCluster(cluster => cluster.WithBrokers(new[] { "localhost:9093" }).WithSchemaRegistry(config => config.Url = "localhost:8082"));
});

// this works fine
services.AddKafka(kafka =>
{
    kafka.AddCluster(cluster => cluster.WithBrokers(new[] { "localhost:9092" }).WithSchemaRegistry(config => config.Url = "localhost:8081"));
});

// this works fine
services.AddKafka(kafka =>
{
    kafka.AddCluster(cluster => cluster.WithBrokers(new[] { "localhost:9093" }).WithSchemaRegistry(config => config.Url = "localhost:8082"));
});
  2. Add a consumer to each cluster, and start the program.

Expected behavior

We expect each cluster to read from its own schema registry, as implied by the docstring of WithSchemaRegistry: "Configures schema registry to the cluster".

Actual behavior

One of the consumers throws an exception: "Cannot find schema with id '123'" (paraphrased).

KafkaFlow version

v3.0.7

@danielmpetrov danielmpetrov added the bug Something isn't working label Apr 16, 2024
@danielmpetrov danielmpetrov changed the title from "[Bug Report]: KafkaFlow.SchemaRegistry does not work with multiple clusters" to "[Bug Report]: KafkaFlow.SchemaRegistry does not work with multiple clusters/schema registry URLs" Apr 16, 2024
@ruiqbarbosa
Contributor

Hello @danielmpetrov,

We have analyzed the issue internally and, indeed, KafkaFlow is resolving to the last registered Schema Registry Client.

We will internally review the best way to fix the problem and will provide feedback as soon as possible.

Thank you!

@KurtNapolitano-TCGP

Our organization has chosen to add code to work around this bug. I've uploaded it here in the hope that it either helps the maintainers of this project find a workable solution or helps others blocked by this bug find a suitable workaround for the time being.

Some of the decisions made in this implementation are specific to our organizational use cases but hopefully the code will serve as a guide.

Note: As of .NET 8, IServiceCollection supports "keyed" registrations. Since the KafkaFlow IDependencyConfigurator wraps IServiceCollection, I wasn't able to take advantage of that feature, which seems perfect for this problem. Hopefully the KafkaFlow owners will be able to make use of it.
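For readers unfamiliar with keyed registrations, here is a rough sketch of how they could keep one client per cluster. It assumes Microsoft.Extensions.DependencyInjection 8.0 or later. The `RegistryEndpoint` type and the `"cluster-a"` / `"cluster-b"` keys are hypothetical and purely illustrative. This is not the uploaded workaround and not a KafkaFlow API.

```csharp
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for a per-cluster schema registry client.
public sealed record RegistryEndpoint(string Url);

public static class KeyedRegistrationDemo
{
    public static (string ClusterA, string ClusterB) Run()
    {
        var services = new ServiceCollection();

        // Each registration is stored under its own key (here, an assumed
        // cluster name), so neither one shadows the other.
        services.AddKeyedSingleton("cluster-a", new RegistryEndpoint("localhost:8081"));
        services.AddKeyedSingleton("cluster-b", new RegistryEndpoint("localhost:8082"));

        using var provider = services.BuildServiceProvider();

        // Consumers resolve the client that belongs to their own cluster key.
        var a = provider.GetRequiredKeyedService<RegistryEndpoint>("cluster-a");
        var b = provider.GetRequiredKeyedService<RegistryEndpoint>("cluster-b");

        return (a.Url, b.Url);
    }
}
```

With keys in place, the order of registration no longer decides which endpoint a consumer talks to.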
