Skip to content

Instantly share code, notes, and snippets.

@codefromthecrypt
Created April 17, 2025 03:54
Show Gist options
  • Save codefromthecrypt/a1c8b4bc8017cdb0a43c7508e9b323a3 to your computer and use it in GitHub Desktop.
Save codefromthecrypt/a1c8b4bc8017cdb0a43c7508e9b323a3 to your computer and use it in GitHub Desktop.
Correlating OTLP traffic with Elasticsearch POSTS

rename elasticsearch to elasticsearch_real and change its port to 9201 rename otel-collector to otel-collector_real and change its port to 4319

add this container with the below flow.py. Then make sure you are sending with javascript as python doesn't support OTEL_EXPORTER_OTLP_PROTOCOL=http/json yet.

then, you can do docker compose logs elasticsearch and see the POSTs.

  elasticsearch:
    image: mitmproxy/mitmproxy
    container_name: elasticsearch
    user: "0:0" # Run as root
    command:
      - mitmdump
      - -q
      - --mode
      - upstream:http://elasticsearch_real:9201@9200
      - --mode
      - upstream:http://otel-collector_real:4319@4318
      - -s
      - /flow.py
    volumes:
      - ./flow.py:/flow.py:ro
    ports:
      - 9200:9200
      - 4318:4318
    depends_on:
      elasticsearch_real:
        condition: service_healthy
      otel-collector_real:
        condition: service_started
    networks:
      default:
        aliases:
          - otel-collector
    healthcheck:
      test: ["CMD-SHELL", "exit 0"]
      start_period: "1s"
from mitmproxy import http


def response(flow: http.HTTPFlow) -> None:
    if is_otlp_related(flow):
        request_str = format_request(flow)
        response_str = format_response(flow)
        print(f"\n---\n{request_str}\n{response_str}\n")

def is_otlp_related(flow: http.HTTPFlow) -> bool:
    if flow.request.headers.get("x-elastic-product-origin") == "kibana":
        return False
    return flow.request.method == "POST" and flow.request.path.startswith(("/_bulk", "/v1"))

def format_request(flow: http.HTTPFlow) -> str:
    method = flow.request.method
    path = flow.request.path
    http_version = flow.request.http_version
    headers = "\n".join(f"{k}: {v}" for k, v in flow.request.headers.items())
    body = (
        "\n\n" + flow.request.content.decode('utf-8', errors='replace')
        if flow.request.content else "\n"
    )
    return f"{method} {path} {http_version}\n{headers}{body}"

def format_response(flow: http.HTTPFlow) -> str:
    http_version = flow.response.http_version
    status_code = flow.response.status_code
    reason = flow.response.reason
    headers = "\n".join(f"{k}: {v}" for k, v in flow.response.headers.items())
    body = (
        "\n\n" + flow.response.content.decode('utf-8', errors='replace')
        if flow.response.content else "\n"
    )
    return f"{http_version} {status_code} {reason}\n{headers}{body}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment