Claude Code → OpenObserve Tracing Hook
Send trace data from your Claude Code sessions to OpenObserve via OpenTelemetry.
Each conversation turn (user prompt → assistant response → tool calls) is exported as a structured trace, giving you full observability into how Claude Code works on your behalf.
How It Works
Claude Code supports hooks — shell commands that run at specific lifecycle points. This project uses the Stop hook, which fires after every Claude Code response.
When triggered, the hook:
- Reads a JSON payload from stdin containing session_id and transcript_path
- Incrementally reads new JSONL entries from the transcript file (only new bytes since last run)
- Groups messages into turns (user → assistant → tool calls/results)
- Exports each turn as an OpenTelemetry trace to OpenObserve, with child spans for LLM generation and tool calls
Stop hook fires
→ Read stdin payload (session_id, transcript_path)
→ Read new transcript lines (incremental, offset-based)
→ Group into turns
→ Export as OTel traces to OpenObserve
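The offset-based incremental read can be sketched in isolation as follows. This is a simplified illustration, not the hook's own reader: the function name `read_new_lines` is hypothetical, and instead of carrying a partial-line buffer in state (as the full script does) it simply leaves an unfinished last line unread until the next call.

```python
import json
import os
import tempfile


def read_new_lines(path, offset):
    """Parse complete JSONL rows that appear after byte `offset`.

    Returns (rows, new_offset). Hypothetical helper for illustration only.
    """
    with open(path, "rb") as f:
        f.seek(offset)
        chunk = f.read()
    # Consume only up to the last newline, so a half-written line is retried later.
    end = chunk.rfind(b"\n") + 1
    rows = []
    for line in chunk[:end].split(b"\n"):
        line = line.strip()
        if not line:
            continue
        try:
            rows.append(json.loads(line))
        except ValueError:
            continue  # skip malformed rows rather than fail
    return rows, offset + end


# Demo: the second row is written in two pieces across two "hook runs".
with tempfile.TemporaryDirectory() as tmp:
    transcript = os.path.join(tmp, "transcript.jsonl")
    with open(transcript, "w", encoding="utf-8") as f:
        f.write('{"type": "user"}\n{"type": "assis')
    rows1, off1 = read_new_lines(transcript, 0)

    with open(transcript, "a", encoding="utf-8") as f:
        f.write('tant"}\n')
    rows2, off2 = read_new_lines(transcript, off1)
    size = os.path.getsize(transcript)

print(rows1, rows2)
```

The first call returns only the complete user row; the second call picks up the assistant row once its trailing newline has been written.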
Prerequisites
- Python 3.9+
- openobserve-telemetry-sdk (install with pip install openobserve-telemetry-sdk)
Setup
1. Register the hook globally
Add the Stop hook to ~/.claude/settings.json:
{
  "hooks": {
    "Stop": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/openobserve_hooks.py"
          }
        ]
      }
    ]
  }
}
This makes the hook run after every Claude Code response across all projects.
2. Configure environment variables
You can set the required environment variables at the global level (all projects) or per-project level.
Option A: Global (all projects)
Add env vars to ~/.claude/settings.json alongside the hook definition:
{
  "env": {
    "TRACE_TO_OPENOBSERVE": "true",
    "OPENOBSERVE_URL": "http://localhost:5080",
    "OPENOBSERVE_ORG": "default",
    "OPENOBSERVE_AUTH_TOKEN": "Basic <base64-encoded user:password>"
  },
  "hooks": {
    "Stop": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/openobserve_hooks.py"
          }
        ]
      }
    ]
  }
}
Option B: Per-project
Enable tracing selectively by adding env vars to each project's .claude/settings.local.json:
{
  "env": {
    "TRACE_TO_OPENOBSERVE": "true",
    "OPENOBSERVE_URL": "http://localhost:5080",
    "OPENOBSERVE_ORG": "default",
    "OPENOBSERVE_AUTH_TOKEN": "Basic <base64-encoded user:password>",
    "CC_OPENOBSERVE_DEBUG": "true"
  }
}
Environment Variables
| Variable | Description | Required | Default |
|---|---|---|---|
| TRACE_TO_OPENOBSERVE | Set to "true" to enable tracing | Yes | — |
| OPENOBSERVE_URL | OpenObserve base URL | Yes | — |
| OPENOBSERVE_ORG | OpenObserve organization | Yes | — |
| OPENOBSERVE_AUTH_TOKEN | Basic <base64> or Bearer token | Yes | — |
| OPENOBSERVE_TRACES_STREAM_NAME | Target stream name | No | "default" |
| OPENOBSERVE_PROTOCOL | "http/protobuf" or "grpc" | No | "http/protobuf" |
| OPENOBSERVE_USER_ID | User identifier (added as resource attribute) | No | None |
| CC_OPENOBSERVE_DEBUG | Set to "true" for verbose logging | No | "false" |
| CC_OPENOBSERVE_MAX_CHARS | Max characters per text field before truncation | No | 20000 |
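Because the hook exits quietly when its configuration is incomplete, it can help to check the variables up front. The following is a minimal sketch of such a check based only on the table above; the helper name `find_config_problems` is hypothetical and is not part of the hook or the SDK.

```python
# Variables the table above marks as required (besides the enable switch).
REQUIRED_VARS = ("OPENOBSERVE_URL", "OPENOBSERVE_ORG", "OPENOBSERVE_AUTH_TOKEN")


def find_config_problems(env):
    """Return a list of problems; an empty list means the config looks complete."""
    problems = []
    if env.get("TRACE_TO_OPENOBSERVE", "").lower() != "true":
        problems.append('TRACE_TO_OPENOBSERVE is not set to "true"')
    for name in REQUIRED_VARS:
        if not env.get(name):
            problems.append(f"{name} is missing or empty")
    return problems


# Example: a config that forgot the auth token.
example_env = {
    "TRACE_TO_OPENOBSERVE": "true",
    "OPENOBSERVE_URL": "http://localhost:5080",
    "OPENOBSERVE_ORG": "default",
}
problems = find_config_problems(example_env)
print(problems)
```

Passing `os.environ` instead of `example_env` checks the current shell's environment.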
Generating the auth token
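The token is the Base64 encoding of the string email:password. For example, with root@example.com / Complexpass#123, encoding root@example.com:Complexpass#123 yields cm9vdEBleGFtcGxlLmNvbTpDb21wbGV4cGFzcyMxMjM=.
Then set OPENOBSERVE_AUTH_TOKEN to Basic cm9vdEBleGFtcGxlLmNvbTpDb21wbGV4cGFzcyMxMjM=.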
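One way to produce this value is with Python's standard library. The helper name `basic_auth_token` below is illustrative only; any Base64 tool gives the same result.

```python
import base64


def basic_auth_token(user: str, password: str) -> str:
    """Build a 'Basic <base64>' value from user:password (illustrative helper)."""
    raw = f"{user}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


token = basic_auth_token("root@example.com", "Complexpass#123")
print(token)  # Basic cm9vdEBleGFtcGxlLmNvbTpDb21wbGV4cGFzcyMxMjM=
```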
Troubleshooting
Check the log file
Enable debug logging by setting CC_OPENOBSERVE_DEBUG=true in your project's env config, then inspect the log at ~/.claude/state/openobserve_hook.log.
Test the hook manually
echo '{"session_id":"test","transcript_path":"/path/to/transcript.jsonl"}' | \
  TRACE_TO_OPENOBSERVE=true \
  OPENOBSERVE_URL=http://localhost:5080 \
  OPENOBSERVE_ORG=default \
  OPENOBSERVE_AUTH_TOKEN="Basic ..." \
  python3 ~/.claude/hooks/openobserve_hooks.py
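The manual test needs a transcript file to point at. The sketch below writes a small one, assuming rows shaped the way the hook's parser reads them (type, message.role, message.content, message.id). Real Claude Code transcripts carry more fields, and the model name, message IDs, and tool ID here are made up for illustration.

```python
import json
import tempfile
from pathlib import Path

# Four rows forming one turn: prompt, tool call, tool result, final answer.
rows = [
    {"type": "user", "message": {"role": "user", "content": "List the files here"}},
    {"type": "assistant", "message": {
        "id": "msg_1", "role": "assistant", "model": "example-model",
        "content": [{"type": "tool_use", "id": "tool_1", "name": "Bash",
                     "input": {"command": "ls"}}]}},
    {"type": "user", "message": {
        "role": "user",
        "content": [{"type": "tool_result", "tool_use_id": "tool_1",
                     "content": "README.md"}]}},
    {"type": "assistant", "message": {
        "id": "msg_2", "role": "assistant", "model": "example-model",
        "content": [{"type": "text", "text": "There is one file: README.md"}]}},
]

transcript = Path(tempfile.mkdtemp()) / "transcript.jsonl"
transcript.write_text("".join(json.dumps(r) + "\n" for r in rows), encoding="utf-8")

# This JSON string is what you would pipe to the hook on stdin.
payload = json.dumps({"session_id": "test", "transcript_path": str(transcript)})
print(payload)
```

Pipe the printed payload into the hook in place of the hard-coded JSON in the command above.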
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| No traces appear | TRACE_TO_OPENOBSERVE not set | Add env vars to .claude/settings.local.json |
| Hook silently exits | Missing openobserve-telemetry-sdk | Run pip install openobserve-telemetry-sdk |
| Auth errors in log | Wrong token format | Ensure token is Basic <base64> format |
| Partial traces | OpenObserve unreachable | Verify OPENOBSERVE_URL and that the service is running |
State files
The hook maintains incremental state in ~/.claude/state/:
- openobserve_state.json: per-session offsets and turn counts
- openobserve_state.lock: file lock for concurrent access
- openobserve_hook.log: debug/info log
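When traces seem to be missing or duplicated, it can be useful to look at what the hook has recorded per session. The sketch below summarizes a state file, assuming the layout the script writes (a JSON object keyed by a SHA-256 hash, each entry holding offset, buffer, and turn_count). The helper name `summarize_state` is hypothetical, and the demo runs against a throwaway file rather than your real state.

```python
import json
import tempfile
from pathlib import Path


def summarize_state(state_file: Path):
    """Return (key_prefix, offset, turn_count) for each tracked session."""
    try:
        state = json.loads(state_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return []
    if not isinstance(state, dict):
        return []
    return [
        (key[:12], entry.get("offset", 0), entry.get("turn_count", 0))
        for key, entry in sorted(state.items())
        if isinstance(entry, dict)
    ]


# Demo file; for real data use
# Path.home() / ".claude" / "state" / "openobserve_state.json"
demo = Path(tempfile.mkdtemp()) / "openobserve_state.json"
demo.write_text(
    json.dumps({"ab" * 32: {"offset": 2048, "buffer": "", "turn_count": 3}}),
    encoding="utf-8",
)
summary = summarize_state(demo)
print(summary)
```

Deleting a session's entry from the real state file should make the hook re-read that transcript from the beginning on its next run, since a missing entry defaults to offset 0.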
Limitations
- System prompts are not included in Claude Code's conversation transcripts, so they are not part of the trace.
- Token usage / cost data is not available in the transcript format.
Hook Source Code
The complete openobserve_hooks.py script is included below. Save it to ~/.claude/hooks/openobserve_hooks.py.
#!/usr/bin/env python3
"""
Claude Code -> OpenObserve hook

Sends trace data from Claude Code sessions to OpenObserve via OpenTelemetry.

Environment variables:
    TRACE_TO_OPENOBSERVE=true         Enable tracing
    OPENOBSERVE_URL                   OpenObserve base URL
    OPENOBSERVE_ORG                   OpenObserve organization
    OPENOBSERVE_AUTH_TOKEN            Authorization token
    OPENOBSERVE_TRACES_STREAM_NAME    Stream name (default: "default")
    OPENOBSERVE_PROTOCOL              "http/protobuf" or "grpc" (default: "http/protobuf")
    OPENOBSERVE_USER_ID               User identifier (default: None, added as resource attribute)
    CC_OPENOBSERVE_DEBUG=true         Enable debug logging
    CC_OPENOBSERVE_MAX_CHARS=20000    Max chars per text field
"""
import json
import os
import socket
import sys
import time
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# --- OpenObserve / OpenTelemetry import (fail-open) ---
try:
    from openobserve import openobserve_init_traces, openobserve_flush, openobserve_shutdown
    from opentelemetry import trace
except Exception:
    sys.exit(0)

# --- Paths ---
STATE_DIR = Path.home() / ".claude" / "state"
LOG_FILE = STATE_DIR / "openobserve_hook.log"
STATE_FILE = STATE_DIR / "openobserve_state.json"
LOCK_FILE = STATE_DIR / "openobserve_state.lock"

DEBUG = os.environ.get("CC_OPENOBSERVE_DEBUG", "").lower() == "true"
try:
    MAX_CHARS = int(os.environ.get("CC_OPENOBSERVE_MAX_CHARS", "20000"))
except ValueError:
    # A non-numeric value must not crash the hook; fall back to the default.
    MAX_CHARS = 20000
USER_ID = os.environ.get("OPENOBSERVE_USER_ID") or None
HOSTNAME = socket.gethostname()

# ----------------- Logging -----------------
def _log(level: str, message: str) -> None:
    try:
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(f"{ts} [{level}] {message}\n")
    except Exception:
        # Never block
        pass


def debug(msg: str) -> None:
    if DEBUG:
        _log("DEBUG", msg)


def info(msg: str) -> None:
    _log("INFO", msg)


def warn(msg: str) -> None:
    _log("WARN", msg)


def error(msg: str) -> None:
    _log("ERROR", msg)

# ----------------- State locking (best-effort) -----------------
class FileLock:
    def __init__(self, path: Path, timeout_s: float = 2.0):
        self.path = path
        self.timeout_s = timeout_s
        self._fh = None

    def __enter__(self):
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        self._fh = open(self.path, "a+", encoding="utf-8")
        try:
            import fcntl  # Unix only
            deadline = time.time() + self.timeout_s
            while True:
                try:
                    fcntl.flock(self._fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                    break
                except BlockingIOError:
                    if time.time() > deadline:
                        break
                    time.sleep(0.05)
        except Exception:
            # If locking isn't available, proceed without it.
            pass
        return self

    def __exit__(self, exc_type, exc, tb):
        try:
            import fcntl
            fcntl.flock(self._fh.fileno(), fcntl.LOCK_UN)
        except Exception:
            pass
        try:
            self._fh.close()
        except Exception:
            pass

def load_state() -> Dict[str, Any]:
    try:
        if not STATE_FILE.exists():
            return {}
        return json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except Exception:
        return {}


def save_state(state: Dict[str, Any]) -> None:
    try:
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        tmp = STATE_FILE.with_suffix(".tmp")
        tmp.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
        os.replace(tmp, STATE_FILE)
    except Exception as e:
        debug(f"save_state failed: {e}")


def state_key(session_id: str, transcript_path: str) -> str:
    # stable key even if session_id collides
    raw = f"{session_id}::{transcript_path}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

# ----------------- Hook payload -----------------
def read_hook_payload() -> Dict[str, Any]:
    """
    Claude Code hooks pass a JSON payload on stdin.
    This script tolerates missing/empty stdin by returning {}.
    """
    try:
        data = sys.stdin.read()
        if not data.strip():
            return {}
        return json.loads(data)
    except Exception:
        return {}


def extract_session_and_transcript(payload: Dict[str, Any]) -> Tuple[Optional[str], Optional[Path]]:
    """
    Tries a few plausible field names; exact keys can vary across hook types/versions.
    Prefer structured values from stdin over heuristics.
    """
    session_id = (
        payload.get("sessionId")
        or payload.get("session_id")
        or payload.get("session", {}).get("id")
    )
    transcript = (
        payload.get("transcriptPath")
        or payload.get("transcript_path")
        or payload.get("transcript", {}).get("path")
    )
    if transcript:
        try:
            transcript_path = Path(transcript).expanduser().resolve()
        except Exception:
            transcript_path = None
    else:
        transcript_path = None
    return session_id, transcript_path

# ----------------- Transcript parsing helpers -----------------
def get_content(msg: Dict[str, Any]) -> Any:
    if not isinstance(msg, dict):
        return None
    if "message" in msg and isinstance(msg.get("message"), dict):
        return msg["message"].get("content")
    return msg.get("content")


def get_role(msg: Dict[str, Any]) -> Optional[str]:
    # Claude Code transcript lines commonly have type=user/assistant OR message.role
    t = msg.get("type")
    if t in ("user", "assistant"):
        return t
    m = msg.get("message")
    if isinstance(m, dict):
        r = m.get("role")
        if r in ("user", "assistant"):
            return r
    return None


def is_tool_result(msg: Dict[str, Any]) -> bool:
    role = get_role(msg)
    if role != "user":
        return False
    content = get_content(msg)
    if isinstance(content, list):
        return any(isinstance(x, dict) and x.get("type") == "tool_result" for x in content)
    return False


def iter_tool_results(content: Any) -> List[Dict[str, Any]]:
    out: List[Dict[str, Any]] = []
    if isinstance(content, list):
        for x in content:
            if isinstance(x, dict) and x.get("type") == "tool_result":
                out.append(x)
    return out


def iter_tool_uses(content: Any) -> List[Dict[str, Any]]:
    out: List[Dict[str, Any]] = []
    if isinstance(content, list):
        for x in content:
            if isinstance(x, dict) and x.get("type") == "tool_use":
                out.append(x)
    return out


def extract_text(content: Any) -> str:
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: List[str] = []
        for x in content:
            if isinstance(x, dict) and x.get("type") == "text":
                parts.append(x.get("text", ""))
            elif isinstance(x, str):
                parts.append(x)
        return "\n".join([p for p in parts if p])
    return ""


def truncate_text(s: str, max_chars: int = MAX_CHARS) -> Tuple[str, Dict[str, Any]]:
    if s is None:
        return "", {"truncated": False, "orig_len": 0}
    orig_len = len(s)
    if orig_len <= max_chars:
        return s, {"truncated": False, "orig_len": orig_len}
    head = s[:max_chars]
    return head, {
        "truncated": True,
        "orig_len": orig_len,
        "kept_len": len(head),
        "sha256": hashlib.sha256(s.encode("utf-8")).hexdigest(),
    }


def get_model(msg: Dict[str, Any]) -> str:
    m = msg.get("message")
    if isinstance(m, dict):
        return m.get("model") or "claude"
    return "claude"


def get_message_id(msg: Dict[str, Any]) -> Optional[str]:
    m = msg.get("message")
    if isinstance(m, dict):
        mid = m.get("id")
        if isinstance(mid, str) and mid:
            return mid
    return None

# ----------------- Incremental reader -----------------
@dataclass
class SessionState:
    offset: int = 0
    buffer: str = ""
    turn_count: int = 0


def load_session_state(global_state: Dict[str, Any], key: str) -> SessionState:
    s = global_state.get(key, {})
    return SessionState(
        offset=int(s.get("offset", 0)),
        buffer=str(s.get("buffer", "")),
        turn_count=int(s.get("turn_count", 0)),
    )


def write_session_state(global_state: Dict[str, Any], key: str, ss: SessionState) -> None:
    global_state[key] = {
        "offset": ss.offset,
        "buffer": ss.buffer,
        "turn_count": ss.turn_count,
        "updated": datetime.now(timezone.utc).isoformat(),
    }


def read_new_jsonl(transcript_path: Path, ss: SessionState) -> Tuple[List[Dict[str, Any]], SessionState]:
    """
    Reads only new bytes since ss.offset. Keeps ss.buffer for partial last line.
    Returns parsed JSON lines (best-effort) and updated state.
    """
    if not transcript_path.exists():
        return [], ss
    try:
        with open(transcript_path, "rb") as f:
            f.seek(ss.offset)
            chunk = f.read()
            new_offset = f.tell()
    except Exception as e:
        debug(f"read_new_jsonl failed: {e}")
        return [], ss
    if not chunk:
        return [], ss
    try:
        text = chunk.decode("utf-8", errors="replace")
    except Exception:
        text = chunk.decode(errors="replace")
    combined = ss.buffer + text
    lines = combined.split("\n")
    # last element may be incomplete
    ss.buffer = lines[-1]
    ss.offset = new_offset
    msgs: List[Dict[str, Any]] = []
    for line in lines[:-1]:
        line = line.strip()
        if not line:
            continue
        try:
            msgs.append(json.loads(line))
        except Exception:
            continue
    return msgs, ss

# ----------------- Turn assembly -----------------
@dataclass
class Turn:
    user_msg: Dict[str, Any]
    assistant_msgs: List[Dict[str, Any]]
    tool_results_by_id: Dict[str, Any]


def build_turns(messages: List[Dict[str, Any]]) -> List[Turn]:
    """
    Groups incremental transcript rows into turns:
        user (non-tool-result) -> assistant messages -> (tool_result rows, possibly interleaved)
    Uses:
        - assistant message dedupe by message.id (latest row wins)
        - tool results dedupe by tool_use_id (latest wins)
    """
    turns: List[Turn] = []
    current_user: Optional[Dict[str, Any]] = None
    # assistant messages for current turn:
    assistant_order: List[str] = []  # message ids in order of first appearance (or synthetic)
    assistant_latest: Dict[str, Dict[str, Any]] = {}  # id -> latest msg
    tool_results_by_id: Dict[str, Any] = {}  # tool_use_id -> content

    def flush_turn():
        nonlocal current_user, assistant_order, assistant_latest, tool_results_by_id, turns
        if current_user is None:
            return
        if not assistant_latest:
            return
        assistants = [assistant_latest[mid] for mid in assistant_order if mid in assistant_latest]
        turns.append(Turn(user_msg=current_user, assistant_msgs=assistants, tool_results_by_id=dict(tool_results_by_id)))

    for msg in messages:
        role = get_role(msg)
        # tool_result rows show up as role=user with content blocks of type tool_result
        if is_tool_result(msg):
            for tr in iter_tool_results(get_content(msg)):
                tid = tr.get("tool_use_id")
                if tid:
                    tool_results_by_id[str(tid)] = tr.get("content")
            continue
        if role == "user":
            # new user message -> finalize previous turn
            flush_turn()
            # start a new turn
            current_user = msg
            assistant_order = []
            assistant_latest = {}
            tool_results_by_id = {}
            continue
        if role == "assistant":
            if current_user is None:
                # ignore assistant rows until we see a user message
                continue
            mid = get_message_id(msg) or f"noid:{len(assistant_order)}"
            if mid not in assistant_latest:
                assistant_order.append(mid)
            assistant_latest[mid] = msg
            continue
        # ignore unknown rows

    # flush last
    flush_turn()
    return turns

# ----------------- OpenObserve emit via OpenTelemetry -----------------
def _tool_calls_from_assistants(assistant_msgs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    calls: List[Dict[str, Any]] = []
    for am in assistant_msgs:
        for tu in iter_tool_uses(get_content(am)):
            tid = tu.get("id") or ""
            calls.append({
                "id": str(tid),
                "name": tu.get("name") or "unknown",
                "input": tu.get("input") if isinstance(tu.get("input"), (dict, list, str, int, float, bool)) else {},
            })
    return calls


def emit_turn(tracer: trace.Tracer, session_id: str, turn_num: int, turn: Turn, transcript_path: Path) -> None:
    user_text_raw = extract_text(get_content(turn.user_msg))
    user_text, user_text_meta = truncate_text(user_text_raw)
    last_assistant = turn.assistant_msgs[-1]
    assistant_text_raw = extract_text(get_content(last_assistant))
    assistant_text, assistant_text_meta = truncate_text(assistant_text_raw)
    model = get_model(turn.assistant_msgs[0])
    tool_calls = _tool_calls_from_assistants(turn.assistant_msgs)

    # attach tool outputs
    for c in tool_calls:
        if c["id"] and c["id"] in turn.tool_results_by_id:
            out_raw = turn.tool_results_by_id[c["id"]]
            out_str = out_raw if isinstance(out_raw, str) else json.dumps(out_raw, ensure_ascii=False)
            out_trunc, out_meta = truncate_text(out_str)
            c["output"] = out_trunc
            c["output_meta"] = out_meta
        else:
            c["output"] = None

    # Root span for the turn
    with tracer.start_as_current_span(
        name=f"Claude Code - Turn {turn_num}",
        attributes={
            "claude_code.source": "claude-code",
            "session.id": session_id,
            "claude_code.turn_number": turn_num,
            "claude_code.transcript_path": str(transcript_path),
            "host.name": HOSTNAME,
            "llm.model": model,
            "gen_ai.provider.name": "anthropic",
            "gen_ai.operation.name": "chat",
            "claude_code.tool_count": len(tool_calls),
            "gen_ai.input.messages": user_text,
            "input.truncated": user_text_meta.get("truncated", False),
            "input.orig_len": user_text_meta.get("orig_len", 0),
            "gen_ai.output.messages": assistant_text,
            "output.truncated": assistant_text_meta.get("truncated", False),
            "output.orig_len": assistant_text_meta.get("orig_len", 0),
        },
    ) as turn_span:
        # LLM generation child span
        with tracer.start_as_current_span(
            name="Claude Response",
            attributes={
                "gen_ai.provider.name": "anthropic",
                "gen_ai.request.model": model,
                "gen_ai.operation.name": "chat",
                "gen_ai.input.messages": user_text,
                "gen_ai.output.messages": assistant_text,
                "claude_code.tool_count": len(tool_calls),
                "host.name": HOSTNAME,
            },
        ):
            pass

        # Tool child spans
        for tc in tool_calls:
            in_obj = tc["input"]
            if isinstance(in_obj, str):
                in_str, _ = truncate_text(in_obj)
            elif isinstance(in_obj, (dict, list)):
                in_str, _ = truncate_text(json.dumps(in_obj, ensure_ascii=False))
            else:
                in_str = str(in_obj)
            out_str = tc.get("output") or ""
            with tracer.start_as_current_span(
                name=f"Tool: {tc['name']}",
                attributes={
                    "gen_ai.tool.name": tc["name"],
                    "gen_ai.tool.call.id": tc["id"],
                    "gen_ai.tool.call.arguments": in_str,
                    "gen_ai.tool.call.result": out_str,
                    "host.name": HOSTNAME,
                },
            ):
                pass

# ----------------- Main -----------------
def main() -> int:
    start = time.time()
    debug("Hook started")
    if os.environ.get("TRACE_TO_OPENOBSERVE", "").lower() != "true":
        return 0
    # Validate required env vars are present (openobserve SDK reads them itself)
    if not os.environ.get("OPENOBSERVE_AUTH_TOKEN"):
        debug("OPENOBSERVE_AUTH_TOKEN not set; exiting.")
        return 0
    payload = read_hook_payload()
    session_id, transcript_path = extract_session_and_transcript(payload)
    if not session_id or not transcript_path:
        debug("Missing session_id or transcript_path from hook payload; exiting.")
        return 0
    if not transcript_path.exists():
        debug(f"Transcript path does not exist: {transcript_path}")
        return 0

    try:
        resource_attrs = {
            "service.name": "claude-code",
            "session.id": session_id,
            "host.name": HOSTNAME,
        }
        if USER_ID:
            resource_attrs["user.id"] = USER_ID
        openobserve_init_traces(
            resource_attributes=resource_attrs,
        )
        tracer = trace.get_tracer("claude-code-hook")
    except Exception as e:
        debug(f"Failed to initialize OpenObserve traces: {e}")
        return 0

    try:
        with FileLock(LOCK_FILE):
            state = load_state()
            key = state_key(session_id, str(transcript_path))
            ss = load_session_state(state, key)
            msgs, ss = read_new_jsonl(transcript_path, ss)
            if not msgs:
                write_session_state(state, key, ss)
                save_state(state)
                return 0
            turns = build_turns(msgs)
            if not turns:
                write_session_state(state, key, ss)
                save_state(state)
                return 0
            # emit turns
            emitted = 0
            for t in turns:
                emitted += 1
                turn_num = ss.turn_count + emitted
                try:
                    emit_turn(tracer, session_id, turn_num, t, transcript_path)
                except Exception as e:
                    debug(f"emit_turn failed: {e}")
                    # continue emitting other turns
            ss.turn_count += emitted
            write_session_state(state, key, ss)
            save_state(state)
        try:
            openobserve_flush()
        except Exception:
            pass
        dur = time.time() - start
        info(f"Processed {emitted} turns in {dur:.2f}s (session={session_id})")
        return 0
    except Exception as e:
        debug(f"Unexpected failure: {e}")
        return 0
    finally:
        try:
            openobserve_shutdown()
        except Exception:
            pass


if __name__ == "__main__":
    sys.exit(main())
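The turn-grouping rule used by build_turns can be hard to see amid the deduplication logic. The following is a stripped-down sketch of just that rule, assuming flat rows with role and content keys rather than the nested transcript format. The function name `group_turns` and the dict-based turn shape are illustrative and differ from the script's Turn dataclass.

```python
def is_tool_result_row(row):
    """A tool result is a user row whose content holds a tool_result block."""
    content = row.get("content")
    return (
        row.get("role") == "user"
        and isinstance(content, list)
        and any(isinstance(b, dict) and b.get("type") == "tool_result" for b in content)
    )


def group_turns(rows):
    """Start a new turn at each real user prompt; drop turns with no assistant reply."""
    turns, current = [], None
    for row in rows:
        if is_tool_result_row(row):
            if current is not None:
                current["tool_results"].append(row)
        elif row.get("role") == "user":
            if current and current["assistant"]:
                turns.append(current)
            current = {"user": row, "assistant": [], "tool_results": []}
        elif row.get("role") == "assistant" and current is not None:
            current["assistant"].append(row)
        # rows with any other role are ignored
    if current and current["assistant"]:
        turns.append(current)
    return turns


rows = [
    {"role": "user", "content": "list files"},
    {"role": "assistant", "content": [{"type": "tool_use", "id": "t1", "name": "Bash"}]},
    {"role": "user", "content": [{"type": "tool_result", "tool_use_id": "t1", "content": "a.txt"}]},
    {"role": "assistant", "content": [{"type": "text", "text": "Found a.txt"}]},
    {"role": "user", "content": "thanks"},
    {"role": "assistant", "content": [{"type": "text", "text": "You're welcome"}]},
]
turns = group_turns(rows)
print(len(turns), [len(t["assistant"]) for t in turns])
```

The key point is that a tool_result row has role user but does not start a new turn, so a prompt, its tool calls, and the final answer all land in a single trace.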