Claude Code → OpenObserve
Monitor your team's Claude Code usage in OpenObserve using OpenTelemetry.
Claude Code exports three OTLP signals (metrics, events, and traces) and can send them straight to OpenObserve. There are two ways to set this up:
| Approach | Signals | Setup | Notes |
|---|---|---|---|
| Native OpenTelemetry (recommended) | Metrics, events, and traces | Environment variables, no code | Traces are a beta feature; start here |
| Stop hook (alternative) | Traces only | A custom Python hook | Per-turn traces without the beta flag, with full control over span shape |
Both send data over OTLP and can run side by side. All telemetry goes only to the endpoint you configure. Nothing is sent to Anthropic.
Native OpenTelemetry monitoring
Claude Code is itself an OpenTelemetry (OTLP) client. Point it at OpenObserve's OTLP endpoint and it streams metrics, events, and (in beta) traces directly, with no custom hook and no OpenTelemetry Collector required.
What you get
| Signal | Examples | Stored in OpenObserve as |
|---|---|---|
| Metrics | session count, token usage, cost (USD), lines added/removed, commits, PRs, tool-accept/reject decisions, active time | Metric streams (one per metric name) |
| Events (logs) | user prompts, tool results, API requests, API errors, tool permission decisions | A log stream you name |
| Traces (beta) | per-turn span tree: interaction → LLM request + tool calls | A traces stream |
How it works
```text
Claude Code (OTLP exporter)
  → metrics        → POST <openobserve>/api/<org>/v1/metrics
  → events         → POST <openobserve>/api/<org>/v1/logs
  → traces (beta)  → POST <openobserve>/api/<org>/v1/traces
        ↓
OpenObserve streams
(dashboards, alerts, search)
```
Claude Code exports on an interval (metrics every 60s, events every 5s by default), so data appears continuously while sessions are active.
Prerequisites
- Claude Code installed and authenticated.
- An OpenObserve instance (Cloud or self-hosted) and your login credentials.
No SDK to install and no collector to deploy.
Setup
1. Get your OpenObserve endpoint and auth token
OpenObserve accepts OTLP over HTTP at https://<your-openobserve-host>/api/<your-org>/v1/metrics, /v1/logs, and /v1/traces.
The OpenTelemetry exporter appends the /v1/metrics, /v1/logs, and /v1/traces suffixes itself, so you configure Claude Code with the base endpoint https://<your-openobserve-host>/api/<your-org> (no trailing slash).
Generate the Authorization value, which is Basic <base64(email:password)>, from your OpenObserve login email and password.
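This minimal Python sketch builds the value with the standard library's base64 module. basic_auth_header is a hypothetical helper name, and the credentials are placeholders.

```python
import base64


def basic_auth_header(email: str, password: str) -> str:
    """Return 'Basic <base64(email:password)>' for the Authorization header."""
    raw = f"{email}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    # Placeholder credentials; replace with your OpenObserve login.
    print(basic_auth_header("root@example.com", "Complexpass#123"))
```

Paste the printed value after Authorization= in OTEL_EXPORTER_OTLP_HEADERS. Base64 is an encoding, not encryption, so treat the resulting string as carefully as the password itself.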
Find your host and org
For OpenObserve Cloud, the host and organization are shown under Ingestion → Custom → OpenTelemetry in the UI. For self-hosted, the default host is http://localhost:5080 and the default org is default.
2. Configure Claude Code
Set the OpenTelemetry environment variables. The recommended place is the env block of ~/.claude/settings.json (persists across all projects); for a quick test you can export them in your shell instead.
Add to ~/.claude/settings.json:
```json
{
  "env": {
    "CLAUDE_CODE_ENABLE_TELEMETRY": "1",
    "OTEL_METRICS_EXPORTER": "otlp",
    "OTEL_LOGS_EXPORTER": "otlp",
    "OTEL_TRACES_EXPORTER": "otlp",
    "CLAUDE_CODE_ENHANCED_TELEMETRY_BETA": "1",
    "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
    "OTEL_EXPORTER_OTLP_ENDPOINT": "https://<your-openobserve-host>/api/<your-org>",
    "OTEL_EXPORTER_OTLP_HEADERS": "Authorization=Basic <base64-token>,stream-name=<your-stream>"
  }
}
```
Or, for a quick test, export them in your shell:
```shell
export CLAUDE_CODE_ENABLE_TELEMETRY=1
export OTEL_METRICS_EXPORTER=otlp
export OTEL_LOGS_EXPORTER=otlp
export OTEL_TRACES_EXPORTER=otlp
export CLAUDE_CODE_ENHANCED_TELEMETRY_BETA=1
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
export OTEL_EXPORTER_OTLP_ENDPOINT="https://<your-openobserve-host>/api/<your-org>"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic <base64-token>,stream-name=<your-stream>"
```
Restart Claude Code (or open a new session) after changing settings.
No trailing slash on the endpoint
OTEL_EXPORTER_OTLP_ENDPOINT must not end with /. The exporter appends /v1/metrics, /v1/logs, and /v1/traces; a trailing slash produces a 404.
Core environment variables
| Variable | Description | Example |
|---|---|---|
| CLAUDE_CODE_ENABLE_TELEMETRY | Master switch (set to 1) | 1 |
| OTEL_METRICS_EXPORTER | Metrics exporter | otlp |
| OTEL_LOGS_EXPORTER | Events/logs exporter | otlp |
| OTEL_TRACES_EXPORTER | Traces exporter (beta) | otlp |
| CLAUDE_CODE_ENHANCED_TELEMETRY_BETA | Required to enable traces (beta) | 1 |
| OTEL_EXPORTER_OTLP_PROTOCOL | Transport: http/protobuf or grpc | http/protobuf |
| OTEL_EXPORTER_OTLP_ENDPOINT | OpenObserve base endpoint (no trailing slash) | https://host/api/default |
| OTEL_EXPORTER_OTLP_HEADERS | Comma-separated headers; include Authorization and a stream-name for events | Authorization=Basic ...,stream-name=<your-stream> |
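OTEL_EXPORTER_OTLP_HEADERS packs several HTTP headers into one key=value,key=value string. The minimal sketch below shows how such a value breaks apart, using a hypothetical parse_otlp_headers helper. It is a simplified reading of the format, not the exporter's own parser, and real OpenTelemetry SDKs may also percent-decode values. Each pair is split on its first = only, so a Base64 token that ends in = survives intact.

```python
from typing import Dict


def parse_otlp_headers(value: str) -> Dict[str, str]:
    """Split 'k1=v1,k2=v2' into a dict (simplified; no percent-decoding)."""
    headers: Dict[str, str] = {}
    for pair in value.split(","):
        if not pair.strip():
            continue  # tolerate a trailing comma
        # partition() splits on the FIRST '=', keeping Base64 padding in the value
        key, sep, val = pair.partition("=")
        if not sep:
            raise ValueError(f"missing '=' in header pair: {pair!r}")
        headers[key.strip()] = val.strip()
    return headers


if __name__ == "__main__":
    print(parse_otlp_headers("Authorization=Basic YTpi,stream-name=claude_code"))
```

Base64 output never contains a comma, so the comma separator is unambiguous for Basic tokens.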
Where events land
The stream-name header names the log and traces streams that events and spans are written to (<your-stream> is any name you choose, e.g. claude_code). Metrics ignore it: they're stored as separate metric streams named after each metric (e.g. claude_code.session.count).
Traces are beta
Trace export needs CLAUDE_CODE_ENHANCED_TELEMETRY_BETA=1 (metrics and logs do not) and is off in Claude Code by default; the span schema may still change. To capture per-turn traces without the beta flag, use the Stop hook instead.
Alternative: OTLP over gRPC
OpenObserve's gRPC endpoint listens on port 5081. To use it, switch the protocol and add the organization header:
```shell
export OTEL_EXPORTER_OTLP_PROTOCOL=grpc
export OTEL_EXPORTER_OTLP_ENDPOINT="https://<your-openobserve-host>:5081"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic <base64-token>,organization=<your-org>,stream-name=<your-stream>"
```
HTTP is simpler and works through most proxies; gRPC offers higher throughput. Both are fully supported.
3. Verify
- Run a Claude Code session and send it a prompt or two.
- Check that the telemetry configuration is active.
- In the OpenObserve UI:
  - Open Metrics and look for streams prefixed with claude_code (for example, claude_code.token.usage).
  - Open Logs, select the <your-stream> stream, and you should see claude_code.user_prompt and claude_code.api_request events within a few seconds.
  - Open Traces, select the <your-stream> stream, and you should see per-turn claude_code.interaction spans.
If nothing appears, see Troubleshooting.
What gets exported
Metrics
| Metric | Unit | Description | Notable attributes |
|---|---|---|---|
| claude_code.session.count | count | CLI sessions started | |
| claude_code.token.usage | tokens | Tokens consumed | type (input, output, cacheRead, cacheCreation), model |
| claude_code.cost.usage | USD | Estimated session cost | model |
| claude_code.lines_of_code.count | count | Lines changed | type (added, removed) |
| claude_code.commit.count | count | Git commits created by Claude Code | |
| claude_code.pull_request.count | count | Pull requests created | |
| claude_code.code_edit_tool.decision | count | Edit/Write permission decisions | tool_name, decision (accept, reject), language |
| claude_code.active_time.total | s | Active (non-idle) time | |
Cost is an estimate
claude_code.cost.usage is approximate and intended for trend analysis, not billing. Use your official billing source for invoicing.
Events (emitted when OTEL_LOGS_EXPORTER=otlp is set)
| Event | Fires when | Notable attributes |
|---|---|---|
| claude_code.user_prompt | User submits a prompt | prompt_length, prompt (redacted by default) |
| claude_code.tool_result | A tool finishes | tool_name, success, duration_ms |
| claude_code.tool_decision | A permission decision is made | tool_name, decision, source |
| claude_code.api_request | A request to the model completes | model, cost_usd, duration_ms, input_tokens, output_tokens |
| claude_code.api_error | A model request fails | model, status_code, error, duration_ms |
Claude Code emits additional events (MCP connections, compaction, auth, plugin/skill activity, and more). All events share standard attributes such as session.id, organization.id, user.email (when authenticated), and terminal.type.
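The api_request attributes support quick per-model rollups outside the UI. The minimal sketch below assumes you have already exported claude_code.api_request events from your <your-stream> log stream as a list of flat dicts keyed by the attribute names in the table. The column names stored in your stream may differ, so adjust the keys. summarize_api_requests is a hypothetical helper, and the totals inherit the estimate caveat on cost.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable


def summarize_api_requests(events: Iterable[Dict[str, Any]]) -> Dict[str, Dict[str, float]]:
    """Sum request count, estimated cost, and tokens per model.

    Each item is assumed to be one claude_code.api_request event flattened
    into a dict; values may arrive as strings, so they are converted explicitly.
    """
    totals: Dict[str, Dict[str, float]] = defaultdict(
        lambda: {"requests": 0, "cost_usd": 0.0, "input_tokens": 0, "output_tokens": 0}
    )
    for event in events:
        bucket = totals[str(event.get("model") or "unknown")]
        bucket["requests"] += 1
        bucket["cost_usd"] += float(event.get("cost_usd") or 0)
        bucket["input_tokens"] += int(float(event.get("input_tokens") or 0))
        bucket["output_tokens"] += int(float(event.get("output_tokens") or 0))
    return dict(totals)


if __name__ == "__main__":
    sample = [
        {"model": "model-a", "cost_usd": "0.01", "input_tokens": "100", "output_tokens": "20"},
        {"model": "model-a", "cost_usd": 0.02, "input_tokens": 50, "output_tokens": 10},
    ]
    print(summarize_api_requests(sample))
```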
Traces (beta, emitted when OTEL_TRACES_EXPORTER=otlp and CLAUDE_CODE_ENHANCED_TELEMETRY_BETA=1 are set)
Each user turn becomes a claude_code.interaction root span, with a claude_code.llm_request child per model call (tokens, time-to-first-token, stop_reason) and a claude_code.tool child per tool call. Spans link across subprocesses and outbound API calls via W3C trace context, and land in the <your-stream> traces stream.
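Suppose a subprocess or proxy log records incoming trace context. To correlate it with a trace in OpenObserve, note that the W3C traceparent value has the form version-trace_id-parent_id-flags and carries the trace ID. The sketch below parses a version-00 value. It is background on the W3C Trace Context format, not Claude Code's implementation; which header or variable Claude Code uses to pass the context along is not covered here. parse_traceparent is a hypothetical helper.

```python
import re
from typing import NamedTuple

# version 00: 2 hex - 32 hex trace-id - 16 hex parent-id - 2 hex flags
_TRACEPARENT = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_span_id: str
    sampled: bool


def parse_traceparent(value: str) -> TraceParent:
    match = _TRACEPARENT.match(value.strip())
    if not match:
        raise ValueError(f"not a version-00 traceparent: {value!r}")
    version, trace_id, span_id, flags = match.groups()
    # The spec forbids version ff and all-zero trace or parent IDs.
    if version == "ff" or trace_id == "0" * 32 or span_id == "0" * 16:
        raise ValueError(f"invalid traceparent: {value!r}")
    # Bit 0x01 of the flags byte is the "sampled" flag.
    return TraceParent(version, trace_id, span_id, bool(int(flags, 16) & 0x01))


if __name__ == "__main__":
    print(parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"))
```

Search the traces stream for the printed trace_id to find the matching span tree.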
Privacy and cardinality
Sensitive content is redacted by default. Opt in only what you need:
| Variable | Effect | Default |
|---|---|---|
| OTEL_LOG_USER_PROMPTS | Include the full prompt text in user_prompt events | off (length only) |
| OTEL_LOG_TOOL_DETAILS | Include tool parameters and command names | off |
| OTEL_LOG_TOOL_CONTENT | Include tool input/output in trace spans (requires traces) | off |
| OTEL_METRICS_INCLUDE_SESSION_ID | Add session.id to metric attributes | true |
| OTEL_METRICS_INCLUDE_VERSION | Add app.version to metric attributes | false |
| OTEL_METRICS_INCLUDE_ACCOUNT_UUID | Add account UUID to metric attributes | true |
Prompt logging carries PII risk
Leave OTEL_LOG_USER_PROMPTS off unless you have a deliberate reason to capture prompt text, and ensure your OpenObserve retention and access controls match your data-handling policy.
For high-volume fleets, dropping high-cardinality attributes keeps metric stream sizes manageable. For example, set OTEL_METRICS_INCLUDE_SESSION_ID=false to stop adding session.id to every metric.
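Here is a back-of-the-envelope sketch of why session.id dominates. Every distinct combination of attribute values is its own time series, so the worst case for one metric is the product of the distinct-value counts per attribute. The figures below (two models, 1,000 sessions) are hypothetical, and series_count is an illustrative helper. Real counts are lower when not every combination occurs.

```python
from typing import Dict, Iterable


def series_count(attribute_values: Dict[str, Iterable[str]]) -> int:
    """Upper bound on time series for one metric: product of distinct values per attribute."""
    count = 1
    for values in attribute_values.values():
        count *= len(set(values))
    return count


if __name__ == "__main__":
    token_usage = {
        "type": ["input", "output", "cacheRead", "cacheCreation"],
        "model": ["model-a", "model-b"],
    }
    print(series_count(token_usage))  # 4 * 2 = 8
    with_sessions = {**token_usage, "session.id": [f"s{i}" for i in range(1000)]}
    print(series_count(with_sessions))  # 8 * 1000 = 8000
```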
Org-wide rollout
To enable monitoring for an entire team, distribute the same env block via managed settings so it can't be turned off by individual users. See Anthropic's managed settings for how to push settings.json through MDM/device management. The OpenObserve configuration is identical to the per-user setup above.
Route through an OpenTelemetry Collector (optional)
Pointing Claude Code straight at OpenObserve is the simplest path. If you already run an OpenTelemetry Collector for buffering, retries, or fan-out to multiple backends, send Claude Code to the collector instead and configure an OpenObserve exporter there:
```yaml
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
exporters:
  otlphttp/openobserve:
    endpoint: https://<your-openobserve-host>/api/<your-org>
    headers:
      Authorization: Basic <base64-token>
      stream-name: <your-stream>
service:
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [otlphttp/openobserve]
    logs:
      receivers: [otlp]
      exporters: [otlphttp/openobserve]
    traces:
      receivers: [otlp]
      exporters: [otlphttp/openobserve]
```
Then set OTEL_EXPORTER_OTLP_ENDPOINT on Claude Code to your collector's address (commonly http://localhost:4318 for HTTP).
Troubleshooting
Common issues and quick checks
For faster feedback while testing, shorten the export intervals so data shows up almost immediately, then reset for production:
```shell
export OTEL_METRIC_EXPORT_INTERVAL=10000  # 10s (default 60000)
export OTEL_LOGS_EXPORT_INTERVAL=5000     # 5s (default 5000)
```
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| No data at all | CLAUDE_CODE_ENABLE_TELEMETRY not set | Set it to 1 and start a new session |
| 404 on export | Trailing slash on the endpoint | Remove the trailing /; use .../api/<org> |
| 401/403 errors | Wrong auth header | Re-encode email:password and use Authorization=Basic <base64> |
| Metrics but no events | OTEL_LOGS_EXPORTER unset | Set OTEL_LOGS_EXPORTER=otlp |
| Events go to the wrong stream | Missing stream-name header | Add stream-name=<your-stream> to OTEL_EXPORTER_OTLP_HEADERS |
| Config not applied | Old session still running | Restart Claude Code after editing settings.json |
Run claude doctor to confirm the telemetry configuration Claude Code has loaded.
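Several rows of the table can be checked mechanically. The minimal sketch below reads the env block of ~/.claude/settings.json and flags those misconfigurations; find_config_problems and check_settings are hypothetical helper names. It does not see variables exported in your shell or pushed through managed settings, so a clean result does not prove Claude Code loaded them.

```python
import json
from pathlib import Path
from typing import Dict, List


def find_config_problems(env: Dict[str, str]) -> List[str]:
    """Return human-readable problems matching rows of the Common issues table."""
    problems: List[str] = []
    if env.get("CLAUDE_CODE_ENABLE_TELEMETRY") != "1":
        problems.append("CLAUDE_CODE_ENABLE_TELEMETRY is not 1: nothing will be exported")
    if env.get("OTEL_LOGS_EXPORTER") != "otlp":
        problems.append("OTEL_LOGS_EXPORTER is not otlp: no events")
    endpoint = env.get("OTEL_EXPORTER_OTLP_ENDPOINT", "")
    if not endpoint:
        problems.append("OTEL_EXPORTER_OTLP_ENDPOINT is not set")
    elif endpoint.endswith("/"):
        problems.append("OTEL_EXPORTER_OTLP_ENDPOINT has a trailing slash: remove it")
    # Header names are case-insensitive, so compare lowercased keys.
    header_names = {
        pair.partition("=")[0].strip().lower()
        for pair in env.get("OTEL_EXPORTER_OTLP_HEADERS", "").split(",")
        if pair.strip()
    }
    if "authorization" not in header_names:
        problems.append("OTEL_EXPORTER_OTLP_HEADERS has no Authorization entry")
    if "stream-name" not in header_names:
        problems.append("OTEL_EXPORTER_OTLP_HEADERS has no stream-name entry")
    if env.get("OTEL_TRACES_EXPORTER") == "otlp" and env.get("CLAUDE_CODE_ENHANCED_TELEMETRY_BETA") != "1":
        problems.append("traces need CLAUDE_CODE_ENHANCED_TELEMETRY_BETA=1")
    return problems


def check_settings(path: Path = Path.home() / ".claude" / "settings.json") -> List[str]:
    """Load the env block from a settings file and run find_config_problems on it."""
    if not path.exists():
        return [f"{path} not found"]
    try:
        settings = json.loads(path.read_text(encoding="utf-8"))
    except ValueError:
        return [f"{path} is not valid JSON"]
    env = settings.get("env", {}) if isinstance(settings, dict) else {}
    return find_config_problems({k: str(v) for k, v in env.items()})


if __name__ == "__main__":
    for problem in check_settings() or ["no problems found"]:
        print(problem)
```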
Per-turn tracing via a Stop hook
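This is an alternative to Claude Code's native trace export above. It uses a custom hook to produce a similar per-turn span tree: a root span per turn, with child spans for the model response and each tool call. Span names and attributes differ from the native claude_code.* spans. Use it when you want traces without enabling the beta flag, or when you need full control over the span shape and attributes.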
Claude Code supports hooks, shell commands that run at specific lifecycle points. This approach uses the Stop hook, which fires after every Claude Code response.
When triggered, the hook:
- Reads a JSON payload from stdin containing session_id and transcript_path
- Incrementally reads new JSONL entries from the transcript file (only new bytes since the last run)
- Groups messages into turns (user → assistant → tool calls/results)
- Exports each turn as an OpenTelemetry trace to OpenObserve, with child spans for LLM generation and tool calls
```text
Stop hook fires
  → Read stdin payload (session_id, transcript_path)
  → Read new transcript lines (incremental, offset-based)
  → Group into turns
  → Export as OTel traces to OpenObserve
```
Prerequisites
- Python 3.9+
- openobserve-telemetry-sdk (install with pip install openobserve-telemetry-sdk)
Setup
1. Register the hook globally
Add the Stop hook to ~/.claude/settings.json:
```json
{
  "hooks": {
    "Stop": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/openobserve_hooks.py"
          }
        ]
      }
    ]
  }
}
```
This makes the hook run after every Claude Code response across all projects.
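If ~/.claude/settings.json already has content, editing it by hand risks clobbering other settings. The minimal sketch below merges the Stop hook entry shown above into existing settings and skips it if the same command is already registered. add_stop_hook and update_settings_file are hypothetical helper names. Back up the file first, because the sketch rewrites it with two-space indentation.

```python
import json
from pathlib import Path
from typing import Any, Dict

HOOK_COMMAND = "python3 ~/.claude/hooks/openobserve_hooks.py"


def add_stop_hook(settings: Dict[str, Any], command: str = HOOK_COMMAND) -> Dict[str, Any]:
    """Append a Stop command hook unless an identical command is already registered."""
    stop_groups = settings.setdefault("hooks", {}).setdefault("Stop", [])
    for group in stop_groups:
        for hook in group.get("hooks", []):
            if hook.get("type") == "command" and hook.get("command") == command:
                return settings  # already registered; leave settings unchanged
    stop_groups.append({"hooks": [{"type": "command", "command": command}]})
    return settings


def update_settings_file(path: Path) -> None:
    """Load (or start) a settings file, merge the hook in, and write it back."""
    settings = json.loads(path.read_text(encoding="utf-8")) if path.exists() else {}
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(add_stop_hook(settings), indent=2) + "\n", encoding="utf-8")
```

Run it once with update_settings_file(Path.home() / ".claude" / "settings.json"). Existing env values and other hooks are preserved.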
2. Configure environment variables
Set the required environment variables at the global level (all projects) or per-project level.
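Global (all projects): add the env vars to ~/.claude/settings.json alongside the hook definition:
```json
{
  "env": {
    "TRACE_TO_OPENOBSERVE": "true",
    "OPENOBSERVE_URL": "http://localhost:5080",
    "OPENOBSERVE_ORG": "default",
    "OPENOBSERVE_AUTH_TOKEN": "Basic <base64-encoded user:password>"
  },
  "hooks": {
    "Stop": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/openobserve_hooks.py"
          }
        ]
      }
    ]
  }
}
```
Per-project: to enable tracing selectively, keep the hook registered in ~/.claude/settings.json and put the env block (without the hooks section) in each project's .claude/settings.local.json instead. In projects where TRACE_TO_OPENOBSERVE is not true, the hook exits immediately without sending anything.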
Environment variables
| Variable | Description | Required | Default |
|---|---|---|---|
| TRACE_TO_OPENOBSERVE | Set to "true" to enable tracing | Yes | |
| OPENOBSERVE_URL | OpenObserve base URL | Yes | |
| OPENOBSERVE_ORG | OpenObserve organization | Yes | |
| OPENOBSERVE_AUTH_TOKEN | Basic <base64> or Bearer token | Yes | |
| OPENOBSERVE_TRACES_STREAM_NAME | Target stream name | No | "default" |
| OPENOBSERVE_PROTOCOL | "http/protobuf" or "grpc" | No | "http/protobuf" |
| OPENOBSERVE_USER_ID | User identifier (added as resource attribute) | No | None |
| CC_OPENOBSERVE_DEBUG | Set to "true" for verbose logging | No | "false" |
| CC_OPENOBSERVE_MAX_CHARS | Max characters per text field before truncation | No | 20000 |
Generating the auth token. The token is a Base64-encoded email:password string. For example, with root@example.com / Complexpass#123, the string root@example.com:Complexpass#123 encodes to cm9vdEBleGFtcGxlLmNvbTpDb21wbGV4cGFzcyMxMjM=.
Then set OPENOBSERVE_AUTH_TOKEN to Basic cm9vdEBleGFtcGxlLmNvbTpDb21wbGV4cGFzcyMxMjM=.
Troubleshooting
Diagnostics and common issues
Check the log file
The hook writes to ~/.claude/state/openobserve_hook.log. For more detail, enable debug logging by setting CC_OPENOBSERVE_DEBUG=true in your project's env config.
Test the hook manually
```shell
echo '{"session_id":"test","transcript_path":"/path/to/transcript.jsonl"}' | \
  TRACE_TO_OPENOBSERVE=true \
  OPENOBSERVE_URL=http://localhost:5080 \
  OPENOBSERVE_ORG=default \
  OPENOBSERVE_AUTH_TOKEN="Basic ..." \
  python3 ~/.claude/hooks/openobserve_hooks.py
```
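The manual test needs a transcript to read. For a dry run without a real session, the minimal sketch below writes a four-line JSONL file in the shape the hook's parser handles: type user/assistant rows, message.content blocks, and a tool_result whose tool_use_id matches a tool_use id. It mirrors what the hook script reads, not a documented Claude Code transcript schema. write_sample_transcript is a hypothetical helper, and the model name is a placeholder.

```python
import json
import tempfile
from pathlib import Path


def write_sample_transcript(path: Path) -> Path:
    """Write one user prompt, one tool call with its result, and a final answer."""
    rows = [
        {"type": "user", "message": {"role": "user", "content": "List the files here"}},
        {"type": "assistant", "message": {
            "id": "msg_1", "role": "assistant", "model": "placeholder-model",
            "content": [{"type": "tool_use", "id": "toolu_1", "name": "Bash", "input": {"command": "ls"}}],
            "usage": {"input_tokens": 12, "output_tokens": 8}}},
        {"type": "user", "message": {"role": "user", "content": [
            {"type": "tool_result", "tool_use_id": "toolu_1", "content": "README.md"}]}},
        {"type": "assistant", "message": {
            "id": "msg_2", "role": "assistant", "model": "placeholder-model",
            "content": [{"type": "text", "text": "There is one file: README.md"}],
            "usage": {"input_tokens": 30, "output_tokens": 9}}},
    ]
    with open(path, "w", encoding="utf-8") as f:
        for row in rows:
            # Newline-terminate every row; the hook buffers an unterminated last line.
            f.write(json.dumps(row) + "\n")
    return path


if __name__ == "__main__":
    print(write_sample_transcript(Path(tempfile.gettempdir()) / "cc_sample_transcript.jsonl"))
```

Pass the printed path as transcript_path in the command above. The hook should emit one turn with a Tool: Bash child span. Use a new session_id for each rerun: state is keyed by session_id and path, so a second run with the same values finds no new bytes and sends nothing.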
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| No traces appear | TRACE_TO_OPENOBSERVE not set | Add env vars to .claude/settings.local.json |
| Hook silently exits | Missing openobserve-telemetry-sdk | Run pip install openobserve-telemetry-sdk |
| Auth errors in log | Wrong token format | Ensure token is Basic <base64> format |
| Partial traces | OpenObserve unreachable | Verify OPENOBSERVE_URL and that the service is running |
State files. The hook maintains incremental state in ~/.claude/state/:
- openobserve_state.json: per-session offsets and turn counts
- openobserve_state.lock: file lock for concurrent access
- openobserve_hook.log: debug and info log
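To see what the hook has already processed, the minimal sketch below lists each tracked transcript's byte offset, turn count, and last update from openobserve_state.json. summarize_state is a hypothetical helper. The hook keys entries by a SHA-256 hash of session_id::transcript_path, so only a key prefix is shown.

```python
import json
from pathlib import Path
from typing import List, Tuple

STATE_FILE = Path.home() / ".claude" / "state" / "openobserve_state.json"


def summarize_state(path: Path = STATE_FILE) -> List[Tuple[str, int, int, str]]:
    """Return (key prefix, byte offset, turn count, last update), newest first."""
    if not path.exists():
        return []
    state = json.loads(path.read_text(encoding="utf-8"))
    rows: List[Tuple[str, int, int, str]] = []
    for key, entry in state.items():
        if isinstance(entry, dict):
            rows.append((
                key[:12],
                int(entry.get("offset", 0)),
                int(entry.get("turn_count", 0)),
                str(entry.get("updated", "")),
            ))
    # ISO-8601 UTC timestamps in one format sort correctly as strings.
    return sorted(rows, key=lambda row: row[3], reverse=True)


if __name__ == "__main__":
    for row in summarize_state():
        print(*row)
```

Removing an entry, or the whole file while no hook is running, makes the hook reread that transcript from the start on the next Stop event. It then sends the earlier turns again, numbered from 1.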
Limitations. System prompts are not included in Claude Code's conversation transcripts, so they are not part of the trace.
Hook source code
The complete openobserve_hooks.py script is included below. Save it to ~/.claude/hooks/openobserve_hooks.py.
```python
#!/usr/bin/env python3
"""
Claude Code -> OpenObserve hook
Sends trace data from Claude Code sessions to OpenObserve via OpenTelemetry.

Environment variables:
    TRACE_TO_OPENOBSERVE=true        Enable tracing
    OPENOBSERVE_URL                  OpenObserve base URL
    OPENOBSERVE_ORG                  OpenObserve organization
    OPENOBSERVE_AUTH_TOKEN           Authorization token
    OPENOBSERVE_TRACES_STREAM_NAME   Stream name (default: "default")
    OPENOBSERVE_PROTOCOL             "http/protobuf" or "grpc" (default: "http/protobuf")
    OPENOBSERVE_USER_ID              User identifier (default: None, added as resource attribute)
    CC_OPENOBSERVE_DEBUG=true        Enable debug logging
    CC_OPENOBSERVE_MAX_CHARS=20000   Max chars per text field
"""
import hashlib
import json
import os
import socket
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# --- OpenObserve / OpenTelemetry import (fail-open) ---
try:
    from openobserve import openobserve_init_traces, openobserve_flush, openobserve_shutdown
    from opentelemetry import trace
except Exception:
    # SDK not installed: exit quietly so Claude Code is never blocked.
    sys.exit(0)

# --- Paths ---
STATE_DIR = Path.home() / ".claude" / "state"
LOG_FILE = STATE_DIR / "openobserve_hook.log"
STATE_FILE = STATE_DIR / "openobserve_state.json"
LOCK_FILE = STATE_DIR / "openobserve_state.lock"

DEBUG = os.environ.get("CC_OPENOBSERVE_DEBUG", "").lower() == "true"
try:
    MAX_CHARS = int(os.environ.get("CC_OPENOBSERVE_MAX_CHARS", "20000"))
except ValueError:
    # Fall back to the default instead of crashing on a malformed value.
    MAX_CHARS = 20000
USER_ID = os.environ.get("OPENOBSERVE_USER_ID") or None
HOSTNAME = socket.gethostname()


# ----------------- Logging -----------------
def _log(level: str, message: str) -> None:
    try:
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(f"{ts} [{level}] {message}\n")
    except Exception:
        # Never block
        pass


def debug(msg: str) -> None:
    if DEBUG:
        _log("DEBUG", msg)


def info(msg: str) -> None:
    _log("INFO", msg)


def warn(msg: str) -> None:
    _log("WARN", msg)


def error(msg: str) -> None:
    _log("ERROR", msg)


# ----------------- State locking (best-effort) -----------------
class FileLock:
    def __init__(self, path: Path, timeout_s: float = 2.0):
        self.path = path
        self.timeout_s = timeout_s
        self._fh = None

    def __enter__(self):
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        self._fh = open(self.path, "a+", encoding="utf-8")
        try:
            import fcntl  # Unix only

            deadline = time.time() + self.timeout_s
            while True:
                try:
                    fcntl.flock(self._fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                    break
                except BlockingIOError:
                    if time.time() > deadline:
                        break  # give up waiting and proceed unlocked
                    time.sleep(0.05)
        except Exception:
            # If locking isn't available, proceed without it.
            pass
        return self

    def __exit__(self, exc_type, exc, tb):
        if self._fh is None:
            return
        try:
            import fcntl

            fcntl.flock(self._fh.fileno(), fcntl.LOCK_UN)
        except Exception:
            pass
        try:
            self._fh.close()
        except Exception:
            pass


def load_state() -> Dict[str, Any]:
    try:
        if not STATE_FILE.exists():
            return {}
        return json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except Exception:
        return {}


def save_state(state: Dict[str, Any]) -> None:
    try:
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        tmp = STATE_FILE.with_suffix(".tmp")
        tmp.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
        os.replace(tmp, STATE_FILE)  # atomic swap so readers never see a partial file
    except Exception as e:
        debug(f"save_state failed: {e}")


STATE_MAX_AGE_DAYS = 7


def prune_stale_entries(state: Dict[str, Any]) -> Dict[str, Any]:
    """Remove state entries older than STATE_MAX_AGE_DAYS."""
    now = datetime.now(timezone.utc)
    pruned: Dict[str, Any] = {}
    for key, val in state.items():
        updated = val.get("updated") if isinstance(val, dict) else None
        if updated:
            try:
                ts = datetime.fromisoformat(updated)
                if (now - ts).days > STATE_MAX_AGE_DAYS:
                    continue
            except Exception:
                pass
        pruned[key] = val
    return pruned


def state_key(session_id: str, transcript_path: str) -> str:
    # stable key even if session_id collides
    raw = f"{session_id}::{transcript_path}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


# ----------------- Hook payload -----------------
def read_hook_payload() -> Dict[str, Any]:
    """
    Claude Code hooks pass a JSON payload on stdin.
    This script tolerates missing/empty/non-object stdin by returning {}.
    """
    try:
        data = sys.stdin.read()
        if not data.strip():
            return {}
        obj = json.loads(data)
        return obj if isinstance(obj, dict) else {}
    except Exception:
        return {}


def extract_session_and_transcript(payload: Dict[str, Any]) -> Tuple[Optional[str], Optional[Path]]:
    """
    Hook payload uses snake_case: session_id, transcript_path.
    camelCase keys are accepted as a fallback.
    """
    session_id = payload.get("session_id") or payload.get("sessionId")
    transcript = payload.get("transcript_path") or payload.get("transcriptPath")
    if transcript:
        try:
            transcript_path = Path(transcript).expanduser().resolve()
        except Exception:
            transcript_path = None
    else:
        transcript_path = None
    return session_id, transcript_path


# ----------------- Transcript parsing helpers -----------------
def get_content(msg: Dict[str, Any]) -> Any:
    if not isinstance(msg, dict):
        return None
    if "message" in msg and isinstance(msg.get("message"), dict):
        return msg["message"].get("content")
    return msg.get("content")


def get_role(msg: Dict[str, Any]) -> Optional[str]:
    # Claude Code transcript lines commonly have type=user/assistant OR message.role
    t = msg.get("type")
    if t in ("user", "assistant"):
        return t
    m = msg.get("message")
    if isinstance(m, dict):
        r = m.get("role")
        if r in ("user", "assistant"):
            return r
    return None


def is_tool_result(msg: Dict[str, Any]) -> bool:
    role = get_role(msg)
    if role != "user":
        return False
    content = get_content(msg)
    if isinstance(content, list):
        return any(isinstance(x, dict) and x.get("type") == "tool_result" for x in content)
    return False


def iter_tool_results(content: Any) -> List[Dict[str, Any]]:
    out: List[Dict[str, Any]] = []
    if isinstance(content, list):
        for x in content:
            if isinstance(x, dict) and x.get("type") == "tool_result":
                out.append(x)
    return out


def iter_tool_uses(content: Any) -> List[Dict[str, Any]]:
    out: List[Dict[str, Any]] = []
    if isinstance(content, list):
        for x in content:
            if isinstance(x, dict) and x.get("type") == "tool_use":
                out.append(x)
    return out


def extract_text(content: Any) -> str:
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: List[str] = []
        for x in content:
            if isinstance(x, dict) and x.get("type") == "text":
                parts.append(x.get("text", ""))
            elif isinstance(x, str):
                parts.append(x)
        return "\n".join([p for p in parts if p])
    return ""


def truncate_text(s: str, max_chars: int = MAX_CHARS) -> Tuple[str, Dict[str, Any]]:
    if s is None:
        return "", {"truncated": False, "orig_len": 0}
    orig_len = len(s)
    if orig_len <= max_chars:
        return s, {"truncated": False, "orig_len": orig_len}
    head = s[:max_chars]
    return head, {
        "truncated": True,
        "orig_len": orig_len,
        "kept_len": len(head),
        "sha256": hashlib.sha256(s.encode("utf-8")).hexdigest(),
    }


def get_model(msg: Dict[str, Any]) -> str:
    m = msg.get("message")
    if isinstance(m, dict):
        return m.get("model") or "claude"
    return "claude"


def get_message_id(msg: Dict[str, Any]) -> Optional[str]:
    m = msg.get("message")
    if isinstance(m, dict):
        mid = m.get("id")
        if isinstance(mid, str) and mid:
            return mid
    return None


def get_usage(msg: Dict[str, Any]) -> Dict[str, Any]:
    """Extract usage/token data from an assistant transcript message."""
    m = msg.get("message")
    if isinstance(m, dict):
        return m.get("usage") or {}
    return {}


# ----------------- Incremental reader -----------------
@dataclass
class SessionState:
    offset: int = 0
    buffer: str = ""
    turn_count: int = 0


def load_session_state(global_state: Dict[str, Any], key: str) -> SessionState:
    s = global_state.get(key, {})
    return SessionState(
        offset=int(s.get("offset", 0)),
        buffer=str(s.get("buffer", "")),
        turn_count=int(s.get("turn_count", 0)),
    )


def write_session_state(global_state: Dict[str, Any], key: str, ss: SessionState) -> None:
    global_state[key] = {
        "offset": ss.offset,
        "buffer": ss.buffer,
        "turn_count": ss.turn_count,
        "updated": datetime.now(timezone.utc).isoformat(),
    }


def read_new_jsonl(transcript_path: Path, ss: SessionState) -> Tuple[List[Dict[str, Any]], SessionState]:
    """
    Reads only new bytes since ss.offset. Keeps ss.buffer for partial last line.
    Returns parsed JSON lines (best-effort) and updated state.
    """
    if not transcript_path.exists():
        return [], ss
    try:
        with open(transcript_path, "rb") as f:
            f.seek(ss.offset)
            chunk = f.read()
            new_offset = f.tell()
    except Exception as e:
        debug(f"read_new_jsonl failed: {e}")
        return [], ss
    if not chunk:
        return [], ss
    text = chunk.decode("utf-8", errors="replace")
    combined = ss.buffer + text
    lines = combined.split("\n")
    # last element may be incomplete
    ss.buffer = lines[-1]
    ss.offset = new_offset
    msgs: List[Dict[str, Any]] = []
    for line in lines[:-1]:
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except Exception:
            continue
        # Keep only JSON objects; the parsing helpers above expect dicts.
        if isinstance(obj, dict):
            msgs.append(obj)
    return msgs, ss


# ----------------- Turn assembly -----------------
@dataclass
class Turn:
    user_msg: Dict[str, Any]
    assistant_msgs: List[Dict[str, Any]]
    tool_results_by_id: Dict[str, Any]


def build_turns(messages: List[Dict[str, Any]]) -> List[Turn]:
    """
    Groups incremental transcript rows into turns:
      user (non-tool-result) -> assistant messages -> (tool_result rows, possibly interleaved)
    Uses:
      - assistant message dedupe by message.id (latest row wins)
      - tool results dedupe by tool_use_id (latest wins)
    """
    turns: List[Turn] = []
    current_user: Optional[Dict[str, Any]] = None
    # assistant messages for current turn:
    assistant_order: List[str] = []  # message ids in order of first appearance (or synthetic)
    assistant_latest: Dict[str, Dict[str, Any]] = {}  # id -> latest msg
    tool_results_by_id: Dict[str, Any] = {}  # tool_use_id -> content

    def flush_turn():
        nonlocal current_user, assistant_order, assistant_latest, tool_results_by_id, turns
        if current_user is None:
            return
        if not assistant_latest:
            return
        assistants = [assistant_latest[mid] for mid in assistant_order if mid in assistant_latest]
        turns.append(Turn(user_msg=current_user, assistant_msgs=assistants, tool_results_by_id=dict(tool_results_by_id)))

    for msg in messages:
        role = get_role(msg)
        # tool_result rows show up as role=user with content blocks of type tool_result
        if is_tool_result(msg):
            for tr in iter_tool_results(get_content(msg)):
                tid = tr.get("tool_use_id")
                if tid:
                    tool_results_by_id[str(tid)] = tr.get("content")
            continue
        if role == "user":
            # new user message -> finalize previous turn
            flush_turn()
            # start a new turn
            current_user = msg
            assistant_order = []
            assistant_latest = {}
            tool_results_by_id = {}
            continue
        if role == "assistant":
            if current_user is None:
                # ignore assistant rows until we see a user message
                continue
            mid = get_message_id(msg) or f"noid:{len(assistant_order)}"
            if mid not in assistant_latest:
                assistant_order.append(mid)
            assistant_latest[mid] = msg
            continue
        # ignore unknown rows

    # flush last
    flush_turn()
    return turns


# ----------------- OpenObserve emit via OpenTelemetry -----------------
def _tool_calls_from_assistants(assistant_msgs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    calls: List[Dict[str, Any]] = []
    for am in assistant_msgs:
        for tu in iter_tool_uses(get_content(am)):
            tid = tu.get("id") or ""
            calls.append({
                "id": str(tid),
                "name": tu.get("name") or "unknown",
                "input": tu.get("input") if isinstance(tu.get("input"), (dict, list, str, int, float, bool)) else {},
            })
    return calls


def emit_turn(tracer: trace.Tracer, session_id: str, turn_num: int, turn: Turn, transcript_path: Path) -> None:
    user_text_raw = extract_text(get_content(turn.user_msg))
    user_text, user_text_meta = truncate_text(user_text_raw)
    last_assistant = turn.assistant_msgs[-1]
    assistant_text_raw = extract_text(get_content(last_assistant))
    assistant_text, assistant_text_meta = truncate_text(assistant_text_raw)
    model = get_model(turn.assistant_msgs[0])
    tool_calls = _tool_calls_from_assistants(turn.assistant_msgs)
    # Token usage from the last (final) assistant message in this turn
    usage = get_usage(last_assistant)

    # attach tool outputs
    for c in tool_calls:
        if c["id"] and c["id"] in turn.tool_results_by_id:
            out_raw = turn.tool_results_by_id[c["id"]]
            out_str = out_raw if isinstance(out_raw, str) else json.dumps(out_raw, ensure_ascii=False)
            out_trunc, out_meta = truncate_text(out_str)
            c["output"] = out_trunc
            c["output_meta"] = out_meta
        else:
            c["output"] = None

    # Root span for the turn (summary only; details live on the children)
    with tracer.start_as_current_span(
        name=f"Claude Code - Turn {turn_num}",
        attributes={
            "claude_code.source": "claude-code",
            "session.id": session_id,
            "claude_code.turn_number": turn_num,
            "claude_code.transcript_path": str(transcript_path),
            "host.name": HOSTNAME,
            "llm.model": model,
            "gen_ai.provider.name": "anthropic",
            "gen_ai.operation.name": "chat",
            "claude_code.tool_count": len(tool_calls),
        },
    ):
        # LLM generation child span (carries full message & usage details)
        with tracer.start_as_current_span(
            name="Claude Response",
            attributes={
                "gen_ai.provider.name": "anthropic",
                "gen_ai.request.model": model,
                "gen_ai.operation.name": "chat",
                "gen_ai.input.messages": user_text,
                "input.truncated": user_text_meta.get("truncated", False),
                "input.orig_len": user_text_meta.get("orig_len", 0),
                "gen_ai.output.messages": assistant_text,
                "output.truncated": assistant_text_meta.get("truncated", False),
                "output.orig_len": assistant_text_meta.get("orig_len", 0),
                "claude_code.tool_count": len(tool_calls),
                "host.name": HOSTNAME,
                "gen_ai.usage.input_tokens": usage.get("input_tokens", 0) or 0,
                "gen_ai.usage.output_tokens": usage.get("output_tokens", 0) or 0,
                "gen_ai.usage.cache_read_tokens": usage.get("cache_read_input_tokens", 0) or 0,
                "gen_ai.usage.cache_write_tokens": usage.get("cache_creation_input_tokens", 0) or 0,
            },
        ):
            pass

        # Tool child spans
        for tc in tool_calls:
            in_obj = tc["input"]
            if isinstance(in_obj, str):
                in_str, _ = truncate_text(in_obj)
            elif isinstance(in_obj, (dict, list)):
                in_str, _ = truncate_text(json.dumps(in_obj, ensure_ascii=False))
            else:
                in_str = str(in_obj)
            out_str = tc.get("output") or ""
            with tracer.start_as_current_span(
                name=f"Tool: {tc['name']}",
                attributes={
                    "gen_ai.tool.name": tc["name"],
                    "gen_ai.tool.call.id": tc["id"],
                    "gen_ai.tool.call.arguments": in_str,
                    "gen_ai.tool.call.result": out_str,
                    "host.name": HOSTNAME,
                },
            ):
                pass


# ----------------- Main -----------------
def main() -> int:
    start = time.time()
    debug("Hook started")
    if os.environ.get("TRACE_TO_OPENOBSERVE", "").lower() != "true":
        return 0
    # Validate required env vars are present (openobserve SDK reads them itself)
    if not os.environ.get("OPENOBSERVE_AUTH_TOKEN"):
        debug("OPENOBSERVE_AUTH_TOKEN not set; exiting.")
        return 0
    payload = read_hook_payload()
    session_id, transcript_path = extract_session_and_transcript(payload)
    if not session_id or not transcript_path:
        debug("Missing session_id or transcript_path from hook payload; exiting.")
        return 0
    if not transcript_path.exists():
        debug(f"Transcript path does not exist: {transcript_path}")
        return 0
    try:
        resource_attrs = {
            "service.name": "claude-code",
            "session.id": session_id,
            "host.name": HOSTNAME,
        }
        if USER_ID:
            resource_attrs["user.id"] = USER_ID
        openobserve_init_traces(
            resource_attributes=resource_attrs,
        )
        tracer = trace.get_tracer("claude-code-hook")
    except Exception as e:
        debug(f"Failed to initialize OpenObserve traces: {e}")
        return 0

    # From here on, openobserve was initialized; ensure shutdown in finally.
    try:
        emitted = 0
        with FileLock(LOCK_FILE):
            state = prune_stale_entries(load_state())
            key = state_key(session_id, str(transcript_path))
            ss = load_session_state(state, key)
            msgs, ss = read_new_jsonl(transcript_path, ss)
            if not msgs:
                write_session_state(state, key, ss)
                save_state(state)
                return 0
            turns = build_turns(msgs)
            if not turns:
                write_session_state(state, key, ss)
                save_state(state)
                return 0
            for t in turns:
                emitted += 1
                turn_num = ss.turn_count + emitted
                try:
                    emit_turn(tracer, session_id, turn_num, t, transcript_path)
                except Exception as e:
                    debug(f"emit_turn failed: {e}")
            ss.turn_count += emitted
            write_session_state(state, key, ss)
            save_state(state)
        try:
            openobserve_flush()
        except Exception:
            pass
        dur = time.time() - start
        info(f"Processed {emitted} turns in {dur:.2f}s (session={session_id})")
        return 0
    except Exception as e:
        debug(f"Unexpected failure: {e}")
        return 0
    finally:
        try:
            openobserve_shutdown()
        except Exception:
            pass


if __name__ == "__main__":
    sys.exit(main())
```
Related
- OTLP / OpenTelemetry ingestion: endpoint and collector reference.
- Quickstart: get OpenObserve running if you haven't yet.
Need some help?
- Join our Community Slack
- Or Contact support