OTEL-Kafka-Clickhouse
#!/bin/bash
# Create directory structure
echo "Creating project directories..."
# Replaced nginx with traefik, added traefik config dir
mkdir -p otel-kafka-clickhouse-stack/{traefik/conf,otel-collector,clickhouse-init,kafka-data,clickhouse-data}
cd otel-kafka-clickhouse-stack || exit 1 # Exit if cd fails
# Create .env file for secrets and configs
echo "Creating .env file..."
cat << EOF > .env
# Kafka Config
# Generic Kafka topic name (comment kept on its own line: some env-file parsers do not strip inline comments)
KAFKA_TOPIC=otel_data
# ClickHouse Credentials & DB
CLICKHOUSE_DB=otel_db
CLICKHOUSE_USER=otel_user
CLICKHOUSE_PASSWORD=otel_password_123!
# OTEL Collector Ports (internal)
OTEL_COLLECTOR_GRPC_PORT=5317
OTEL_COLLECTOR_HTTP_PORT=5318
# Traefik Ports (external) - Renamed from NGINX
TRAEFIK_OTEL_GRPC_PORT=4317
TRAEFIK_OTEL_HTTP_PORT=4318
# Port for the Traefik dashboard
TRAEFIK_DASHBOARD_PORT=8080
EOF
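# Hedged usage note: docker-compose reads .env from this directory automatically.
# Variables set in the shell take precedence, so a value can be overridden for a
# single run without editing the file, e.g.:
#   TRAEFIK_OTEL_HTTP_PORT=14318 docker-compose up -d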
# Create docker-compose.yml
echo "Creating docker-compose.yml..."
# Using Apache Kafka (Bitnami image, latest stable 3.x) in KRaft mode
# Added platform: linux/arm64 for Apple Silicon compatibility
# Replaced nginx with traefik:latest
cat << 'EOF' > docker-compose.yml
networks:
  otel-net:
    driver: bridge

volumes:
  kafka_data: # Using kafka volume name
  clickhouse-data:

services:
  traefik:
    image: traefik:latest # Consider pinning a version, e.g. traefik:v3.0
    container_name: traefik-proxy
    platform: linux/arm64 # Specify platform for ARM64 hosts (Apple Silicon)
    ports:
      # OTLP entrypoints
      - "${TRAEFIK_OTEL_GRPC_PORT:-4317}:${TRAEFIK_OTEL_GRPC_PORT:-4317}" # gRPC entrypoint
      - "${TRAEFIK_OTEL_HTTP_PORT:-4318}:${TRAEFIK_OTEL_HTTP_PORT:-4318}" # HTTP entrypoint
      # Traefik dashboard (optional)
      - "${TRAEFIK_DASHBOARD_PORT:-8080}:8080"
    volumes:
      - ./traefik/traefik.yml:/etc/traefik/traefik.yml:ro # Static configuration
      # Mount the Docker socket so Traefik can discover services
      - /var/run/docker.sock:/var/run/docker.sock:ro
    networks:
      - otel-net
    depends_on:
      - otel-collector
    restart: unless-stopped
    # Labels for Traefik's own dashboard (optional)
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.traefik-dashboard.rule=Host(`traefik.localhost`)" # Access dashboard via traefik.localhost:8080
      - "traefik.http.routers.traefik-dashboard.entrypoints=dashboard"
      - "traefik.http.routers.traefik-dashboard.service=api@internal"
      # Consider adding basic-auth middleware for the dashboard in production:
      # - "traefik.http.middlewares.traefik-auth.basicauth.users=user:$$apr1$$....$$" # Example basic auth
      # - "traefik.http.routers.traefik-dashboard.middlewares=traefik-auth"

  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    container_name: otel-collector
    platform: linux/arm64 # Specify platform for ARM64 hosts (Apple Silicon)
    volumes:
      - ./otel-collector/otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml:ro
    networks:
      - otel-net
    depends_on:
      kafka:
        condition: service_started # Kafka defines no healthcheck, so wait for start only
    environment:
      - KAFKA_BROKERS=kafka:9092
      - KAFKA_TOPIC=${KAFKA_TOPIC}
      - OTEL_COLLECTOR_GRPC_PORT=${OTEL_COLLECTOR_GRPC_PORT:-5317} # Internal gRPC port
      - OTEL_COLLECTOR_HTTP_PORT=${OTEL_COLLECTOR_HTTP_PORT:-5318} # Internal HTTP port
    command: ["--config=/etc/otelcol-contrib/config.yaml"]
    restart: unless-stopped
    # Traefik labels for service discovery and routing
    labels:
      - "traefik.enable=true"
      # HTTP router for OTLP/HTTP
      - "traefik.http.routers.otel-http.rule=PathPrefix(`/v1/traces`) || PathPrefix(`/v1/metrics`) || PathPrefix(`/v1/logs`)"
      - "traefik.http.routers.otel-http.entrypoints=otelhttp" # Matches entrypoint defined in traefik.yml
      - "traefik.http.routers.otel-http.service=otel-http-svc"
      # HTTP service for OTLP/HTTP
      - "traefik.http.services.otel-http-svc.loadbalancer.server.port=${OTEL_COLLECTOR_HTTP_PORT:-5318}" # Target internal HTTP port
      # TCP router for OTLP/gRPC
      - "traefik.tcp.routers.otel-grpc.rule=HostSNI(`*`)" # Match any SNI for gRPC
      - "traefik.tcp.routers.otel-grpc.entrypoints=otelgrpc" # Matches entrypoint defined in traefik.yml
      - "traefik.tcp.routers.otel-grpc.service=otel-grpc-svc"
      # TCP service for OTLP/gRPC
      - "traefik.tcp.services.otel-grpc-svc.loadbalancer.server.port=${OTEL_COLLECTOR_GRPC_PORT:-5317}" # Target internal gRPC port
      # Traefik passes the Authorization header through by default; the collector
      # handles it, so no header middleware is needed at the proxy level.

  kafka:
    # Pinned to Kafka 3.7 (a stable KRaft-mode release); newer images may also work
    image: bitnami/kafka:3.7
    container_name: kafka
    platform: linux/arm64 # Specify platform for ARM64 hosts (Apple Silicon)
    ports:
      - "9092:9092" # Expose the PLAINTEXT port
    volumes:
      - kafka_data:/bitnami/kafka # Bitnami uses the /bitnami/kafka path
    networks:
      - otel-net
    environment:
      # KRaft settings for the Bitnami image
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # Note: clients outside the compose network are told to reconnect to 'kafka:9092';
      # add a second advertised listener if host access via localhost:9092 is needed
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CLUSTER_ID=RZkEa4XtSqy-k-Q5v_fYeg # Example pre-generated cluster ID
      # Replication settings for a single-node cluster
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
      # Other settings
      - KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data # Match the volume mount path
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - ALLOW_PLAINTEXT_LISTENER=yes # Bitnami-specific
    restart: unless-stopped

  clickhouse:
    image: clickhouse/clickhouse-server:latest
    container_name: clickhouse
    platform: linux/arm64 # Specify platform for ARM64 hosts (Apple Silicon)
    ports:
      - "8123:8123"
      - "9000:9000"
    volumes:
      - clickhouse-data:/var/lib/clickhouse
      - ./clickhouse-init:/docker-entrypoint-initdb.d
    networks:
      - otel-net
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    environment:
      - CLICKHOUSE_DB=${CLICKHOUSE_DB}
      - CLICKHOUSE_USER=${CLICKHOUSE_USER}
      - CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD}
      - KAFKA_BROKERS=kafka:9092
      - KAFKA_TOPIC=${KAFKA_TOPIC}
    depends_on:
      kafka:
        condition: service_started # Kafka defines no healthcheck, so wait for start only
    restart: unless-stopped
EOF
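# Hedged sanity check (run after this script completes): render the merged compose
# configuration to confirm the .env substitutions resolved as expected:
#   docker-compose config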
# Create Traefik static configuration (traefik/traefik.yml)
echo "Creating Traefik static configuration..."
cat << 'EOF' > traefik/traefik.yml
# Traefik static configuration

# Define entrypoints. NOTE: Traefik does not expand shell-style ${VAR} syntax in
# this file, so the ports are hardcoded here and must match the TRAEFIK_* values
# in .env (and the port mappings in docker-compose.yml).
entryPoints:
  otelgrpc: # Entrypoint for OTLP/gRPC (TCP)
    address: ":4317"
  otelhttp: # Entrypoint for OTLP/HTTP
    address: ":4318"
  dashboard: # Entrypoint for the Traefik dashboard (optional)
    address: ":8080"

# Configure API and dashboard (optional, insecure by default)
api:
  insecure: true # Enable API/dashboard without auth (for local dev only)
  dashboard: true # Enable the dashboard UI

# Configure the Docker provider
providers:
  docker:
    endpoint: "unix:///var/run/docker.sock" # Connect to the Docker socket
    exposedByDefault: false # Only expose containers with the 'traefik.enable=true' label
    network: otel-kafka-clickhouse-stack_otel-net # Network Traefik uses to reach services
    # Note: the default network name is <project_dir>_<network_name>

# Logging
log:
  level: INFO # DEBUG, INFO, WARNING, ERROR

# Access logs (optional)
accessLog: {} # Default settings (stdout, Common Log Format)
EOF
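# Hedged verification sketch (assumes the insecure API above is reachable on port
# 8080): once the stack is up, list the routers Traefik discovered from labels:
#   curl -s -H 'Host: traefik.localhost' http://localhost:8080/api/http/routers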
# Create OTEL Collector configuration (otel-collector/otel-collector-config.yaml)
# Modified transform processor to extract API key from Authorization header
echo "Creating OTEL Collector configuration..."
cat << 'EOF' > otel-collector/otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:${env:OTEL_COLLECTOR_GRPC_PORT}
      http:
        endpoint: 0.0.0.0:${env:OTEL_COLLECTOR_HTTP_PORT}
        cors:
          allowed_origins: ["http://*", "https://*"]

processors:
  batch:
    send_batch_size: 8192
    timeout: 1s
  memory_limiter:
    check_interval: 1s
    limit_percentage: 75
    spike_limit_percentage: 25
  transform:
    error_mode: ignore
    # Statements stamp arrival metadata and extract the API key from the
    # Authorization header when present
    trace_statements:
      - context: resource
        statements:
          - set(attributes["otel_collector_arrival_unix_nano"], NanoTime(Now()))
          - set(attributes["otel_collector_message_uuid"], UUID())
          # Try extracting from an 'Authorization: Bearer <key>' header first
          - set(attributes["application_uuid"], ReplaceRegexpMatch(http.request.header["Authorization"][0], "^Bearer\\s+(.*)$", "$1")) where IsMatch(http.request.header["Authorization"][0], "^Bearer\\s+.*$")
          # Fall back to the 'api.key' attribute if the header did not match
          - set(attributes["application_uuid"], attributes["api.key"]) where attributes["application_uuid"] == nil and attributes["api.key"] != nil
    metric_statements:
      - context: resource
        statements:
          - set(attributes["otel_collector_arrival_unix_nano"], NanoTime(Now()))
          - set(attributes["otel_collector_message_uuid"], UUID())
          # Try extracting from an 'Authorization: Bearer <key>' header first
          - set(attributes["application_uuid"], ReplaceRegexpMatch(http.request.header["Authorization"][0], "^Bearer\\s+(.*)$", "$1")) where IsMatch(http.request.header["Authorization"][0], "^Bearer\\s+.*$")
          # Fall back to the 'api.key' attribute if the header did not match
          - set(attributes["application_uuid"], attributes["api.key"]) where attributes["application_uuid"] == nil and attributes["api.key"] != nil
    log_statements:
      - context: resource
        statements:
          - set(attributes["otel_collector_arrival_unix_nano"], NanoTime(Now()))
          - set(attributes["otel_collector_message_uuid"], UUID())
          # Try extracting from an 'Authorization: Bearer <key>' header first
          - set(attributes["application_uuid"], ReplaceRegexpMatch(http.request.header["Authorization"][0], "^Bearer\\s+(.*)$", "$1")) where IsMatch(http.request.header["Authorization"][0], "^Bearer\\s+.*$")
          # Fall back to the 'api.key' attribute if the header did not match
          - set(attributes["application_uuid"], attributes["api.key"]) where attributes["application_uuid"] == nil and attributes["api.key"] != nil

exporters:
  kafka:
    brokers: [${env:KAFKA_BROKERS}] # Uses the KAFKA_BROKERS env var
    topic: ${env:KAFKA_TOPIC} # Uses the KAFKA_TOPIC env var
    encoding: otlp_json
    # Resilience settings
    retry_on_failure:
      enabled: true
      initial_interval: 5s # Start retrying after 5s
      max_interval: 30s # Max time between retries
      max_elapsed_time: 5m # Retry for up to 5 minutes before giving up
    sending_queue:
      enabled: true
      num_consumers: 2 # Number of Kafka producer connections
      queue_size: 1000 # Buffer size

extensions:
  health_check: { endpoint: 0.0.0.0:13133 }
  pprof: { endpoint: 0.0.0.0:1777 }
  zpages: { endpoint: 0.0.0.0:55679 }

service:
  extensions: [health_check, pprof, zpages]
  pipelines:
    traces: { receivers: [otlp], processors: [memory_limiter, transform, batch], exporters: [kafka] }
    metrics: { receivers: [otlp], processors: [memory_limiter, transform, batch], exporters: [kafka] }
    logs: { receivers: [otlp], processors: [memory_limiter, transform, batch], exporters: [kafka] }
EOF
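# Hedged verification sketch (assumes the Bitnami image ships kafka-console-consumer.sh
# on its PATH, as recent tags do): read one message from the topic to confirm the
# collector's Kafka exporter is producing:
#   docker exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 \
#     --topic otel_data --from-beginning --max-messages 1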
# Create ClickHouse initialization script (clickhouse-init/init-db.sh)
# No changes needed here as it depends on Kafka, not the proxy
echo "Creating ClickHouse init script..."
cat << 'EOF' > clickhouse-init/init-db.sh
#!/bin/bash
set -e
clickhouse client -n <<-EOSQL
CREATE DATABASE IF NOT EXISTS ${CLICKHOUSE_DB};
-- Kafka queue table uses updated env vars
CREATE TABLE IF NOT EXISTS ${CLICKHOUSE_DB}.otel_kafka_queue (
raw_message String
) ENGINE = Kafka
SETTINGS
kafka_broker_list = '${KAFKA_BROKERS}', -- Uses KAFKA_BROKERS env var
kafka_topic_list = '${KAFKA_TOPIC}', -- Uses KAFKA_TOPIC env var
kafka_group_name = 'clickhouse_otel_consumer_group_${CLICKHOUSE_DB}',
kafka_format = 'JSONAsString',
kafka_num_consumers = 2,
kafka_max_block_size = 1048576,
kafka_skip_broken_messages = 10;
-- No SASL settings required for the PLAINTEXT listener
-- Helper functions
DROP FUNCTION IF EXISTS ${CLICKHOUSE_DB}.getResourceAttr;
CREATE FUNCTION ${CLICKHOUSE_DB}.getResourceAttr AS
(resource_attributes_raw, attr_key) ->
ifNull(JSONExtractString(resource_attributes_raw, concat('[?]((@.key==\'', attr_key, '\').value.stringValue)')), '');
DROP FUNCTION IF EXISTS ${CLICKHOUSE_DB}.getAttributesMap;
CREATE FUNCTION ${CLICKHOUSE_DB}.getAttributesMap AS
(attributes_raw) ->
if( JSONIsNull(attributes_raw) OR JSONLength(attributes_raw) = 0,
map(),
mapFromArrays(
arrayMap(x -> JSONExtractString(x, 'key'), JSONExtractArrayRaw(attributes_raw)),
arrayMap(x -> JSONExtractString(x, 'value', 'stringValue'), JSONExtractArrayRaw(attributes_raw))
)
);
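-- Hedged usage sketch: getAttributesMap turns an OTLP-JSON attribute array such as
--   [{"key":"http.method","value":{"stringValue":"GET"}}]
-- into a ClickHouse Map, e.g. map('http.method', 'GET'), for the tables below.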
-- Target table for SPANS
CREATE TABLE IF NOT EXISTS ${CLICKHOUSE_DB}.otel_spans (
arrival_time DateTime64(9) CODEC(Delta(8), ZSTD(1)), message_uuid UUID CODEC(ZSTD(1)), application_uuid LowCardinality(String) CODEC(ZSTD(1)),
service_name LowCardinality(String) CODEC(ZSTD(1)), scope_name LowCardinality(String) CODEC(ZSTD(1)), scope_version LowCardinality(String) CODEC(ZSTD(1)),
trace_id String CODEC(ZSTD(1)), span_id String CODEC(ZSTD(1)), parent_span_id String CODEC(ZSTD(1)), span_name String CODEC(ZSTD(1)),
span_kind LowCardinality(String) CODEC(ZSTD(1)), start_time DateTime64(9) CODEC(Delta(8), ZSTD(1)), end_time DateTime64(9) CODEC(Delta(8), ZSTD(1)),
duration_ms Int64 CODEC(T64, ZSTD(1)), status_code LowCardinality(String) CODEC(ZSTD(1)), status_message String CODEC(ZSTD(1)),
attributes Map(String, String) CODEC(ZSTD(3)), events String CODEC(ZSTD(3))
) ENGINE = MergeTree() PARTITION BY toYYYYMM(arrival_time) ORDER BY (application_uuid, service_name, span_name, start_time) SETTINGS index_granularity = 8192;
-- MV for spans
CREATE MATERIALIZED VIEW IF NOT EXISTS ${CLICKHOUSE_DB}.otel_spans_mv TO ${CLICKHOUSE_DB}.otel_spans AS
SELECT
ifNull(fromUnixTimestamp64Nano(CAST(JSONExtractRaw(resource_attributes_raw, '[?]((@.key==\'otel_collector_arrival_unix_nano\').value.intValue)') AS Int64)), now64(9)) as arrival_time,
ifNull(toUUID(JSONExtractString(resource_attributes_raw, '[?]((@.key==\'otel_collector_message_uuid\').value.stringValue)')), generateUUIDv4()) AS message_uuid,
${CLICKHOUSE_DB}.getResourceAttr(resource_attributes_raw, 'application_uuid') AS application_uuid, ${CLICKHOUSE_DB}.getResourceAttr(resource_attributes_raw, 'service.name') AS service_name,
ifNull(JSONExtractString(scope_spans_raw, 'scope', 'name'), '') AS scope_name, ifNull(JSONExtractString(scope_spans_raw, 'scope', 'version'), '') AS scope_version,
hex(JSONExtract(span_raw, 'traceId', 'String')) AS trace_id, hex(JSONExtract(span_raw, 'spanId', 'String')) AS span_id, hex(JSONExtract(span_raw, 'parentSpanId', 'String')) AS parent_span_id,
ifNull(JSONExtractString(span_raw, 'name'), '') AS span_name, ifNull(JSONExtractString(span_raw, 'kind'), 'SPAN_KIND_UNSPECIFIED') AS span_kind,
fromUnixTimestamp64Nano(JSONExtractUInt(span_raw, 'startTimeUnixNano')) AS start_time, fromUnixTimestamp64Nano(JSONExtractUInt(span_raw, 'endTimeUnixNano')) AS end_time,
toInt64((JSONExtractUInt(span_raw, 'endTimeUnixNano') - JSONExtractUInt(span_raw, 'startTimeUnixNano')) / 1000000) AS duration_ms,
ifNull(JSONExtractString(span_raw, 'status', 'code'), 'STATUS_CODE_UNSET') AS status_code, ifNull(JSONExtractString(span_raw, 'status', 'message'), '') AS status_message,
${CLICKHOUSE_DB}.getAttributesMap(JSONExtract(span_raw, 'attributes')) AS attributes, ifNull(JSONExtractString(span_raw, 'events'), '[]') AS events
FROM ${CLICKHOUSE_DB}.otel_kafka_queue
CROSS JOIN (SELECT JSONExtract(raw_message, 'resource', 'attributes') AS resource_attributes_raw) AS res
ARRAY JOIN JSONExtractArrayRaw(raw_message, 'scopeSpans') AS scope_spans_raw ARRAY JOIN JSONExtractArrayRaw(scope_spans_raw, 'spans') AS span_raw
WHERE JSONHas(raw_message, 'scopeSpans') AND JSONLength(JSONExtractArrayRaw(raw_message, 'scopeSpans')) > 0;
-- Target table for LOGS
CREATE TABLE IF NOT EXISTS ${CLICKHOUSE_DB}.otel_logs (
arrival_time DateTime64(9) CODEC(Delta(8), ZSTD(1)), message_uuid UUID CODEC(ZSTD(1)), application_uuid LowCardinality(String) CODEC(ZSTD(1)),
service_name LowCardinality(String) CODEC(ZSTD(1)), timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)), severity_text LowCardinality(String) CODEC(ZSTD(1)),
severity_number Int32 CODEC(T64, ZSTD(1)), log_name String CODEC(ZSTD(1)), body String CODEC(ZSTD(3)), trace_id String CODEC(ZSTD(1)), span_id String CODEC(ZSTD(1)),
attributes Map(String, String) CODEC(ZSTD(3))
) ENGINE = MergeTree() PARTITION BY toYYYYMM(arrival_time) ORDER BY (application_uuid, service_name, severity_number, timestamp) SETTINGS index_granularity = 8192;
-- MV for logs
CREATE MATERIALIZED VIEW IF NOT EXISTS ${CLICKHOUSE_DB}.otel_logs_mv TO ${CLICKHOUSE_DB}.otel_logs AS
SELECT
ifNull(fromUnixTimestamp64Nano(CAST(JSONExtractRaw(resource_attributes_raw, '[?]((@.key==\'otel_collector_arrival_unix_nano\').value.intValue)') AS Int64)), now64(9)) as arrival_time,
ifNull(toUUID(JSONExtractString(resource_attributes_raw, '[?]((@.key==\'otel_collector_message_uuid\').value.stringValue)')), generateUUIDv4()) AS message_uuid,
${CLICKHOUSE_DB}.getResourceAttr(resource_attributes_raw, 'application_uuid') AS application_uuid, ${CLICKHOUSE_DB}.getResourceAttr(resource_attributes_raw, 'service.name') AS service_name,
fromUnixTimestamp64Nano(JSONExtractUInt(log_record_raw, 'timeUnixNano')) AS timestamp, ifNull(JSONExtractString(log_record_raw, 'severityText'), '') AS severity_text,
ifNull(JSONExtractInt(log_record_raw, 'severityNumber'), 0) AS severity_number, ifNull(JSONExtractString(log_record_raw, 'name'), '') AS log_name,
ifNull(JSONExtractString(log_record_raw, 'body', 'stringValue'), '') AS body, hex(JSONExtract(log_record_raw, 'traceId', 'String')) AS trace_id, hex(JSONExtract(log_record_raw, 'spanId', 'String')) AS span_id,
${CLICKHOUSE_DB}.getAttributesMap(JSONExtract(log_record_raw, 'attributes')) AS attributes
FROM ${CLICKHOUSE_DB}.otel_kafka_queue
CROSS JOIN (SELECT JSONExtract(raw_message, 'resource', 'attributes') AS resource_attributes_raw) AS res
ARRAY JOIN JSONExtractArrayRaw(raw_message, 'resourceLogs') AS rl ARRAY JOIN JSONExtractArrayRaw(rl, 'scopeLogs') AS sl ARRAY JOIN JSONExtractArrayRaw(sl, 'logRecords') AS log_record_raw
WHERE JSONHas(raw_message, 'resourceLogs') AND JSONLength(JSONExtractArrayRaw(raw_message, 'resourceLogs')) > 0;
-- Target table for METRICS
CREATE TABLE IF NOT EXISTS ${CLICKHOUSE_DB}.otel_metrics (
arrival_time DateTime64(9) CODEC(Delta(8), ZSTD(1)), message_uuid UUID CODEC(ZSTD(1)), application_uuid LowCardinality(String) CODEC(ZSTD(1)),
service_name LowCardinality(String) CODEC(ZSTD(1)), metric_name String CODEC(ZSTD(1)), metric_description String CODEC(ZSTD(1)), metric_unit String CODEC(ZSTD(1)),
metric_type LowCardinality(String) CODEC(ZSTD(1)), dp_time DateTime64(9) CODEC(Delta(8), ZSTD(1)), dp_start_time DateTime64(9) CODEC(Delta(8), ZSTD(1)),
dp_value_float Float64 CODEC(Gorilla, ZSTD(1)), dp_value_int Int64 CODEC(T64, ZSTD(1)), dp_is_monotonic Bool CODEC(ZSTD(1)),
dp_aggregation_temporality LowCardinality(String) CODEC(ZSTD(1)), dp_attributes Map(String, String) CODEC(ZSTD(3))
) ENGINE = MergeTree() PARTITION BY toYYYYMM(arrival_time) ORDER BY (application_uuid, service_name, metric_name, metric_type, dp_time) SETTINGS index_granularity = 8192;
-- MV for metrics (Gauge and Sum data points; the ARRAY JOIN over dataPoints is
-- placed before WHERE, as ClickHouse grammar requires)
CREATE MATERIALIZED VIEW IF NOT EXISTS ${CLICKHOUSE_DB}.otel_metrics_mv TO ${CLICKHOUSE_DB}.otel_metrics AS
SELECT
    arrival_time, message_uuid, application_uuid, service_name, metric_name, metric_description, metric_unit,
    'Gauge' AS metric_type, dp_time, dp_start_time, dp_value_float,
    toInt64(0) AS dp_value_int, toBool(0) AS dp_is_monotonic, '' AS dp_aggregation_temporality, dp_attributes
FROM (
    SELECT
        ifNull(fromUnixTimestamp64Nano(CAST(JSONExtractRaw(resource_attributes_raw, '[?]((@.key==\'otel_collector_arrival_unix_nano\').value.intValue)') AS Int64)), now64(9)) AS arrival_time,
        ifNull(toUUID(JSONExtractString(resource_attributes_raw, '[?]((@.key==\'otel_collector_message_uuid\').value.stringValue)')), generateUUIDv4()) AS message_uuid,
        ${CLICKHOUSE_DB}.getResourceAttr(resource_attributes_raw, 'application_uuid') AS application_uuid,
        ${CLICKHOUSE_DB}.getResourceAttr(resource_attributes_raw, 'service.name') AS service_name,
        ifNull(JSONExtractString(metric_raw, 'name'), '') AS metric_name,
        ifNull(JSONExtractString(metric_raw, 'description'), '') AS metric_description,
        ifNull(JSONExtractString(metric_raw, 'unit'), '') AS metric_unit,
        fromUnixTimestamp64Nano(JSONExtractUInt(dp_raw, 'timeUnixNano')) AS dp_time,
        fromUnixTimestamp64Nano(JSONExtractUInt(dp_raw, 'startTimeUnixNano')) AS dp_start_time,
        JSONExtractFloat(dp_raw, 'value', 'asDouble') AS dp_value_float,
        ${CLICKHOUSE_DB}.getAttributesMap(JSONExtract(dp_raw, 'attributes')) AS dp_attributes
    FROM ${CLICKHOUSE_DB}.otel_kafka_queue
    CROSS JOIN (SELECT JSONExtract(raw_message, 'resource', 'attributes') AS resource_attributes_raw) AS res
    ARRAY JOIN JSONExtractArrayRaw(raw_message, 'resourceMetrics') AS rm
    ARRAY JOIN JSONExtractArrayRaw(rm, 'scopeMetrics') AS sm
    ARRAY JOIN JSONExtractArrayRaw(sm, 'metrics') AS metric_raw
    ARRAY JOIN JSONExtractArrayRaw(metric_raw, 'gauge', 'dataPoints') AS dp_raw
    WHERE JSONHas(metric_raw, 'gauge')
)
UNION ALL
SELECT
    arrival_time, message_uuid, application_uuid, service_name, metric_name, metric_description, metric_unit,
    'Sum' AS metric_type, dp_time, dp_start_time, toFloat64(0.0) AS dp_value_float,
    dp_value_int, dp_is_monotonic, dp_aggregation_temporality, dp_attributes
FROM (
    SELECT
        ifNull(fromUnixTimestamp64Nano(CAST(JSONExtractRaw(resource_attributes_raw, '[?]((@.key==\'otel_collector_arrival_unix_nano\').value.intValue)') AS Int64)), now64(9)) AS arrival_time,
        ifNull(toUUID(JSONExtractString(resource_attributes_raw, '[?]((@.key==\'otel_collector_message_uuid\').value.stringValue)')), generateUUIDv4()) AS message_uuid,
        ${CLICKHOUSE_DB}.getResourceAttr(resource_attributes_raw, 'application_uuid') AS application_uuid,
        ${CLICKHOUSE_DB}.getResourceAttr(resource_attributes_raw, 'service.name') AS service_name,
        ifNull(JSONExtractString(metric_raw, 'name'), '') AS metric_name,
        ifNull(JSONExtractString(metric_raw, 'description'), '') AS metric_description,
        ifNull(JSONExtractString(metric_raw, 'unit'), '') AS metric_unit,
        JSONExtractBool(metric_raw, 'sum', 'isMonotonic') AS dp_is_monotonic,
        JSONExtractString(metric_raw, 'sum', 'aggregationTemporality') AS dp_aggregation_temporality,
        fromUnixTimestamp64Nano(JSONExtractUInt(dp_raw, 'timeUnixNano')) AS dp_time,
        fromUnixTimestamp64Nano(JSONExtractUInt(dp_raw, 'startTimeUnixNano')) AS dp_start_time,
        JSONExtractInt(dp_raw, 'value', 'asInt') AS dp_value_int,
        ${CLICKHOUSE_DB}.getAttributesMap(JSONExtract(dp_raw, 'attributes')) AS dp_attributes
    FROM ${CLICKHOUSE_DB}.otel_kafka_queue
    CROSS JOIN (SELECT JSONExtract(raw_message, 'resource', 'attributes') AS resource_attributes_raw) AS res
    ARRAY JOIN JSONExtractArrayRaw(raw_message, 'resourceMetrics') AS rm
    ARRAY JOIN JSONExtractArrayRaw(rm, 'scopeMetrics') AS sm
    ARRAY JOIN JSONExtractArrayRaw(sm, 'metrics') AS metric_raw
    ARRAY JOIN JSONExtractArrayRaw(metric_raw, 'sum', 'dataPoints') AS dp_raw
    WHERE JSONHas(metric_raw, 'sum')
);
EOSQL
echo "ClickHouse initialization script finished."
EOF
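# Hedged verification sketch (run once the stack is up and data has flowed; the
# user/password below are the .env defaults): count rows landed in the spans table:
#   docker exec clickhouse clickhouse-client --user otel_user --password 'otel_password_123!' \
#     --query "SELECT count() FROM otel_db.otel_spans"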
# Create dummy init-db.sql
touch clickhouse-init/init-db.sql
# Make init script executable
chmod +x clickhouse-init/init-db.sh
# Create build.sh
echo "Creating build.sh..."
cat << 'EOF' > build.sh
#!/bin/bash
echo "Pulling latest images (respecting platform setting)..."
docker-compose pull
echo "Build script finished."
EOF
# Create up.sh
echo "Creating up.sh..."
cat << 'EOF' > up.sh
#!/bin/bash
# docker-compose loads .env from this directory automatically; no need to source it here.
echo "Starting services in detached mode..."
docker-compose up -d --remove-orphans
echo "Services started. Check status with 'docker-compose ps' or 'docker logs <container_name>'."
EOF
# Create down.sh
echo "Creating down.sh..."
cat << 'EOF' > down.sh
#!/bin/bash
echo "Stopping and removing containers, networks, and volumes..."
docker-compose down -v
echo "Cleanup complete."
EOF
# Make scripts executable
chmod +x build.sh up.sh down.sh
echo ""
echo "Setup complete!"
echo "Project created in: $(pwd)"
echo "Using Traefik as reverse proxy."
echo "Using Apache Kafka 3.7 (KRaft mode, PLAINTEXT auth)."
echo "OTEL Collector Kafka exporter includes retry settings."
echo "ClickHouse setup includes parsing for Spans, Logs, and Metrics (Gauge/Sum)."
echo "OTEL Collector is configured to extract API key from 'Authorization: Bearer <key>' header or 'api.key' resource attribute."
echo "Ensure you are in this directory before running scripts."
echo "Run the following commands:"
echo " ./build.sh # Optional: Pull images explicitly"
echo " ./down.sh # Ensure clean state if run previously"
echo " ./up.sh # Start the stack"
echo " ./down.sh # Stop and clean up the stack"
echo ""
echo "Endpoints:"
# Use Traefik env vars in output
echo " - OTLP/gRPC: localhost:${TRAEFIK_OTEL_GRPC_PORT:-4317} (Requires 'api.key' resource attribute if HTTP header not set)"
echo " - OTLP/HTTP: localhost:${TRAEFIK_OTEL_HTTP_PORT:-4318} (Requires 'Authorization: Bearer <your-api-key>')"
echo " - ClickHouse HTTP: http://localhost:8123 (user: ${CLICKHOUSE_USER:-otel_user})"
echo " - Kafka (PLAINTEXT): localhost:9092"
echo " - Traefik Dashboard: http://traefik.localhost:${TRAEFIK_DASHBOARD_PORT:-8080} (or http://localhost:${TRAEFIK_DASHBOARD_PORT:-8080})"