Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 156 additions & 15 deletions src/collector/claude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl ClaudeCollector {

if let Some(ref tp) = transcript_path {
let cached = self.transcript_cache.remove(&sf.session_id);
// Detect file replacement: if inode or mtime changed, reparse from scratch
// Detect file replacement: if the file identity changed, reparse from scratch.
let identity_changed = cached
.as_ref()
.map(|c| c.file_identity != file_identity(tp))
Expand All @@ -367,7 +367,20 @@ impl ClaudeCollector {
cached.as_ref().map(|c| c.new_offset).unwrap_or(0)
};

let delta = parse_transcript(tp, from_offset);
let (initial_context_tokens, initial_cache_read) = if from_offset > 0 {
cached
.as_ref()
.map(|c| (c.last_context_tokens, c.prev_cache_read))
.unwrap_or((0, 0))
} else {
(0, 0)
};
let delta = parse_transcript_with_previous(
tp,
from_offset,
initial_context_tokens,
initial_cache_read,
);

if let Some(mut prev) = cached {
// File replaced, shrank, or first parse — replace entirely
Expand All @@ -388,6 +401,10 @@ impl ClaudeCollector {
if delta.max_context_tokens > prev.max_context_tokens {
prev.max_context_tokens = delta.max_context_tokens;
}
if delta.last_context_tokens > 0 {
prev.prev_cache_read = delta.prev_cache_read;
}
prev.compaction_count += delta.compaction_count;
prev.turn_count += delta.turn_count;
// Always update current_task from delta — empty means
// latest assistant turn had no tool_use (task cleared)
Expand Down Expand Up @@ -452,6 +469,7 @@ impl ClaudeCollector {
total_cache_create: 0,
last_context_tokens: 0,
max_context_tokens: 0,
prev_cache_read: 0,
context_history: Vec::new(),
compaction_count: 0,
turn_count: 0,
Expand Down Expand Up @@ -1149,6 +1167,8 @@ struct TranscriptResult {
last_context_tokens: u64,
/// High-water mark: largest context seen in any turn (for 1M detection)
max_context_tokens: u64,
/// cache_read from the previous turn (for compaction detection)
prev_cache_read: u64,
/// Per-turn context sizes for evolution visualization.
context_history: Vec<u64>,
/// Detected compaction events (context dropped > 30% between consecutive turns).
Expand Down Expand Up @@ -1193,21 +1213,12 @@ fn is_symlink(path: &Path) -> bool {
.unwrap_or(true)
}

/// Get file identity as (inode, mtime_nanos) for detecting file replacement.
/// Get file identity as (device, inode) for detecting file replacement.
#[cfg(unix)]
fn file_identity(path: &Path) -> (u64, u64) {
fs::metadata(path)
.ok()
.map(|m| {
let ino = m.ino();
let mtime_ns = m
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
(ino, mtime_ns)
})
.map(|m| (m.dev(), m.ino()))
.unwrap_or((0, 0))
}

Expand All @@ -1229,6 +1240,15 @@ fn file_identity(path: &Path) -> (u64, u64) {
}

fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult {
parse_transcript_with_previous(path, from_offset, 0, 0)
}

fn parse_transcript_with_previous(
path: &Path,
from_offset: u64,
initial_context_tokens: u64,
initial_cache_read: u64,
) -> TranscriptResult {
let identity = file_identity(path);
let mut result = TranscriptResult {
model: "-".to_string(),
Expand All @@ -1238,6 +1258,7 @@ fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult {
total_cache_create: 0,
last_context_tokens: 0,
max_context_tokens: 0,
prev_cache_read: 0,
context_history: Vec::new(),
compaction_count: 0,
turn_count: 0,
Expand Down Expand Up @@ -1276,6 +1297,16 @@ fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult {
from_offset
};
let from_offset = effective_offset;
let mut prev_context_tokens = if from_offset > 0 {
initial_context_tokens
} else {
0
};
let mut prev_cache_read = if from_offset > 0 {
initial_cache_read
} else {
0
};

let mut reader = BufReader::new(file);
if from_offset > 0 {
Expand Down Expand Up @@ -1380,21 +1411,31 @@ fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult {
// Exception: when cache_read = 0 but cache_creation > 0,
// this is a fresh session creating cache for the first time,
// so cache_creation represents the actual context size.
let prev_context = result.last_context_tokens;
result.last_context_tokens = if cr == 0 && cc > 0 {
let prev_context = prev_context_tokens;
let current_context = if cr == 0 && cc > 0 {
inp + cc
} else {
inp + cr
};
result.last_context_tokens = current_context;
if result.last_context_tokens > result.max_context_tokens {
result.max_context_tokens = result.last_context_tokens;
}
// Detect compaction: context drops > 30% between turns
// AND cache_read drops hard (old cache invalidated).
// This avoids false positives from normal cache hit rate
// fluctuations where total context varies but no
// actual conversation truncation occurred.
if prev_context > 0
&& result.last_context_tokens < prev_context * 7 / 10
&& prev_cache_read > 1000
&& cr < prev_cache_read / 5
{
result.compaction_count += 1;
}
prev_context_tokens = current_context;
prev_cache_read = cr;
result.prev_cache_read = cr;
if result.context_history.len() < 10_000 {
result.context_history.push(result.last_context_tokens);
}
Expand Down Expand Up @@ -2019,6 +2060,29 @@ mod tests {
assert_eq!(result.new_offset, file_len);
}

#[cfg(unix)]
#[test]
fn test_file_identity_is_stable_across_append() {
let mut file = tempfile::NamedTempFile::new().unwrap();
write_lines(
&mut file,
&[r#"{"type":"assistant","message":{"usage":{"input_tokens":1}}}"#],
);
let before = file_identity(file.path());

std::thread::sleep(std::time::Duration::from_millis(20));
write_lines(
&mut file,
&[r#"{"type":"assistant","message":{"usage":{"input_tokens":2}}}"#],
);
let after = file_identity(file.path());

assert_eq!(
before, after,
"appending to an existing transcript must keep the same file identity so the collector can tail new bytes"
);
}

#[test]
fn test_parse_transcript_non_turn_lines_do_not_set_saw_turn() {
// A delta that only processes non-user/non-assistant entries (e.g.
Expand Down Expand Up @@ -2691,6 +2755,83 @@ n/Users/bob/.claude-alt/projects/-Users-bob-project/session.jsonl
assert_eq!(session.initial_prompt, "profile specific prompt");
}

#[test]
fn test_load_session_detects_incremental_compaction_across_cached_boundary() {
let temp = tempfile::tempdir().unwrap();
let profile = temp.path().join(".claude");
let sessions = profile.join("sessions");
let projects = profile.join("projects");
let cwd = temp.path().join("repo");
std::fs::create_dir_all(&sessions).unwrap();
std::fs::create_dir_all(&projects).unwrap();
std::fs::create_dir_all(&cwd).unwrap();

let pid = 5252;
let session_id = "session-5252";
let session_path = sessions.join(format!("{}.json", pid));
write_session_file(&session_path, pid, session_id, &cwd);

let transcript_dir = projects.join(encode_cwd_path(cwd.to_str().unwrap()));
std::fs::create_dir_all(&transcript_dir).unwrap();
let transcript_path = transcript_dir.join(format!("{}.jsonl", session_id));
std::fs::write(
&transcript_path,
r#"{"type":"user","timestamp":"2026-03-28T15:00:00Z","message":{"role":"user","content":"start"}}
{"type":"assistant","timestamp":"2026-03-28T15:00:05Z","message":{"model":"claude-sonnet-4-6","usage":{"input_tokens":100000,"output_tokens":50,"cache_read_input_tokens":100000,"cache_creation_input_tokens":0},"content":[{"type":"text","text":"first"}]}}
"#,
)
.unwrap();

let mut collector = ClaudeCollector::new();
let process_info = make_proc_info(pid, "claude");
let children_map = HashMap::new();
let ports = HashMap::new();
let config = ConfigDir::new(profile);
let ctx =
build_discovery_context(&[(session_path.clone(), config.clone())], &process_info, 0);

let first = collector
.load_session(
&session_path,
&config,
&process_info,
&children_map,
&ports,
&ctx,
)
.unwrap();
assert_eq!(first.compaction_count, 0);

let mut transcript = std::fs::OpenOptions::new()
.append(true)
.open(&transcript_path)
.unwrap();
writeln!(
transcript,
r#"{{"type":"user","timestamp":"2026-03-28T15:01:00Z","message":{{"role":"user","content":"continue"}}}}"#
)
.unwrap();
writeln!(
transcript,
r#"{{"type":"assistant","timestamp":"2026-03-28T15:01:05Z","message":{{"model":"claude-sonnet-4-6","usage":{{"input_tokens":20000,"output_tokens":30,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}},"content":[{{"type":"text","text":"after compact"}}]}}}}"#
)
.unwrap();
transcript.flush().unwrap();

let second = collector
.load_session(
&session_path,
&config,
&process_info,
&children_map,
&ports,
&ctx,
)
.unwrap();

assert_eq!(second.compaction_count, 1);
}

#[test]
fn test_parse_transcript_basic_tokens() {
let mut file = tempfile::NamedTempFile::new().unwrap();
Expand Down
Loading