
Tutorial 2: Simple Pipeline with TCP

Jin Heo edited this page Dec 16, 2023 · 5 revisions

How to run

$ cd FlexPipe/examples
$ bash 2_tcp_pipeline.sh

In this tutorial, we use the given SourceKernel and SinkKernelB (with a blocking input) to create a pipeline connected via TCP on a single machine. The pipeline topology is shown below.

|Source|[o1]----TCP--->[i1, B]|Sink|

where B: blocking

Port Registration by Kernel Developer

This tutorial uses the same kernels as the previous tutorial, so port registration is unchanged.

Port Activation by Kernel User

The kernel user uses the given kernels to create a pipeline. The code below shows how the pipeline is created.

/* examples/2_tcp_pipeline.cc */
int main()
{
  /* 1. Create instances of the given kernels */
  SourceKernel *sourceKernel = new SourceKernel("sourceKernel");
  SinkKernelB *sinkKernelB = new SinkKernelB("sinkKernel");

  /* 2. Activate ports that are registered by the kernel developers */
  sourceKernel->portManager.activateOutPortAsRemote(flexpipe::RemoteProtocol::ZMQ_TCP, "o1", "127.0.0.1", 5555);
  sinkKernelB->portManager.activateInPortAsRemote(flexpipe::RemoteProtocol::ZMQ_TCP, "i1", 5555);

  std::vector<std::thread> singleKernelThreads;
  std::vector<flexpipe::Kernel*> separateKernels;
  separateKernels.push_back(sourceKernel);
  separateKernels.push_back(sinkKernelB);

  /* 3. Run kernels in separate threads as they are not locally linked. */
  for(std::size_t i = 0; i < separateKernels.size(); i++)
  {
    std::thread singleKernelThread(flexpipe::runSingleKernel, separateKernels[i]);
    singleKernelThreads.push_back(std::move(singleKernelThread));
  }

  /* 4. Wait for every kernel thread to finish. */
  for(std::size_t i = 0; i < singleKernelThreads.size(); i++) singleKernelThreads[i].join();
}

After the kernels are created, their ports are activated with flexpipe::RemoteProtocol::ZMQ_TCP. FlexPipe's port interface works with different communication channels; for TCP, it uses ZeroMQ. Because the kernels are not linked locally, each one runs in its own thread.
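In the listing, the out port is given an address and a port, while the in port is given only a port. One plausible reading, which is an assumption here and not confirmed by this page, is that the receiving side binds to the port and the sending side connects to it. The sketch below shows what the corresponding ZeroMQ TCP endpoint strings would look like under that assumption. The helper names bindEndpoint and connectEndpoint are hypothetical and are not part of FlexPipe or ZeroMQ.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helpers for illustration only; not FlexPipe or ZeroMQ API.

// Endpoint a receiving side could bind to: listen on all local interfaces.
std::string bindEndpoint(std::uint16_t port) {
  return "tcp://*:" + std::to_string(port);
}

// Endpoint a sending side could connect to: a specific host and port.
std::string connectEndpoint(const std::string& host, std::uint16_t port) {
  return "tcp://" + host + ":" + std::to_string(port);
}
```

With the values from the tutorial, bindEndpoint(5555) yields "tcp://*:5555" and connectEndpoint("127.0.0.1", 5555) yields "tcp://127.0.0.1:5555". Both sides must agree on the port number for the pipeline to connect.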

Compared with a FlexPipe local connection, a TCP connection involves data serialization and copying, which increases end-to-end latency, especially when the data is large.
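To make the cost difference concrete, here is a minimal sketch, not FlexPipe's actual implementation, that contrasts the two paths. The Frame type and the serialize, deserialize, and passLocally functions are hypothetical names introduced for illustration. The remote-style path copies the payload twice (once into a byte buffer, once back out), while the local-style path only transfers ownership of a pointer.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical message type for illustration; not a FlexPipe type.
struct Frame {
  std::uint32_t id;
  std::vector<std::uint8_t> pixels;
};

// Remote-style path, copy 1: flatten the frame into one contiguous byte buffer.
std::vector<std::uint8_t> serialize(const Frame& f) {
  std::vector<std::uint8_t> buf(sizeof(f.id) + f.pixels.size());
  std::memcpy(buf.data(), &f.id, sizeof(f.id));
  if (!f.pixels.empty())
    std::memcpy(buf.data() + sizeof(f.id), f.pixels.data(), f.pixels.size());
  return buf;
}

// Remote-style path, copy 2: rebuild a frame from the received bytes.
// Assumes buf holds at least sizeof(Frame::id) bytes.
Frame deserialize(const std::vector<std::uint8_t>& buf) {
  Frame f{};
  std::memcpy(&f.id, buf.data(), sizeof(f.id));
  f.pixels.assign(buf.begin() + sizeof(f.id), buf.end());
  return f;
}

// Local-style path: hand over ownership; no payload bytes are copied.
std::unique_ptr<Frame> passLocally(std::unique_ptr<Frame> f) { return f; }
```

The work in serialize and deserialize grows linearly with the payload size, whereas passLocally does the same small amount of work regardless of how large the frame is. This is the reason the gap between the two connection types widens as the data gets larger.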