Ytake\KsqlClient [ytake/php-ksql]

Apache Kafka / Confluent KSQL REST client for PHP

What is KSQL

KSQL is the streaming SQL engine for Apache Kafka.

Install

Requires PHP 7.1 or later.

$ composer require ytake/php-ksql
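
Before installing, it can help to confirm that the local interpreter meets the minimum version. A minimal sketch using only built-in PHP functions; the helper name meetsMinimumPhpVersion is hypothetical and not part of this library:

```php
<?php

// Hypothetical helper, not part of ytake/php-ksql.
// Returns true when $current is at least $minimum.
function meetsMinimumPhpVersion(string $current, string $minimum = '7.1.0'): bool
{
    return version_compare($current, $minimum, '>=');
}

if (!meetsMinimumPhpVersion(PHP_VERSION)) {
    fwrite(STDERR, 'ytake/php-ksql requires PHP 7.1 or later; found ' . PHP_VERSION . PHP_EOL);
    exit(1);
}

echo 'PHP ' . PHP_VERSION . ' is supported' . PHP_EOL;
```

Composer performs the same check from the package's own constraints, so this is only useful as an early guard in a setup script.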

Usage

Request Preset

Each request type has a preset query class:

Ytake\KsqlClient\Query\CommandStatus
Ytake\KsqlClient\Query\Status
Ytake\KsqlClient\Query\ServerInfo
Ytake\KsqlClient\Query\Ksql
Ytake\KsqlClient\Query\Stream (for streaming responses)

Syntax Reference

Get Command Status

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\CommandStatus;
use Ytake\KsqlClient\Computation\CommandId;

// Base URL of the KSQL server's REST endpoint (8088 is the server's default port)
$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new CommandStatus(CommandId::fromString('stream/MESSAGE_STREAM/create'))
)->result();
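
The command ID in the example appears to follow a type/entity/action layout (here: stream, MESSAGE_STREAM, create). The sketch below only illustrates that layout with plain string handling; splitCommandId is a hypothetical helper and is not how the library's CommandId class is implemented:

```php
<?php

// Illustrative only: splits a command ID string into its three segments.
// Hypothetical helper, not part of ytake/php-ksql.
function splitCommandId(string $commandId): array
{
    $parts = explode('/', $commandId, 3);
    if (count($parts) !== 3) {
        throw new InvalidArgumentException("Expected type/entity/action, got: {$commandId}");
    }

    return [
        'type'   => $parts[0],
        'entity' => $parts[1],
        'action' => $parts[2],
    ];
}

print_r(splitCommandId('stream/MESSAGE_STREAM/create'));
```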

Get Statuses

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Status;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new Status())->result();

Get KSQL Server Information

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\ServerInfo;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new ServerInfo())->result();

Query KSQL

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Ksql;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Ksql('DESCRIBE users_original;')
)->result();
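
For reference, the KSQL server's REST API accepts statements as a JSON body on POST /ksql, with the statement under a ksql key and optional streamsProperties. The sketch below builds such a body with built-in functions only; buildKsqlPayload is a hypothetical helper, and whether the library builds its request exactly this way is an assumption:

```php
<?php

// Sketch of the JSON body for the KSQL server's POST /ksql endpoint.
// Hypothetical helper, not part of ytake/php-ksql.
function buildKsqlPayload(string $statement, array $streamsProperties = []): string
{
    return json_encode([
        'ksql' => $statement,
        // Cast so that an empty property set encodes as {} rather than [].
        'streamsProperties' => (object) $streamsProperties,
    ]);
}

echo buildKsqlPayload('DESCRIBE users_original;') . PHP_EOL;
// prints {"ksql":"DESCRIBE users_original;","streamsProperties":{}}
```

Seeing the raw body can help when comparing the client's behaviour against curl while debugging.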

Client for Stream Response

<?php

use Ytake\KsqlClient\StreamClient;
use Ytake\KsqlClient\Query\Stream;
use Ytake\KsqlClient\StreamConsumable;
use Ytake\KsqlClient\Entity\StreamedRow;

$client = new StreamClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Stream(
        'SELECT * FROM testing',
        new class() implements StreamConsumable {
            public function __invoke(StreamedRow $row)
            {
                // stream response consumer: handle each streamed row here
            }
        }
    )
)->result();
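
The consumer above is a callback that receives rows one at a time. As a library-independent illustration of that pattern, the sketch below feeds newline-delimited JSON to a callable. It assumes rows arrive one JSON object per line; consumeLines and the sample data are made up for the example and do not reflect StreamClient internals:

```php
<?php

// Illustrative only: passes each non-empty line of a buffered response
// to a callback as a decoded array. Not the library's StreamClient logic.
function consumeLines(string $buffer, callable $consumer): int
{
    $count = 0;
    foreach (preg_split('/\r?\n/', $buffer) as $line) {
        $line = trim($line);
        if ($line === '') {
            continue; // skip blank lines between rows
        }
        $row = json_decode($line, true);
        if (!is_array($row)) {
            continue; // skip anything that is not a JSON object or array
        }
        $consumer($row);
        $count++;
    }

    return $count;
}

// Made-up sample data for demonstration.
$sample = "{\"row\":{\"columns\":[1,\"alice\"]}}\n\n{\"row\":{\"columns\":[2,\"bob\"]}}\n";
$seen = [];
$handled = consumeLines($sample, function (array $row) use (&$seen) {
    $seen[] = $row['row']['columns'];
});

echo $handled . PHP_EOL;
```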