From 2d9178b9c9dab4bb86fd48646b0fe61894e0db2a Mon Sep 17 00:00:00 2001 From: KorenKrita Date: Mon, 11 May 2026 18:56:02 +0800 Subject: [PATCH 1/3] fix: improve compaction detection with cache_read validation Previously, compaction was detected solely by context size dropping >30% between turns. This caused false positives from normal cache hit rate fluctuations where inp+cr varies between tool calls. Now requires both conditions: - Context drops > 30% between consecutive turns - cache_read drops > 80% (old cache invalidated by truncation) Real compaction invalidates cached conversation prefix, causing cache_read to plummet. Normal cache fluctuations don't trigger this. --- src/collector/claude.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/collector/claude.rs b/src/collector/claude.rs index 807078c..fbfabf3 100644 --- a/src/collector/claude.rs +++ b/src/collector/claude.rs @@ -452,6 +452,7 @@ impl ClaudeCollector { total_cache_create: 0, last_context_tokens: 0, max_context_tokens: 0, + prev_cache_read: 0, context_history: Vec::new(), compaction_count: 0, turn_count: 0, @@ -1149,6 +1150,8 @@ struct TranscriptResult { last_context_tokens: u64, /// High-water mark: largest context seen in any turn (for 1M detection) max_context_tokens: u64, + /// cache_read from the previous turn (for compaction detection) + prev_cache_read: u64, /// Per-turn context sizes for evolution visualization. context_history: Vec, /// Detected compaction events (context dropped > 30% between consecutive turns). @@ -1238,6 +1241,7 @@ fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult { total_cache_create: 0, last_context_tokens: 0, max_context_tokens: 0, + prev_cache_read: 0, context_history: Vec::new(), compaction_count: 0, turn_count: 0, @@ -1390,11 +1394,18 @@ fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult { result.max_context_tokens = result.last_context_tokens; } // Detect compaction: context drops > 30% between turns + // AND cache_read drops hard (old cache invalidated). + // This avoids false positives from normal cache hit rate + // fluctuations where total context varies but no + // actual conversation truncation occurred. if prev_context > 0 && result.last_context_tokens < prev_context * 7 / 10 + && result.prev_cache_read > 1000 + && cr < result.prev_cache_read / 5 { result.compaction_count += 1; } + result.prev_cache_read = cr; if result.context_history.len() < 10_000 { result.context_history.push(result.last_context_tokens); } From 0581a82fd2ea4f4134051e50b119329c27422595 Mon Sep 17 00:00:00 2001 From: KorenKrita Date: Mon, 11 May 2026 18:57:46 +0800 Subject: [PATCH 2/3] fix: sync prev_cache_read in delta merge The prev_cache_read field was not propagated during incremental transcript parsing (delta merge), causing it to remain 0 on subsequent collection cycles. --- src/collector/claude.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/collector/claude.rs b/src/collector/claude.rs index fbfabf3..da98f83 100644 --- a/src/collector/claude.rs +++ b/src/collector/claude.rs @@ -388,6 +388,9 @@ impl ClaudeCollector { if delta.max_context_tokens > prev.max_context_tokens { prev.max_context_tokens = delta.max_context_tokens; } + if delta.prev_cache_read > 0 { + prev.prev_cache_read = delta.prev_cache_read; + } prev.turn_count += delta.turn_count; // Always update current_task from delta — empty means // latest assistant turn had no tool_use (task cleared) From eef8f6531e9df9c6938d48cea5e101aedd56b97d Mon Sep 17 00:00:00 2001 From: graykode Date: Wed, 13 May 2026 14:02:42 +0900 Subject: [PATCH 3/3] fix: preserve compaction detection while tailing --- src/collector/claude.rs | 163 +++++++++++++++++++++++++++++++++++----- 1 file changed, 145 insertions(+), 18 deletions(-) diff --git a/src/collector/claude.rs b/src/collector/claude.rs index da98f83..ce6dcbc 100644 --- a/src/collector/claude.rs +++ b/src/collector/claude.rs @@ -356,7 +356,7 @@ impl ClaudeCollector { if let Some(ref tp) = transcript_path { let cached = self.transcript_cache.remove(&sf.session_id); - // Detect file replacement: if inode or mtime changed, reparse from scratch + // Detect file replacement: if the file identity changed, reparse from scratch. let identity_changed = cached .as_ref() .map(|c| c.file_identity != file_identity(tp)) @@ -367,7 +367,20 @@ impl ClaudeCollector { cached.as_ref().map(|c| c.new_offset).unwrap_or(0) }; - let delta = parse_transcript(tp, from_offset); + let (initial_context_tokens, initial_cache_read) = if from_offset > 0 { + cached + .as_ref() + .map(|c| (c.last_context_tokens, c.prev_cache_read)) + .unwrap_or((0, 0)) + } else { + (0, 0) + }; + let delta = parse_transcript_with_previous( + tp, + from_offset, + initial_context_tokens, + initial_cache_read, + ); if let Some(mut prev) = cached { // File replaced, shrank, or first parse — replace entirely @@ -388,9 +401,10 @@ impl ClaudeCollector { if delta.max_context_tokens > prev.max_context_tokens { prev.max_context_tokens = delta.max_context_tokens; } - if delta.prev_cache_read > 0 { + if delta.last_context_tokens > 0 { prev.prev_cache_read = delta.prev_cache_read; } + prev.compaction_count += delta.compaction_count; prev.turn_count += delta.turn_count; // Always update current_task from delta — empty means // latest assistant turn had no tool_use (task cleared) @@ -1199,21 +1213,12 @@ fn is_symlink(path: &Path) -> bool { .unwrap_or(true) } -/// Get file identity as (inode, mtime_nanos) for detecting file replacement. +/// Get file identity as (device, inode) for detecting file replacement. #[cfg(unix)] fn file_identity(path: &Path) -> (u64, u64) { fs::metadata(path) .ok() - .map(|m| { - let ino = m.ino(); - let mtime_ns = m - .modified() - .ok() - .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) - .map(|d| d.as_nanos() as u64) - .unwrap_or(0); - (ino, mtime_ns) - }) + .map(|m| (m.dev(), m.ino())) .unwrap_or((0, 0)) } @@ -1235,6 +1240,15 @@ fn file_identity(path: &Path) -> (u64, u64) { } fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult { + parse_transcript_with_previous(path, from_offset, 0, 0) +} + +fn parse_transcript_with_previous( + path: &Path, + from_offset: u64, + initial_context_tokens: u64, + initial_cache_read: u64, +) -> TranscriptResult { let identity = file_identity(path); let mut result = TranscriptResult { model: "-".to_string(), @@ -1283,6 +1297,16 @@ fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult { from_offset }; let from_offset = effective_offset; + let mut prev_context_tokens = if from_offset > 0 { + initial_context_tokens + } else { + 0 + }; + let mut prev_cache_read = if from_offset > 0 { + initial_cache_read + } else { + 0 + }; let mut reader = BufReader::new(file); if from_offset > 0 { @@ -1387,12 +1411,13 @@ fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult { // Exception: when cache_read = 0 but cache_creation > 0, // this is a fresh session creating cache for the first time, // so cache_creation represents the actual context size. - let prev_context = result.last_context_tokens; - result.last_context_tokens = if cr == 0 && cc > 0 { + let prev_context = prev_context_tokens; + let current_context = if cr == 0 && cc > 0 { inp + cc } else { inp + cr }; + result.last_context_tokens = current_context; if result.last_context_tokens > result.max_context_tokens { result.max_context_tokens = result.last_context_tokens; } @@ -1403,11 +1428,13 @@ fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult { // actual conversation truncation occurred. if prev_context > 0 && result.last_context_tokens < prev_context * 7 / 10 - && result.prev_cache_read > 1000 - && cr < result.prev_cache_read / 5 + && prev_cache_read > 1000 + && cr < prev_cache_read / 5 { result.compaction_count += 1; } + prev_context_tokens = current_context; + prev_cache_read = cr; result.prev_cache_read = cr; if result.context_history.len() < 10_000 { result.context_history.push(result.last_context_tokens); @@ -2033,6 +2060,29 @@ mod tests { assert_eq!(result.new_offset, file_len); } + #[cfg(unix)] + #[test] + fn test_file_identity_is_stable_across_append() { + let mut file = tempfile::NamedTempFile::new().unwrap(); + write_lines( + &mut file, + &[r#"{"type":"assistant","message":{"usage":{"input_tokens":1}}}"#], + ); + let before = file_identity(file.path()); + + std::thread::sleep(std::time::Duration::from_millis(20)); + write_lines( + &mut file, + &[r#"{"type":"assistant","message":{"usage":{"input_tokens":2}}}"#], + ); + let after = file_identity(file.path()); + + assert_eq!( + before, after, + "appending to an existing transcript must keep the same file identity so the collector can tail new bytes" + ); + } + #[test] fn test_parse_transcript_non_turn_lines_do_not_set_saw_turn() { // A delta that only processes non-user/non-assistant entries (e.g. @@ -2705,6 +2755,83 @@ n/Users/bob/.claude-alt/projects/-Users-bob-project/session.jsonl assert_eq!(session.initial_prompt, "profile specific prompt"); } + #[test] + fn test_load_session_detects_incremental_compaction_across_cached_boundary() { + let temp = tempfile::tempdir().unwrap(); + let profile = temp.path().join(".claude"); + let sessions = profile.join("sessions"); + let projects = profile.join("projects"); + let cwd = temp.path().join("repo"); + std::fs::create_dir_all(&sessions).unwrap(); + std::fs::create_dir_all(&projects).unwrap(); + std::fs::create_dir_all(&cwd).unwrap(); + + let pid = 5252; + let session_id = "session-5252"; + let session_path = sessions.join(format!("{}.json", pid)); + write_session_file(&session_path, pid, session_id, &cwd); + + let transcript_dir = projects.join(encode_cwd_path(cwd.to_str().unwrap())); + std::fs::create_dir_all(&transcript_dir).unwrap(); + let transcript_path = transcript_dir.join(format!("{}.jsonl", session_id)); + std::fs::write( + &transcript_path, + r#"{"type":"user","timestamp":"2026-03-28T15:00:00Z","message":{"role":"user","content":"start"}} +{"type":"assistant","timestamp":"2026-03-28T15:00:05Z","message":{"model":"claude-sonnet-4-6","usage":{"input_tokens":100000,"output_tokens":50,"cache_read_input_tokens":100000,"cache_creation_input_tokens":0},"content":[{"type":"text","text":"first"}]}} +"#, + ) + .unwrap(); + + let mut collector = ClaudeCollector::new(); + let process_info = make_proc_info(pid, "claude"); + let children_map = HashMap::new(); + let ports = HashMap::new(); + let config = ConfigDir::new(profile); + let ctx = + build_discovery_context(&[(session_path.clone(), config.clone())], &process_info, 0); + + let first = collector + .load_session( + &session_path, + &config, + &process_info, + &children_map, + &ports, + &ctx, + ) + .unwrap(); + assert_eq!(first.compaction_count, 0); + + let mut transcript = std::fs::OpenOptions::new() + .append(true) + .open(&transcript_path) + .unwrap(); + writeln!( + transcript, + r#"{{"type":"user","timestamp":"2026-03-28T15:01:00Z","message":{{"role":"user","content":"continue"}}}}"# + ) + .unwrap(); + writeln!( + transcript, + r#"{{"type":"assistant","timestamp":"2026-03-28T15:01:05Z","message":{{"model":"claude-sonnet-4-6","usage":{{"input_tokens":20000,"output_tokens":30,"cache_read_input_tokens":100,"cache_creation_input_tokens":0}},"content":[{{"type":"text","text":"after compact"}}]}}}}"# + ) + .unwrap(); + transcript.flush().unwrap(); + + let second = collector + .load_session( + &session_path, + &config, + &process_info, + &children_map, + &ports, + &ctx, + ) + .unwrap(); + + assert_eq!(second.compaction_count, 1); + } + #[test] fn test_parse_transcript_basic_tokens() { let mut file = tempfile::NamedTempFile::new().unwrap();