Skip to content

Latest commit

 

History

History
47 lines (41 loc) · 1021 Bytes

File metadata and controls

47 lines (41 loc) · 1021 Bytes

KSQL DB Examples

Create streams

CREATE STREAM ride_streams (
    VendorId varchar, 
    trip_distance double,
    payment_type varchar
)  WITH (KAFKA_TOPIC='rides',
        VALUE_FORMAT='JSON');

Query stream

select * from RIDE_STREAMS 
EMIT CHANGES;

Query stream count

SELECT VENDORID, count(*) FROM RIDE_STREAMS 
GROUP BY VENDORID
EMIT CHANGES;

Query stream with filters

SELECT payment_type, count(*) FROM RIDE_STREAMS 
WHERE payment_type IN ('1', '2')
GROUP BY payment_type
EMIT CHANGES;

Query stream with window functions

CREATE TABLE payment_type_sessions AS
  SELECT payment_type,
         count(*)
  FROM  RIDE_STREAMS 
  WINDOW SESSION (60 SECONDS)
  GROUP BY payment_type
  EMIT CHANGES;

KSQL documentation for details

KSQL DB Documentation

KSQL DB Java client