Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature request: allow to pass RdKafka\Conf instance into transport factory #78

Open
Arkemlar opened this issue Mar 6, 2023 · 1 comment

Comments

@Arkemlar
Copy link

Arkemlar commented Mar 6, 2023

With optional constructor argument or setter injection in KafkaTransportFactory for RdKafka\Conf (imported as KafkaConf in that file) we could set any callbacks we want, and no extra PRs for callback support features needed.

Example/change proposal

Make these changes

    private KafkaConf $conf = null;
    public function setConfInstance(KafkaConf $conf): void
    {
        $this->conf = $conf;
    }
    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    {
        $conf = null !== $this->conf ? clone $this->conf : new KafkaConf;
        ...
    }

THEN we can pass configured RdKafka\Conf instance.

How client code might look like

To pass RdKafka\Conf instance we would need to make only 2 steps:

  1. First create configurator - a place that cooks our RdKafka\Conf instance
namespace App\DI;

use Psr\Log\NullLogger;
use Koco\Kafka\Messenger\KafkaTransportFactory;
use RdKafka\Conf as KafkaConf;

class KafkaTransportConfigurator
{
    public function __construct(private ?LoggerInterface $logger)
    {
        $this->logger = $logger ?? new NullLogger();
    }

    public function __invoke(KafkaTransportFactory $transportFactory)
    {
        $conf = new KafkaConf;

        // defne callbacks like that
        $conf->setStatsCb($this->setStatsCb());
        // or like that
        $conf->setErrorCb(fn ($kafka, $err, $reason) => $this->logger->error($reason));

        // inject configured instance into transport factory
        $transportFactory->setConfInstance($conf);
    }
  1. And define configurator for KafkaTransportFactory service
    Koco\Kafka\Messenger\KafkaTransportFactory:
        configurator: '@App\DI\KafkaTransportConfigurator'

That must work. Symfony calls KafkaTransportConfigurator->__invoke() right after transport instance created and before it is used to produce transport.

This symfony container feature documented here

@Arkemlar
Copy link
Author

Arkemlar commented Mar 6, 2023

Alternative way to set conf instance via setter injection:

     Koco\Kafka\Messenger\KafkaTransportFactory:
         ... # config from https://github.com/KonstantinCodes/messenger-kafka/blob/master/src/Resources/config/services.xml
         calls:
             - setConfInstance: ["@=service('App\\\\DI\\\\KafkaTransportConfigurator').getConf()"]
             

Symfony docs about this feature.

But configurator looks better, it does not override original service

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant