Skip to content

Instantly share code, notes, and snippets.

@ltebean
Last active January 15, 2018 03:05
Show Gist options
  • Save ltebean/3fb761a9c783640f1bedd0001364a8bf to your computer and use it in GitHub Desktop.
Save ltebean/3fb761a9c783640f1bedd0001364a8bf to your computer and use it in GitHub Desktop.
-- Raw ingest stream: every row holds a single JSON document in "message".
CREATE STREAM kafka_event_stream (
    message json
);
-- Typed stream of individual events, one row per event.
-- The statement must end with a semicolon; without it the parser runs this
-- definition into the statement that follows and rejects both.
CREATE STREAM event_stream (
    app_id     text,
    user_id    int,
    event_name text,
    event_time timestamp
);
-- Continuous transform: expands each message read from kafka_event_stream
-- into one row per element of its "events" array, types the fields, and
-- hands the rows to pipeline_stream_insert for delivery to event_stream.
CREATE CONTINUOUS TRANSFORM t AS
SELECT
    exploded.app_id                              AS app_id,
    (exploded.event ->> 'userId')::int           AS user_id,
    exploded.event ->> 'eventName'               AS event_name,
    (exploded.event ->> 'eventTime')::timestamp  AS event_time
FROM (
    -- json_array_elements emits one json value per array entry, so a message
    -- carrying N events becomes N rows that share the same app_id.
    SELECT
        message ->> 'appId'                       AS app_id,
        json_array_elements(message -> 'events')  AS event
    FROM kafka_event_stream
) AS exploded
THEN EXECUTE PROCEDURE pipeline_stream_insert('event_stream');
-- Daily active users: number of distinct user_id values seen per app per
-- calendar day of event_time.
-- The aggregate is aliased explicitly to "count" (the name it received
-- implicitly before) so existing readers of the view are unaffected.
CREATE CONTINUOUS VIEW dau AS
SELECT
    app_id,
    CAST(event_time AS date) AS day,
    COUNT(DISTINCT user_id)  AS count
FROM event_stream
GROUP BY
    app_id,
    day;
-- Kafka ingestion setup: load the pipeline_kafka extension, register the
-- broker, then start consuming topic 'event' into kafka_event_stream with
-- the payload parsed as JSON.
CREATE EXTENSION pipeline_kafka;

SELECT pipeline_kafka.add_broker('192.168.1.3:9092');

SELECT pipeline_kafka.consume_begin(
    'event',
    'kafka_event_stream',
    format := 'json'
);
-- Manual smoke test: push one sample message straight into the raw stream
-- (bypassing Kafka), then read the resulting daily-active-user counts.
-- Each statement is terminated so the two are not parsed as one.
INSERT INTO kafka_event_stream (message)
VALUES ('{"appId":"app1", "events":[{"userId":1, "eventName":"open_app", "eventTime": "2018-01-04 10:12:13"}]}');

SELECT
    app_id,
    day,
    count
FROM dau
ORDER BY
    app_id,
    day;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment