Claude Code → OpenObserve

Monitor your team's Claude Code usage in OpenObserve using OpenTelemetry.

Claude Code exports three OTLP signals (metrics, events, and traces) and can send them straight to OpenObserve. There are two ways to set this up:

| Approach | Signals | Setup | Notes |
| --- | --- | --- | --- |
| Native OpenTelemetry (recommended) | Metrics, events, and traces | Environment variables, no code | Traces are a beta feature; start here |
| Stop hook (alternative) | Traces only | A custom Python hook | Per-turn traces without the beta flag, with full control over span shape |

Both send data over OTLP and can run side by side. All telemetry goes only to the endpoint you configure. Nothing is sent to Anthropic.

Native OpenTelemetry monitoring

Claude Code is itself an OpenTelemetry (OTLP) client. Point it at OpenObserve's OTLP endpoint and it streams metrics, events, and (in beta) traces directly, with no custom hook and no OpenTelemetry Collector required.

What you get

| Signal | Examples | Stored in OpenObserve as |
| --- | --- | --- |
| Metrics | session count, token usage, cost (USD), lines added/removed, commits, PRs, tool-accept/reject decisions, active time | Metric streams (one per metric name) |
| Events (logs) | user prompts, tool results, API requests, API errors, tool permission decisions | A log stream you name |
| Traces (beta) | per-turn span tree: interaction → LLM request + tool calls | A traces stream |

How it works

Claude Code (OTLP exporter)
  → metrics       → POST <openobserve>/api/<org>/v1/metrics
  → events        → POST <openobserve>/api/<org>/v1/logs
  → traces (beta) → POST <openobserve>/api/<org>/v1/traces
                        OpenObserve streams
                     (dashboards, alerts, search)

Claude Code exports on an interval (metrics every 60s, events every 5s by default), so data appears continuously while sessions are active.

Prerequisites

  • Claude Code installed and authenticated.
  • An OpenObserve instance (Cloud or self-hosted) and your login credentials.

No SDK to install and no collector to deploy.

Setup

1. Get your OpenObserve endpoint and auth token

OpenObserve accepts OTLP over HTTP at:

https://<your-openobserve-host>/api/<your-org>/v1/{metrics,logs,traces}

The OpenTelemetry exporter appends the /v1/metrics, /v1/logs, and /v1/traces suffixes itself, so you configure Claude Code with the base endpoint https://<your-openobserve-host>/api/<your-org> (no trailing slash).

Generate the Authorization value, which is Basic <base64(email:password)>:

echo -n 'your-email:your-password' | base64
# For example, root@example.com:Complexpass#123 encodes to:
# cm9vdEBleGFtcGxlLmNvbTpDb21wbGV4cGFzcyMxMjM=
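
The same value can be produced in Python, which avoids leaving credentials in shell history. This is a minimal sketch: `basic_auth_value` is a hypothetical helper name, and the credentials are the example pair used on this page, not real ones.

```python
import base64


def basic_auth_value(email: str, password: str) -> str:
    """Return the Authorization header value: 'Basic <base64(email:password)>'."""
    raw = f"{email}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    # Example credentials only; substitute your own OpenObserve login.
    print(basic_auth_value("root@example.com", "Complexpass#123"))
```

Paste the printed value (including the `Basic ` prefix) wherever the configuration below expects `Basic <base64-token>`.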

Find your host and org

For OpenObserve Cloud, the host and organization are shown under Ingestion → Custom → OpenTelemetry in the UI. For self-hosted, the default host is http://localhost:5080 and the default org is default.

2. Configure Claude Code

Set the OpenTelemetry environment variables. The recommended place is the env block of ~/.claude/settings.json (persists across all projects); for a quick test you can export them in your shell instead.

Add to ~/.claude/settings.json:

{
  "env": {
    "CLAUDE_CODE_ENABLE_TELEMETRY": "1",
    "OTEL_METRICS_EXPORTER": "otlp",
    "OTEL_LOGS_EXPORTER": "otlp",
    "OTEL_TRACES_EXPORTER": "otlp",
    "CLAUDE_CODE_ENHANCED_TELEMETRY_BETA": "1",
    "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
    "OTEL_EXPORTER_OTLP_ENDPOINT": "https://<your-openobserve-host>/api/<your-org>",
    "OTEL_EXPORTER_OTLP_HEADERS": "Authorization=Basic <base64-token>,stream-name=<your-stream>"
  }
}

Or, for a quick test, export them in your shell:

export CLAUDE_CODE_ENABLE_TELEMETRY=1
export OTEL_METRICS_EXPORTER=otlp
export OTEL_LOGS_EXPORTER=otlp
export OTEL_TRACES_EXPORTER=otlp
export CLAUDE_CODE_ENHANCED_TELEMETRY_BETA=1
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
export OTEL_EXPORTER_OTLP_ENDPOINT="https://<your-openobserve-host>/api/<your-org>"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic <base64-token>,stream-name=<your-stream>"

Restart Claude Code (or open a new session) after changing settings.
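
If settings.json already contains other keys (hooks, permissions, an existing env block), editing it by hand risks clobbering them. The sketch below merges extra variables into the env block and leaves everything else untouched. `merge_env` is a hypothetical helper, and the sketch assumes the file, if present, is valid JSON with an object at the top level.

```python
import json
from pathlib import Path


def merge_env(settings_path: Path, extra_env: dict) -> dict:
    """Merge extra_env into the 'env' block of a settings file, keeping other keys."""
    settings = {}
    if settings_path.exists():
        # Raises json.JSONDecodeError if the existing file is not valid JSON.
        settings = json.loads(settings_path.read_text(encoding="utf-8"))
    env = settings.setdefault("env", {})
    env.update(extra_env)  # new values win over existing ones with the same name
    settings_path.parent.mkdir(parents=True, exist_ok=True)
    settings_path.write_text(json.dumps(settings, indent=2) + "\n", encoding="utf-8")
    return settings
```

For example, `merge_env(Path.home() / ".claude" / "settings.json", {"CLAUDE_CODE_ENABLE_TELEMETRY": "1"})` adds the master switch without touching the rest of the file.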

No trailing slash on the endpoint

OTEL_EXPORTER_OTLP_ENDPOINT must not end with /. The exporter appends /v1/metrics, /v1/logs, and /v1/traces; a trailing slash produces a 404.
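
To see which URLs a given base endpoint resolves to, the path layout described above can be written out as a small function. This only illustrates the layout; it is not the exporter's own code, and `signal_urls` is a hypothetical name.

```python
SIGNALS = ("metrics", "logs", "traces")


def signal_urls(base_endpoint: str) -> dict:
    """Map each OTLP signal to the URL that results from appending /v1/<signal>."""
    if base_endpoint.endswith("/"):
        # Mirrors the rule above: the base endpoint must not end with '/'.
        raise ValueError("base endpoint must not end with '/'")
    return {signal: f"{base_endpoint}/v1/{signal}" for signal in SIGNALS}


if __name__ == "__main__":
    for signal, url in signal_urls("http://localhost:5080/api/default").items():
        print(f"{signal:8s} {url}")
```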

Core environment variables

| Variable | Description | Example |
| --- | --- | --- |
| CLAUDE_CODE_ENABLE_TELEMETRY | Master switch (set to 1) | 1 |
| OTEL_METRICS_EXPORTER | Metrics exporter | otlp |
| OTEL_LOGS_EXPORTER | Events/logs exporter | otlp |
| OTEL_TRACES_EXPORTER | Traces exporter (beta) | otlp |
| CLAUDE_CODE_ENHANCED_TELEMETRY_BETA | Required to enable traces (beta) | 1 |
| OTEL_EXPORTER_OTLP_PROTOCOL | Transport: http/protobuf or grpc | http/protobuf |
| OTEL_EXPORTER_OTLP_ENDPOINT | OpenObserve base endpoint (no trailing slash) | https://host/api/default |
| OTEL_EXPORTER_OTLP_HEADERS | Comma-separated headers; include Authorization and a stream-name for events | Authorization=Basic ...,stream-name=<your-stream> |

Where events land

The stream-name header names the log and traces streams that events and spans are written to (<your-stream> is any name you choose, e.g. claude_code). Metrics ignore it: they're stored as separate metric streams named after each metric (e.g. claude_code.session.count).
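
OTEL_EXPORTER_OTLP_HEADERS is a comma-separated list of key=value pairs. The sketch below shows how such a string breaks down into individual headers, and why the `=` padding at the end of a Base64 token is harmless when each pair is split on its first `=` only. It is a simplified reading of the format under that assumption, not the exporter's actual parser, and `parse_otlp_headers` is a hypothetical name.

```python
def parse_otlp_headers(value: str) -> dict:
    """Split 'k1=v1,k2=v2' into a dict, splitting each pair on the first '=' only."""
    headers = {}
    for pair in value.split(","):
        if not pair.strip():
            continue  # tolerate stray or trailing commas
        key, sep, val = pair.partition("=")
        if not sep:
            raise ValueError(f"header without '=': {pair!r}")
        headers[key.strip()] = val.strip()
    return headers


if __name__ == "__main__":
    # "abc=" stands in for a real Base64 token.
    print(parse_otlp_headers("Authorization=Basic abc=,stream-name=claude_code"))
```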

Traces are beta

Trace export needs CLAUDE_CODE_ENHANCED_TELEMETRY_BETA=1 (metrics and logs do not) and is off in Claude Code by default; the span schema may still change. To capture per-turn traces without the beta flag, use the Stop hook instead.

Alternative: OTLP over gRPC

OpenObserve's gRPC endpoint listens on port 5081. To use it, switch the protocol and add the organization header:

export OTEL_EXPORTER_OTLP_PROTOCOL=grpc
export OTEL_EXPORTER_OTLP_ENDPOINT="https://<your-openobserve-host>:5081"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic <base64-token>,organization=<your-org>,stream-name=<your-stream>"

HTTP is simpler and works through most proxies; gRPC offers higher throughput. Both are fully supported.

3. Verify

  1. Run a Claude Code session and send it a prompt or two.
  2. Check the telemetry config is active:

    claude doctor
    
  3. In the OpenObserve UI:

    • Open Metrics and look for streams prefixed claude_code. (e.g. claude_code.token.usage).
    • Open Logs, select the <your-stream> stream, and you should see claude_code.user_prompt and claude_code.api_request events within a few seconds.
    • Open Traces, select the <your-stream> stream, and you should see per-turn claude_code.interaction spans.

If nothing appears, see Troubleshooting.

What gets exported

Metrics

| Metric | Unit | Description | Notable attributes |
| --- | --- | --- | --- |
| claude_code.session.count | count | CLI sessions started | |
| claude_code.token.usage | tokens | Tokens consumed | type (input, output, cacheRead, cacheCreation), model |
| claude_code.cost.usage | USD | Estimated session cost | model |
| claude_code.lines_of_code.count | count | Lines changed | type (added, removed) |
| claude_code.commit.count | count | Git commits created by Claude Code | |
| claude_code.pull_request.count | count | Pull requests created | |
| claude_code.code_edit_tool.decision | count | Edit/Write permission decisions | tool_name, decision (accept, reject), language |
| claude_code.active_time.total | s | Active (non-idle) time | |

Cost is an estimate

claude_code.cost.usage is approximate and intended for trend analysis, not billing. Use your official billing source for invoicing.

Events (emitted when OTEL_LOGS_EXPORTER=otlp is set)

| Event | Fires when | Notable attributes |
| --- | --- | --- |
| claude_code.user_prompt | User submits a prompt | prompt_length, prompt (redacted by default) |
| claude_code.tool_result | A tool finishes | tool_name, success, duration_ms |
| claude_code.tool_decision | A permission decision is made | tool_name, decision, source |
| claude_code.api_request | A request to the model completes | model, cost_usd, duration_ms, input_tokens, output_tokens |
| claude_code.api_error | A model request fails | model, status_code, error, duration_ms |

Claude Code emits additional events (MCP connections, compaction, auth, plugin/skill activity, and more). All events share standard attributes such as session.id, organization.id, user.email (when authenticated), and terminal.type.

Traces (beta, emitted when OTEL_TRACES_EXPORTER=otlp and CLAUDE_CODE_ENHANCED_TELEMETRY_BETA=1 are set)

Each user turn becomes a claude_code.interaction root span, with a claude_code.llm_request child per model call (tokens, time-to-first-token, stop_reason) and a claude_code.tool child per tool call. Spans link across subprocesses and outbound API calls via W3C trace context, and land in the <your-stream> traces stream.

Privacy and cardinality

Sensitive content is redacted by default. Opt in only what you need:

| Variable | Effect | Default |
| --- | --- | --- |
| OTEL_LOG_USER_PROMPTS | Include the full prompt text in user_prompt events | off (length only) |
| OTEL_LOG_TOOL_DETAILS | Include tool parameters and command names | off |
| OTEL_LOG_TOOL_CONTENT | Include tool input/output in trace spans (requires traces) | off |
| OTEL_METRICS_INCLUDE_SESSION_ID | Add session.id to metric attributes | true |
| OTEL_METRICS_INCLUDE_VERSION | Add app.version to metric attributes | false |
| OTEL_METRICS_INCLUDE_ACCOUNT_UUID | Add account UUID to metric attributes | true |

Prompt logging carries PII risk

Leave OTEL_LOG_USER_PROMPTS off unless you have a deliberate reason to capture prompt text, and ensure your OpenObserve retention and access controls match your data-handling policy.

For high-volume fleets, dropping high-cardinality attributes (e.g. the session ID, by setting OTEL_METRICS_INCLUDE_SESSION_ID=false) keeps metric stream sizes manageable.

Org-wide rollout

To enable monitoring for an entire team, distribute the same env block via managed settings so it can't be turned off by individual users. See Anthropic's managed settings for how to push settings.json through MDM/device management. The OpenObserve configuration is identical to the per-user setup above.

Route through an OpenTelemetry Collector (optional)

Pointing Claude Code straight at OpenObserve is the simplest path. If you already run an OpenTelemetry Collector for buffering, retries, or fan-out to multiple backends, send Claude Code to the collector instead and configure an OpenObserve exporter there:

receivers:
  otlp:
    protocols:
      http:

exporters:
  otlphttp/openobserve:
    endpoint: https://<your-openobserve-host>/api/<your-org>
    headers:
      Authorization: Basic <base64-token>
      stream-name: <your-stream>

service:
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [otlphttp/openobserve]
    logs:
      receivers: [otlp]
      exporters: [otlphttp/openobserve]
    traces:
      receivers: [otlp]
      exporters: [otlphttp/openobserve]

Then set OTEL_EXPORTER_OTLP_ENDPOINT on Claude Code to your collector's address (commonly http://localhost:4318 for HTTP).

Troubleshooting

Common issues and quick checks

For faster feedback while testing, shorten the metrics export interval so data shows up within seconds, then reset it for production. The logs interval already defaults to 5s and is shown only for reference:

export OTEL_METRIC_EXPORT_INTERVAL=10000   # 10s (default 60000)
export OTEL_LOGS_EXPORT_INTERVAL=5000      # 5s  (default 5000)

Common issues

| Symptom | Cause | Fix |
| --- | --- | --- |
| No data at all | CLAUDE_CODE_ENABLE_TELEMETRY not set | Set it to 1 and start a new session |
| 404 on export | Trailing slash on the endpoint | Remove the trailing /; use .../api/<org> |
| 401/403 errors | Wrong auth header | Re-encode email:password and use Authorization=Basic <base64> |
| Metrics but no events | OTEL_LOGS_EXPORTER unset | Set OTEL_LOGS_EXPORTER=otlp |
| Events go to the wrong stream | Missing stream-name header | Add stream-name=<your-stream> to OTEL_EXPORTER_OTLP_HEADERS |
| Config not applied | Old session still running | Restart Claude Code after editing settings.json |

Run claude doctor to confirm the telemetry configuration Claude Code has loaded.
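
Most of the issues in the table can be caught before starting a session by checking the variables themselves. The sketch below is one way to do that. `check_telemetry_env` is a hypothetical helper that encodes only the rules stated on this page. When run as a script it only sees variables exported in the current shell; to check a settings.json file, pass its env block to the function instead.

```python
import os


def check_telemetry_env(env: dict) -> list:
    """Return human-readable problems found in the telemetry environment."""
    problems = []
    if env.get("CLAUDE_CODE_ENABLE_TELEMETRY") != "1":
        problems.append("CLAUDE_CODE_ENABLE_TELEMETRY is not set to 1")
    if env.get("OTEL_LOGS_EXPORTER") != "otlp":
        problems.append("OTEL_LOGS_EXPORTER is not otlp (no events will be exported)")
    endpoint = env.get("OTEL_EXPORTER_OTLP_ENDPOINT", "")
    if not endpoint:
        problems.append("OTEL_EXPORTER_OTLP_ENDPOINT is not set")
    elif endpoint.endswith("/"):
        problems.append("OTEL_EXPORTER_OTLP_ENDPOINT ends with '/'")
    headers = env.get("OTEL_EXPORTER_OTLP_HEADERS", "")
    if "Authorization=Basic " not in headers:
        problems.append("OTEL_EXPORTER_OTLP_HEADERS has no 'Authorization=Basic ...' entry")
    if "stream-name=" not in headers:
        problems.append("OTEL_EXPORTER_OTLP_HEADERS has no stream-name entry")
    return problems


if __name__ == "__main__":
    for problem in check_telemetry_env(dict(os.environ)):
        print("WARN:", problem)
```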

Per-turn tracing via a Stop hook

This is an alternative to Claude Code's native trace export above. It produces a comparable per-turn span tree (a root span per turn, with child spans for the LLM response and each tool call) using a custom hook. Use it when you want traces without enabling the beta flag, or when you need full control over the span shape and attributes.

Claude Code supports hooks, shell commands that run at specific lifecycle points. This approach uses the Stop hook, which fires after every Claude Code response.

When triggered, the hook:

  1. Reads a JSON payload from stdin containing session_id and transcript_path
  2. Incrementally reads new JSONL entries from the transcript file (only new bytes since last run)
  3. Groups messages into turns (user → assistant → tool calls/results)
  4. Exports each turn as an OpenTelemetry trace to OpenObserve, with child spans for LLM generation and tool calls

Stop hook fires
  → Read stdin payload (session_id, transcript_path)
  → Read new transcript lines (incremental, offset-based)
  → Group into turns
  → Export as OTel traces to OpenObserve
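
The incremental, offset-based read in step 2 is what keeps the hook cheap on long sessions. The sketch below shows the idea in isolation. It is a simplified variant, not the hook's own code: it leaves a half-written last line unread and picks it up on the next call, whereas the full script further down keeps the partial line in a buffer. `read_new_lines` is a hypothetical name.

```python
import json
from pathlib import Path


def read_new_lines(path: Path, offset: int) -> tuple:
    """Parse complete JSONL lines appended since `offset`; return (rows, new_offset)."""
    with open(path, "rb") as f:
        f.seek(offset)
        chunk = f.read()
    # Consume only up to the last newline so a half-written line is retried next time.
    end = chunk.rfind(b"\n") + 1
    rows = []
    for line in chunk[:end].split(b"\n"):
        if line.strip():
            rows.append(json.loads(line))
    return rows, offset + end
```

The caller persists the returned offset between hook invocations, so each run parses only what the previous run has not seen.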

Prerequisites

pip install openobserve-telemetry-sdk

Setup

1. Register the hook globally

Add the Stop hook to ~/.claude/settings.json:

{
  "hooks": {
    "Stop": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/openobserve_hooks.py"
          }
        ]
      }
    ]
  }
}

This makes the hook run after every Claude Code response across all projects.

2. Configure environment variables

Set the required environment variables either globally (all projects) or per project.

To enable tracing globally, add the env vars to ~/.claude/settings.json alongside the hook definition:

{
  "env": {
    "TRACE_TO_OPENOBSERVE": "true",
    "OPENOBSERVE_URL": "http://localhost:5080",
    "OPENOBSERVE_ORG": "default",
    "OPENOBSERVE_AUTH_TOKEN": "Basic <base64-encoded user:password>"
  },
  "hooks": {
    "Stop": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/openobserve_hooks.py"
          }
        ]
      }
    ]
  }
}

Enable tracing selectively by adding env vars to each project's .claude/settings.local.json:

{
  "env": {
    "TRACE_TO_OPENOBSERVE": "true",
    "OPENOBSERVE_URL": "http://localhost:5080",
    "OPENOBSERVE_ORG": "default",
    "OPENOBSERVE_AUTH_TOKEN": "Basic <base64-encoded user:password>",
    "CC_OPENOBSERVE_DEBUG": "true"
  }
}

Environment variables

| Variable | Description | Required | Default |
| --- | --- | --- | --- |
| TRACE_TO_OPENOBSERVE | Set to "true" to enable tracing | Yes | |
| OPENOBSERVE_URL | OpenObserve base URL | Yes | |
| OPENOBSERVE_ORG | OpenObserve organization | Yes | |
| OPENOBSERVE_AUTH_TOKEN | Basic <base64> or Bearer token | Yes | |
| OPENOBSERVE_TRACES_STREAM_NAME | Target stream name | No | "default" |
| OPENOBSERVE_PROTOCOL | "http/protobuf" or "grpc" | No | "http/protobuf" |
| OPENOBSERVE_USER_ID | User identifier (added as resource attribute) | No | None |
| CC_OPENOBSERVE_DEBUG | Set to "true" for verbose logging | No | "false" |
| CC_OPENOBSERVE_MAX_CHARS | Max characters per text field before truncation | No | 20000 |

Generating the auth token. The token is a Base64-encoded email:password string. For example, with root@example.com / Complexpass#123:

echo -n "root@example.com:Complexpass#123" | base64
# cm9vdEBleGFtcGxlLmNvbTpDb21wbGV4cGFzcyMxMjM=

Then set OPENOBSERVE_AUTH_TOKEN to Basic cm9vdEBleGFtcGxlLmNvbTpDb21wbGV4cGFzcyMxMjM=.

Troubleshooting

Diagnostics and common issues

Check the log file

tail -f ~/.claude/state/openobserve_hook.log

Enable debug logging by setting CC_OPENOBSERVE_DEBUG=true in your project's env config.

Test the hook manually

echo '{"session_id":"test","transcript_path":"/path/to/transcript.jsonl"}' | \
  TRACE_TO_OPENOBSERVE=true \
  OPENOBSERVE_URL=http://localhost:5080 \
  OPENOBSERVE_ORG=default \
  OPENOBSERVE_AUTH_TOKEN="Basic ..." \
  python3 ~/.claude/hooks/openobserve_hooks.py
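
The manual test needs a transcript file to point at. If you don't have one handy, the sketch below writes a small synthetic transcript and prints a matching stdin payload. The row shapes are an assumption inferred from the fields the hook's parser reads (type, message.id, message.content blocks, message.usage); real Claude Code transcripts carry more fields, and all IDs and values here are made up.

```python
import json
import tempfile
from pathlib import Path

# Synthetic rows: one user prompt, a tool call, its result, and a final answer.
SAMPLE_ROWS = [
    {"type": "user", "message": {"role": "user", "content": "List the files here"}},
    {
        "type": "assistant",
        "message": {
            "id": "msg_test_1",
            "role": "assistant",
            "model": "claude",
            "content": [
                {"type": "tool_use", "id": "toolu_test_1", "name": "Bash", "input": {"command": "ls"}},
            ],
            "usage": {"input_tokens": 10, "output_tokens": 5},
        },
    },
    {
        "type": "user",
        "message": {
            "role": "user",
            "content": [{"type": "tool_result", "tool_use_id": "toolu_test_1", "content": "README.md"}],
        },
    },
    {
        "type": "assistant",
        "message": {
            "id": "msg_test_2",
            "role": "assistant",
            "model": "claude",
            "content": [{"type": "text", "text": "There is one file: README.md"}],
            "usage": {"input_tokens": 20, "output_tokens": 8},
        },
    },
]


def write_sample_transcript(directory: Path) -> Path:
    """Write SAMPLE_ROWS as JSONL and return the file path."""
    path = directory / "transcript.jsonl"
    with open(path, "w", encoding="utf-8") as f:
        for row in SAMPLE_ROWS:
            f.write(json.dumps(row) + "\n")
    return path


if __name__ == "__main__":
    out = write_sample_transcript(Path(tempfile.mkdtemp()))
    # Print the stdin payload to pipe into the hook.
    print(json.dumps({"session_id": "test", "transcript_path": str(out)}))
```

Saved under any name (say, a hypothetical make_sample.py), its output can replace the echo in the command above: pipe `python3 make_sample.py` into the hook with the same environment variables set.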

Common issues

| Symptom | Cause | Fix |
| --- | --- | --- |
| No traces appear | TRACE_TO_OPENOBSERVE not set | Add env vars to .claude/settings.local.json |
| Hook silently exits | Missing openobserve-telemetry-sdk | Run pip install openobserve-telemetry-sdk |
| Auth errors in log | Wrong token format | Ensure token is Basic <base64> format |
| Partial traces | OpenObserve unreachable | Verify OPENOBSERVE_URL and that the service is running |

State files. The hook maintains incremental state in ~/.claude/state/:

  • openobserve_state.json: per-session offsets and turn counts
  • openobserve_state.lock: file lock for concurrent access
  • openobserve_hook.log: debug and info log
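
When traces seem to be missing or duplicated, it can help to look at what the hook has recorded for each session. The sketch below prints a one-line summary per entry in the state file. The field names (offset, turn_count, updated) are the ones the hook script writes; `summarize_state` itself is a hypothetical helper.

```python
import json
from pathlib import Path

STATE_FILE = Path.home() / ".claude" / "state" / "openobserve_state.json"


def summarize_state(state: dict) -> list:
    """Return (key_prefix, offset, turn_count, updated) for each tracked session."""
    rows = []
    for key, entry in sorted(state.items()):
        if not isinstance(entry, dict):
            continue  # skip anything that is not a per-session record
        rows.append((key[:12], entry.get("offset", 0), entry.get("turn_count", 0), entry.get("updated", "")))
    return rows


if __name__ == "__main__":
    if STATE_FILE.exists():
        for row in summarize_state(json.loads(STATE_FILE.read_text(encoding="utf-8"))):
            print(*row)
```

Keys are SHA-256 hashes of the session ID and transcript path, so only a short prefix is shown.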

Limitations. System prompts are not included in Claude Code's conversation transcripts, so they are not part of the trace.

Hook source code

The complete openobserve_hooks.py script is included below. Save it to ~/.claude/hooks/openobserve_hooks.py.

openobserve_hooks.py
#!/usr/bin/env python3
"""
Claude Code -> OpenObserve hook

Sends trace data from Claude Code sessions to OpenObserve via OpenTelemetry.

Environment variables:
    TRACE_TO_OPENOBSERVE=true          Enable tracing
    OPENOBSERVE_URL                    OpenObserve base URL
    OPENOBSERVE_ORG                    OpenObserve organization
    OPENOBSERVE_AUTH_TOKEN             Authorization token
    OPENOBSERVE_TRACES_STREAM_NAME     Stream name (default: "default")
    OPENOBSERVE_PROTOCOL               "http/protobuf" or "grpc" (default: "http/protobuf")
    OPENOBSERVE_USER_ID                User identifier (default: None, added as resource attribute)
    CC_OPENOBSERVE_DEBUG=true          Enable debug logging
    CC_OPENOBSERVE_MAX_CHARS=20000     Max chars per text field
"""

import json
import os
import socket
import sys
import time
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# --- OpenObserve / OpenTelemetry import (fail-open) ---
try:
    from openobserve import openobserve_init_traces, openobserve_flush, openobserve_shutdown
    from opentelemetry import trace
except Exception:
    sys.exit(0)

# --- Paths ---
STATE_DIR = Path.home() / ".claude" / "state"
LOG_FILE = STATE_DIR / "openobserve_hook.log"
STATE_FILE = STATE_DIR / "openobserve_state.json"
LOCK_FILE = STATE_DIR / "openobserve_state.lock"

DEBUG = os.environ.get("CC_OPENOBSERVE_DEBUG", "").lower() == "true"
MAX_CHARS = int(os.environ.get("CC_OPENOBSERVE_MAX_CHARS", "20000"))
USER_ID = os.environ.get("OPENOBSERVE_USER_ID") or None
HOSTNAME = socket.gethostname()

# ----------------- Logging -----------------
def _log(level: str, message: str) -> None:
    try:
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(f"{ts} [{level}] {message}\n")
    except Exception:
        # Never block
        pass

def debug(msg: str) -> None:
    if DEBUG:
        _log("DEBUG", msg)

def info(msg: str) -> None:
    _log("INFO", msg)

def warn(msg: str) -> None:
    _log("WARN", msg)

def error(msg: str) -> None:
    _log("ERROR", msg)

# ----------------- State locking (best-effort) -----------------
class FileLock:
    def __init__(self, path: Path, timeout_s: float = 2.0):
        self.path = path
        self.timeout_s = timeout_s
        self._fh = None

    def __enter__(self):
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        self._fh = open(self.path, "a+", encoding="utf-8")
        try:
            import fcntl  # Unix only
            deadline = time.time() + self.timeout_s
            while True:
                try:
                    fcntl.flock(self._fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                    break
                except BlockingIOError:
                    if time.time() > deadline:
                        break
                    time.sleep(0.05)
        except Exception:
            # If locking isn't available, proceed without it.
            pass
        return self

    def __exit__(self, exc_type, exc, tb):
        if self._fh is None:
            return
        try:
            import fcntl
            fcntl.flock(self._fh.fileno(), fcntl.LOCK_UN)
        except Exception:
            pass
        try:
            self._fh.close()
        except Exception:
            pass

def load_state() -> Dict[str, Any]:
    try:
        if not STATE_FILE.exists():
            return {}
        return json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except Exception:
        return {}

def save_state(state: Dict[str, Any]) -> None:
    try:
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        tmp = STATE_FILE.with_suffix(".tmp")
        tmp.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
        os.replace(tmp, STATE_FILE)
    except Exception as e:
        debug(f"save_state failed: {e}")

STATE_MAX_AGE_DAYS = 7

def prune_stale_entries(state: Dict[str, Any]) -> Dict[str, Any]:
    """Remove state entries older than STATE_MAX_AGE_DAYS."""
    now = datetime.now(timezone.utc)
    pruned: Dict[str, Any] = {}
    for key, val in state.items():
        updated = val.get("updated") if isinstance(val, dict) else None
        if updated:
            try:
                ts = datetime.fromisoformat(updated)
                if (now - ts).days > STATE_MAX_AGE_DAYS:
                    continue
            except Exception:
                pass
        pruned[key] = val
    return pruned

def state_key(session_id: str, transcript_path: str) -> str:
    # stable key even if session_id collides
    raw = f"{session_id}::{transcript_path}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

# ----------------- Hook payload -----------------
def read_hook_payload() -> Dict[str, Any]:
    """
    Claude Code hooks pass a JSON payload on stdin.
    This script tolerates missing/empty stdin by returning {}.
    """
    try:
        data = sys.stdin.read()
        if not data.strip():
            return {}
        return json.loads(data)
    except Exception:
        return {}

def extract_session_and_transcript(payload: Dict[str, Any]) -> Tuple[Optional[str], Optional[Path]]:
    """
    Hook payload uses snake_case: session_id, transcript_path.
    """
    session_id = (
        payload.get("session_id")
        or payload.get("sessionId")
    )

    transcript = (
        payload.get("transcript_path")
        or payload.get("transcriptPath")
    )

    if transcript:
        try:
            transcript_path = Path(transcript).expanduser().resolve()
        except Exception:
            transcript_path = None
    else:
        transcript_path = None

    return session_id, transcript_path

# ----------------- Transcript parsing helpers -----------------
def get_content(msg: Dict[str, Any]) -> Any:
    if not isinstance(msg, dict):
        return None
    if "message" in msg and isinstance(msg.get("message"), dict):
        return msg["message"].get("content")
    return msg.get("content")

def get_role(msg: Dict[str, Any]) -> Optional[str]:
    # Claude Code transcript lines commonly have type=user/assistant OR message.role
    t = msg.get("type")
    if t in ("user", "assistant"):
        return t
    m = msg.get("message")
    if isinstance(m, dict):
        r = m.get("role")
        if r in ("user", "assistant"):
            return r
    return None

def is_tool_result(msg: Dict[str, Any]) -> bool:
    role = get_role(msg)
    if role != "user":
        return False
    content = get_content(msg)
    if isinstance(content, list):
        return any(isinstance(x, dict) and x.get("type") == "tool_result" for x in content)
    return False

def iter_tool_results(content: Any) -> List[Dict[str, Any]]:
    out: List[Dict[str, Any]] = []
    if isinstance(content, list):
        for x in content:
            if isinstance(x, dict) and x.get("type") == "tool_result":
                out.append(x)
    return out

def iter_tool_uses(content: Any) -> List[Dict[str, Any]]:
    out: List[Dict[str, Any]] = []
    if isinstance(content, list):
        for x in content:
            if isinstance(x, dict) and x.get("type") == "tool_use":
                out.append(x)
    return out

def extract_text(content: Any) -> str:
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: List[str] = []
        for x in content:
            if isinstance(x, dict) and x.get("type") == "text":
                parts.append(x.get("text", ""))
            elif isinstance(x, str):
                parts.append(x)
        return "\n".join([p for p in parts if p])
    return ""

def truncate_text(s: str, max_chars: int = MAX_CHARS) -> Tuple[str, Dict[str, Any]]:
    if s is None:
        return "", {"truncated": False, "orig_len": 0}
    orig_len = len(s)
    if orig_len <= max_chars:
        return s, {"truncated": False, "orig_len": orig_len}
    head = s[:max_chars]
    return head, {"truncated": True, "orig_len": orig_len, "kept_len": len(head), "sha256": hashlib.sha256(s.encode("utf-8")).hexdigest()}

def get_model(msg: Dict[str, Any]) -> str:
    m = msg.get("message")
    if isinstance(m, dict):
        return m.get("model") or "claude"
    return "claude"

def get_message_id(msg: Dict[str, Any]) -> Optional[str]:
    m = msg.get("message")
    if isinstance(m, dict):
        mid = m.get("id")
        if isinstance(mid, str) and mid:
            return mid
    return None

def get_usage(msg: Dict[str, Any]) -> Dict[str, Any]:
    """Extract usage/token data from an assistant transcript message."""
    m = msg.get("message")
    if isinstance(m, dict):
        return m.get("usage") or {}
    return {}

# ----------------- Incremental reader -----------------
@dataclass
class SessionState:
    offset: int = 0
    buffer: str = ""
    turn_count: int = 0

def load_session_state(global_state: Dict[str, Any], key: str) -> SessionState:
    s = global_state.get(key, {})
    return SessionState(
        offset=int(s.get("offset", 0)),
        buffer=str(s.get("buffer", "")),
        turn_count=int(s.get("turn_count", 0)),
    )

def write_session_state(global_state: Dict[str, Any], key: str, ss: SessionState) -> None:
    global_state[key] = {
        "offset": ss.offset,
        "buffer": ss.buffer,
        "turn_count": ss.turn_count,
        "updated": datetime.now(timezone.utc).isoformat(),
    }

def read_new_jsonl(transcript_path: Path, ss: SessionState) -> Tuple[List[Dict[str, Any]], SessionState]:
    """
    Reads only new bytes since ss.offset. Keeps ss.buffer for partial last line.
    Returns parsed JSON lines (best-effort) and updated state.
    """
    if not transcript_path.exists():
        return [], ss

    try:
        with open(transcript_path, "rb") as f:
            f.seek(ss.offset)
            chunk = f.read()
            new_offset = f.tell()
    except Exception as e:
        debug(f"read_new_jsonl failed: {e}")
        return [], ss

    if not chunk:
        return [], ss

    text = chunk.decode("utf-8", errors="replace")

    combined = ss.buffer + text
    lines = combined.split("\n")
    # last element may be incomplete
    ss.buffer = lines[-1]
    ss.offset = new_offset

    msgs: List[Dict[str, Any]] = []
    for line in lines[:-1]:
        line = line.strip()
        if not line:
            continue
        try:
            msgs.append(json.loads(line))
        except Exception:
            continue

    return msgs, ss

# ----------------- Turn assembly -----------------
@dataclass
class Turn:
    user_msg: Dict[str, Any]
    assistant_msgs: List[Dict[str, Any]]
    tool_results_by_id: Dict[str, Any]

def build_turns(messages: List[Dict[str, Any]]) -> List[Turn]:
    """
    Groups incremental transcript rows into turns:
    user (non-tool-result) -> assistant messages -> (tool_result rows, possibly interleaved)
    Uses:
    - assistant message dedupe by message.id (latest row wins)
    - tool results dedupe by tool_use_id (latest wins)
    """
    turns: List[Turn] = []
    current_user: Optional[Dict[str, Any]] = None

    # assistant messages for current turn:
    assistant_order: List[str] = []             # message ids in order of first appearance (or synthetic)
    assistant_latest: Dict[str, Dict[str, Any]] = {}  # id -> latest msg

    tool_results_by_id: Dict[str, Any] = {}     # tool_use_id -> content

    def flush_turn():
        nonlocal current_user, assistant_order, assistant_latest, tool_results_by_id, turns
        if current_user is None:
            return
        if not assistant_latest:
            return
        assistants = [assistant_latest[mid] for mid in assistant_order if mid in assistant_latest]
        turns.append(Turn(user_msg=current_user, assistant_msgs=assistants, tool_results_by_id=dict(tool_results_by_id)))

    for msg in messages:
        role = get_role(msg)

        # tool_result rows show up as role=user with content blocks of type tool_result
        if is_tool_result(msg):
            for tr in iter_tool_results(get_content(msg)):
                tid = tr.get("tool_use_id")
                if tid:
                    tool_results_by_id[str(tid)] = tr.get("content")
            continue

        if role == "user":
            # new user message -> finalize previous turn
            flush_turn()

            # start a new turn
            current_user = msg
            assistant_order = []
            assistant_latest = {}
            tool_results_by_id = {}
            continue

        if role == "assistant":
            if current_user is None:
                # ignore assistant rows until we see a user message
                continue

            mid = get_message_id(msg) or f"noid:{len(assistant_order)}"
            if mid not in assistant_latest:
                assistant_order.append(mid)
            assistant_latest[mid] = msg
            continue

        # ignore unknown rows

    # flush last
    flush_turn()
    return turns

# ----------------- OpenObserve emit via OpenTelemetry -----------------
def _tool_calls_from_assistants(assistant_msgs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    calls: List[Dict[str, Any]] = []
    for am in assistant_msgs:
        for tu in iter_tool_uses(get_content(am)):
            tid = tu.get("id") or ""
            calls.append({
                "id": str(tid),
                "name": tu.get("name") or "unknown",
                "input": tu.get("input") if isinstance(tu.get("input"), (dict, list, str, int, float, bool)) else {},
            })
    return calls

def emit_turn(tracer: trace.Tracer, session_id: str, turn_num: int, turn: Turn, transcript_path: Path) -> None:
    user_text_raw = extract_text(get_content(turn.user_msg))
    user_text, user_text_meta = truncate_text(user_text_raw)

    last_assistant = turn.assistant_msgs[-1]
    assistant_text_raw = extract_text(get_content(last_assistant))
    assistant_text, assistant_text_meta = truncate_text(assistant_text_raw)

    model = get_model(turn.assistant_msgs[0])

    tool_calls = _tool_calls_from_assistants(turn.assistant_msgs)

    # Token usage from the last (final) assistant message in this turn
    usage = get_usage(last_assistant)

    # attach tool outputs
    for c in tool_calls:
        if c["id"] and c["id"] in turn.tool_results_by_id:
            out_raw = turn.tool_results_by_id[c["id"]]
            out_str = out_raw if isinstance(out_raw, str) else json.dumps(out_raw, ensure_ascii=False)
            out_trunc, out_meta = truncate_text(out_str)
            c["output"] = out_trunc
            c["output_meta"] = out_meta
        else:
            c["output"] = None

    # Root span for the turn (summary only — details on children)
    with tracer.start_as_current_span(
        name=f"Claude Code - Turn {turn_num}",
        attributes={
            "claude_code.source": "claude-code",
            "session.id": session_id,
            "claude_code.turn_number": turn_num,
            "claude_code.transcript_path": str(transcript_path),
            "host.name": HOSTNAME,
            "llm.model": model,
            "gen_ai.provider.name": "anthropic",
            "gen_ai.operation.name": "chat",
            "claude_code.tool_count": len(tool_calls),
        },
    ):
        # LLM generation child span (carries full message & usage details)
        with tracer.start_as_current_span(
            name="Claude Response",
            attributes={
                "gen_ai.provider.name": "anthropic",
                "gen_ai.request.model": model,
                "gen_ai.operation.name": "chat",
                "gen_ai.input.messages": user_text,
                "input.truncated": user_text_meta.get("truncated", False),
                "input.orig_len": user_text_meta.get("orig_len", 0),
                "gen_ai.output.messages": assistant_text,
                "output.truncated": assistant_text_meta.get("truncated", False),
                "output.orig_len": assistant_text_meta.get("orig_len", 0),
                "claude_code.tool_count": len(tool_calls),
                "host.name": HOSTNAME,
                "gen_ai.usage.input_tokens": usage.get("input_tokens", 0) or 0,
                "gen_ai.usage.output_tokens": usage.get("output_tokens", 0) or 0,
                "gen_ai.usage.cache_read_tokens": usage.get("cache_read_input_tokens", 0) or 0,
                "gen_ai.usage.cache_write_tokens": usage.get("cache_creation_input_tokens", 0) or 0,
            },
        ):
            pass

        # Tool child spans
        for tc in tool_calls:
            in_obj = tc["input"]
            if isinstance(in_obj, str):
                in_str, _ = truncate_text(in_obj)
            elif isinstance(in_obj, (dict, list)):
                in_str, _ = truncate_text(json.dumps(in_obj, ensure_ascii=False))
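            else:
                # Other types fall back to str(); a missing input becomes an empty string
                in_str, _ = truncate_text("" if in_obj is None else str(in_obj))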

            out_str = tc.get("output") or ""

            with tracer.start_as_current_span(
                name=f"Tool: {tc['name']}",
                attributes={
                    "gen_ai.tool.name": tc["name"],
                    # Span attribute values must not be None, so fall back to an empty id
                    "gen_ai.tool.call.id": tc["id"] or "",
                    "gen_ai.tool.call.arguments": in_str,
                    "gen_ai.tool.call.result": out_str,
                    "host.name": HOSTNAME,
                },
            ):
                pass

# ----------------- Main -----------------
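def main() -> int:
    """Stop-hook entry point: export new turns as traces. Always returns 0, so a tracing failure never surfaces as a hook error."""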
    start = time.time()
    debug("Hook started")

    if os.environ.get("TRACE_TO_OPENOBSERVE", "").lower() != "true":
        return 0

    # Validate required env vars are present (openobserve SDK reads them itself)
    if not os.environ.get("OPENOBSERVE_AUTH_TOKEN"):
        debug("OPENOBSERVE_AUTH_TOKEN not set; exiting.")
        return 0

    payload = read_hook_payload()
    session_id, transcript_path = extract_session_and_transcript(payload)

    if not session_id or not transcript_path:
        debug("Missing session_id or transcript_path from hook payload; exiting.")
        return 0

    if not transcript_path.exists():
        debug(f"Transcript path does not exist: {transcript_path}")
        return 0

    try:
        resource_attrs = {
            "service.name": "claude-code",
            "session.id": session_id,
            "host.name": HOSTNAME,
        }
        if USER_ID:
            resource_attrs["user.id"] = USER_ID
        openobserve_init_traces(
            resource_attributes=resource_attrs,
        )
        tracer = trace.get_tracer("claude-code-hook")
    except Exception as e:
        debug(f"Failed to initialize OpenObserve traces: {e}")
        return 0

    # From here on, openobserve was initialized — ensure shutdown in finally.
    try:
        emitted = 0
        with FileLock(LOCK_FILE):
            state = prune_stale_entries(load_state())
            key = state_key(session_id, str(transcript_path))
            ss = load_session_state(state, key)

            msgs, ss = read_new_jsonl(transcript_path, ss)
            if not msgs:
                write_session_state(state, key, ss)
                save_state(state)
                return 0

            turns = build_turns(msgs)
            if not turns:
                write_session_state(state, key, ss)
                save_state(state)
                return 0

            for t in turns:
                emitted += 1
                turn_num = ss.turn_count + emitted
                try:
                    emit_turn(tracer, session_id, turn_num, t, transcript_path)
                except Exception as e:
                    debug(f"emit_turn failed: {e}")

            ss.turn_count += emitted
            write_session_state(state, key, ss)
            save_state(state)

        try:
            openobserve_flush()
        except Exception as e:
            # Best effort: a failed flush must not fail the hook
            debug(f"openobserve_flush failed: {e}")

        dur = time.time() - start
        info(f"Processed {emitted} turns in {dur:.2f}s (session={session_id})")
        return 0

    except Exception as e:
        debug(f"Unexpected failure: {e}")
        return 0

    finally:
        try:
            openobserve_shutdown()
        except Exception:
            pass

if __name__ == "__main__":
    sys.exit(main())
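
The hook runs once per Stop event, so `main()` keeps turn numbers continuous by adding the count of newly emitted turns to the `turn_count` saved from earlier runs. The following is a minimal, standard-library-only sketch of that numbering scheme, not the hook's actual state handling: `DemoSessionState` and `number_turns` are hypothetical names introduced for illustration, and each turn is reduced to a plain string.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class DemoSessionState:
    """Hypothetical stand-in for the hook's persisted per-session state."""
    turn_count: int = 0


def number_turns(state: DemoSessionState, new_turns: List[str]) -> List[Tuple[int, str]]:
    """Assign session-wide turn numbers to the turns found in one hook run."""
    numbered = []
    for emitted, turn in enumerate(new_turns, start=1):
        # Same arithmetic as main(): saved count plus position within this run
        numbered.append((state.turn_count + emitted, turn))
    # Advance the saved count so the next run continues where this one ended
    state.turn_count += len(new_turns)
    return numbered


state = DemoSessionState()
first_run = number_turns(state, ["fix the bug", "add a test"])
second_run = number_turns(state, ["update the docs"])
print(first_run)   # [(1, 'fix the bug'), (2, 'add a test')]
print(second_run)  # [(3, 'update the docs')]
```

As in `main()`, the counter advances for every turn seen in a run, so a turn whose export fails still consumes its number and later turns keep stable names.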
