## Basics
# Create the topic using the kafka-topics command
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 6 --topic inventory_purchases
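# (Optional) Describe the topic just created to verify the partition count and replica assignments.
kafka-topics --describe --bootstrap-server localhost:9092 --topic inventory_purchases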
# List all topics in the cluster, including Kafka's internal topics
kafka-topics --list --zookeeper localhost:2181
# Start a command line producer
kafka-console-producer --broker-list localhost:9092 --topic inventory_purchases
# Start up a command line consumer
kafka-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --from-beginning
# Start command-line consumers as part of consumer groups, redirecting their output to files (one consumer in group 1, two consumers in group 2)
kafka-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --group 1 > /home/cloud_user/output/group1_consumer1.txt
kafka-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --group 2 > /home/cloud_user/output/group2_consumer1.txt
kafka-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --group 2 > /home/cloud_user/output/group2_consumer2.txt
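# (Optional) Inspect the consumer groups created above to see member assignments, offsets, and lag.
kafka-consumer-groups --bootstrap-server localhost:9092 --list
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group 2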
## Broker Configs
# List the current configurations for broker 1.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
# Modify the configuration for broker 1 by setting log.cleaner.threads to 2.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config log.cleaner.threads=2
# List the configurations for broker 1 to see the newly added configuration.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
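# (Optional) Remove the dynamic override so broker 1 falls back to its static default.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --delete-config log.cleaner.threads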
## Topic Configs
# Create a new topic with a configuration override.
kafka-topics --bootstrap-server localhost:9092 --create --topic configured-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000
# List the configurations for the topic to see the configuration override.
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --describe
# Modify the configuration override for the existing topic.
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --alter --add-config max.message.bytes=65000
# List the topic configurations again to see the changes.
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --describe
# Modify a broker-wide default topic configuration.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --alter --add-config message.max.bytes=66000
# View the broker configuration to see the changes to the default.
kafka-configs --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe
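# (Optional) Remove the topic-level override so the broker default applies to the topic again.
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name configured-topic --alter --delete-config max.message.bytes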
## Producing Messages with REST Proxy
# Create a topic to use for testing.
kafka-topics --bootstrap-server localhost:9092 --create --topic rest-test-topic --partitions 1 --replication-factor 1
# Publish some messages to the topic using the Confluent REST proxy.
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"records":[{"key":"message","value":"Hello"},{"key":"message","value":"World"}]}' "http://localhost:8082/topics/rest-test-topic"
# Use a console consumer to verify that your messages are present in the topic.
kafka-console-consumer --bootstrap-server localhost:9092 --topic rest-test-topic --from-beginning --property print.key=true
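# (Optional) Topic metadata can also be fetched through REST Proxy.
curl -H "Accept: application/vnd.kafka.v2+json" "http://localhost:8082/topics/rest-test-topic"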
## Consuming Messages with REST Proxy
# Create a consumer and a consumer instance that will start from the beginning of the topic log.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_json_consumer
# Subscribe the consumer to the topic.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"topics":["rest-test-topic"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
# Consume the messages.
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
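# (Optional) Offsets for the fetched records can be committed explicitly; an empty POST body should commit everything returned by the last fetch.
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/offsets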
# When you are finished using the consumer, close it to clean up.
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance
## KSQL
# Start a KSQL session.
sudo ksql
# Set auto.offset.reset to earliest.
SET 'auto.offset.reset' = 'earliest';
# List topics currently in the cluster.
SHOW TOPICS;
# Display records from the ksql-test topic.
PRINT 'ksql-test' FROM BEGINNING;
# Create a stream from the ksql-test topic.
CREATE STREAM ksql_test_stream (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED');
# Create a table from the ksql-test topic.
CREATE TABLE ksql_test_table (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED', key='employee_id');
# Select data from a stream.
SELECT * FROM ksql_test_stream;
# Use a select query to perform a sum aggregation.
SELECT sum(vacation_days) FROM ksql_test_stream GROUP BY employee_id;
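# (Optional) List the streams and tables created so far and inspect the stream's schema.
SHOW STREAMS;
SHOW TABLES;
DESCRIBE ksql_test_stream;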
# Create a stream to pull data in from the topic
CREATE STREAM member_signups
(firstname VARCHAR,
lastname VARCHAR,
email_notifications BOOLEAN)
WITH (KAFKA_TOPIC='member_signups',
VALUE_FORMAT='DELIMITED');
# Create a persistent streaming query to write data to the output topic in real time
CREATE STREAM member_signups_email AS
SELECT * FROM member_signups WHERE email_notifications=true;
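# (Optional) Verify the persistent query is running and inspect its output stream.
SHOW QUERIES;
SELECT * FROM member_signups_email;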
## Joining Two Streams on Row Key
CREATE STREAM member_signups
(lastname VARCHAR,
firstname VARCHAR)
WITH (KAFKA_TOPIC='member_signups',
VALUE_FORMAT='DELIMITED');
CREATE STREAM member_contact
(email VARCHAR)
WITH (KAFKA_TOPIC='member_contact',
VALUE_FORMAT='DELIMITED');
CREATE STREAM MEMBER_EMAIL_LIST AS
SELECT member_signups.lastname, member_signups.firstname, member_contact.email FROM member_signups
INNER JOIN member_contact WITHIN 365 DAYS ON member_signups.rowkey = member_contact.rowkey;
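# (Optional) Inspect the joined stream.
DESCRIBE MEMBER_EMAIL_LIST;
SELECT * FROM MEMBER_EMAIL_LIST;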
## Kafka Connect
# Enter some data into the input file.
vi input.txt
# Create a source connector to import data into Kafka from a file.
curl -X POST http://localhost:8083/connectors \
-H 'Accept: */*' \
-H 'Content-Type: application/json' \
-d '{
"name": "file_source_connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"topic": "connect_topic",
"file": "/home/cloud_user/input.txt",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
# Get information about the source connector.
curl http://localhost:8083/connectors/file_source_connector
curl http://localhost:8083/connectors/file_source_connector/status
# Check the topic to verify the new data has appeared.
kafka-console-consumer --bootstrap-server localhost:9092 --topic connect_topic --from-beginning
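# (Optional) List all connectors and the connector plugins available on this worker.
curl http://localhost:8083/connectors
curl http://localhost:8083/connector-plugins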
# Create a sink connector to export data from Kafka to a file.
curl -X POST http://localhost:8083/connectors \
-H 'Accept: */*' \
-H 'Content-Type: application/json' \
-d '{
"name": "file_sink_connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics": "connect_topic",
"file": "/home/cloud_user/output.txt",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
# Create a JDBC source connector to import data from a PostgreSQL database (uses the Confluent JDBC connector plugin).
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://10.0.1.102:5432/inventory",
"connection.user": "kafka",
"connection.password": "Kafka!",
"topic.prefix": "postgres-",
"mode":"timestamp",
"timestamp.column.name": "update_ts"
}
}'
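# (Optional) Check the status of the sink connector (and likewise the JDBC connector, if it was created).
curl http://localhost:8083/connectors/file_sink_connector/status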
# Check the contents of the output file.
cat /home/cloud_user/output.txt
# Delete the file connectors to clean up.
curl -X DELETE http://localhost:8083/connectors/file_source_connector
curl -X DELETE http://localhost:8083/connectors/file_sink_connector
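# (Optional) Delete the JDBC source connector as well, if it was created.
curl -X DELETE http://localhost:8083/connectors/jdbc_source_postgres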
## Kafka Monitoring
# Create a sample topic.
kafka-topics --bootstrap-server zoo1:9092 --topic monitor-test --create --partitions 1 --replication-factor 1
# Spin up a producer with some JVM arguments to enable connecting via JMX.
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" kafka-console-producer --broker-list zoo1:9092 --topic monitor-test
# In a graphical shell, open JConsole and connect to the producer.
sudo jconsole
# Spin up a consumer with some JVM arguments to enable connecting via JMX.
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" kafka-console-consumer --bootstrap-server zoo1:9092 --from-beginning --topic monitor-test
# In a graphical shell, open JConsole and connect to the consumer.
sudo jconsole
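# (Optional) Consumer lag can also be checked from the command line; the console consumer's group name is auto-generated (console-consumer-<id>), so list the groups first.
kafka-consumer-groups --bootstrap-server zoo1:9092 --list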