From c507a6ac6f00556eb8b32878197ad218b2a0ff3e Mon Sep 17 00:00:00 2001 From: wsp1911 Date: Thu, 26 Mar 2026 21:38:00 +0800 Subject: [PATCH 1/5] feat: add structured fallback for session compression - add structured compression payload metadata to reminder messages - fall back to sanitized local turn compression when model summarization fails - track whether compression used a model-generated summary - reorganize session compression into dedicated modules and add tests --- src/crates/core/src/agentic/core/message.rs | 87 +++++ src/crates/core/src/agentic/core/mod.rs | 4 +- .../src/agentic/execution/execution_engine.rs | 6 +- .../session/compression/fallback/builder.rs | 169 ++++++++++ .../session/compression/fallback/mod.rs | 29 ++ .../session/compression/fallback/payload.rs | 165 ++++++++++ .../session/compression/fallback/render.rs | 87 +++++ .../session/compression/fallback/sanitize.rs | 304 ++++++++++++++++++ .../session/compression/fallback/tests.rs | 80 +++++ .../session/compression/fallback/types.rs | 34 ++ .../manager.rs} | 136 ++++++-- .../src/agentic/session/compression/mod.rs | 7 + src/crates/core/src/agentic/session/mod.rs | 4 +- 13 files changed, 1080 insertions(+), 32 deletions(-) create mode 100644 src/crates/core/src/agentic/session/compression/fallback/builder.rs create mode 100644 src/crates/core/src/agentic/session/compression/fallback/mod.rs create mode 100644 src/crates/core/src/agentic/session/compression/fallback/payload.rs create mode 100644 src/crates/core/src/agentic/session/compression/fallback/render.rs create mode 100644 src/crates/core/src/agentic/session/compression/fallback/sanitize.rs create mode 100644 src/crates/core/src/agentic/session/compression/fallback/tests.rs create mode 100644 src/crates/core/src/agentic/session/compression/fallback/types.rs rename src/crates/core/src/agentic/session/{compression_manager.rs => compression/manager.rs} (84%) create mode 100644 src/crates/core/src/agentic/session/compression/mod.rs diff --git a/src/crates/core/src/agentic/core/message.rs b/src/crates/core/src/agentic/core/message.rs index d6f84ee7..6612237c 100644 --- a/src/crates/core/src/agentic/core/message.rs +++ b/src/crates/core/src/agentic/core/message.rs @@ -62,6 +62,8 @@ pub struct MessageMetadata { pub thinking_signature: Option, #[serde(skip_serializing_if = "Option::is_none")] pub semantic_kind: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub compression_payload: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -76,6 +78,85 @@ pub enum MessageSemanticKind { ComputerUsePostActionSnapshot, } +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct CompressionPayload { + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub entries: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum CompressionEntry { + ModelSummary { + text: String, + }, + Turn { + #[serde(skip_serializing_if = "Option::is_none")] + turn_id: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + messages: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + todo: Option, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressedMessage { + pub role: CompressedMessageRole, + #[serde(skip_serializing_if = "Option::is_none")] + pub text: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub tool_calls: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CompressedMessageRole { + User, + Assistant, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressedToolCall { + pub tool_name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub arguments: Option, + #[serde(default, skip_serializing_if = "is_false")] + pub is_error: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressedTodoSnapshot { + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub todos: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub summary: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressedTodoItem { + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + pub content: String, + pub status: String, +} + +impl CompressionPayload { + pub fn from_summary(text: String) -> Self { + Self { + entries: vec![CompressionEntry::ModelSummary { text }], + } + } + + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } +} + +fn is_false(value: &bool) -> bool { + !*value +} + impl From for AIMessage { fn from(msg: Message) -> Self { let role = match msg.role { @@ -378,6 +459,12 @@ impl Message { self } + pub fn with_compression_payload(mut self, compression_payload: CompressionPayload) -> Self { + self.metadata.compression_payload = Some(compression_payload); + self.metadata.tokens = None; + self + } + /// Set message's thinking_signature (for Anthropic extended thinking multi-turn conversations) pub fn with_thinking_signature(mut self, signature: Option) -> Self { self.metadata.thinking_signature = signature; diff --git a/src/crates/core/src/agentic/core/mod.rs b/src/crates/core/src/agentic/core/mod.rs index d85ba603..5bd0c55f 100644 --- a/src/crates/core/src/agentic/core/mod.rs +++ b/src/crates/core/src/agentic/core/mod.rs @@ -11,7 +11,9 @@ pub mod session; pub mod state; pub use dialog_turn::{DialogTurn, DialogTurnState, TurnStats}; pub use message::{ - Message, MessageContent, MessageRole, MessageSemanticKind, ToolCall, ToolResult, + CompressedMessage, CompressedMessageRole, CompressedTodoItem, CompressedTodoSnapshot, + CompressedToolCall, CompressionEntry, CompressionPayload, Message, MessageContent, MessageRole, + MessageSemanticKind, ToolCall, ToolResult, }; pub use messages_helper::MessageHelper; pub use model_round::ModelRound; diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index 46e20461..dadcd25b 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -487,9 +487,9 @@ impl ExecutionEngine { .compress_turns(session_id, context_window, turn_index_to_keep, turns) .await { - Ok(compressed_messages) => { + Ok(compression_result) => { let mut new_messages = vec![system_prompt_message]; - new_messages.extend(compressed_messages); + new_messages.extend(compression_result.messages); // Update session compression state session.compression_state.increment_compression_count(); @@ -526,7 +526,7 @@ impl ExecutionEngine { tokens_after: compressed_tokens, compression_ratio: (compressed_tokens as f64) / (current_tokens as f64), duration_ms, - has_summary: true, + has_summary: compression_result.has_model_summary, subagent_parent_info: event_subagent_parent_info.clone(), }, EventPriority::Normal, diff --git a/src/crates/core/src/agentic/session/compression/fallback/builder.rs b/src/crates/core/src/agentic/session/compression/fallback/builder.rs new file mode 100644 index 00000000..0376f4ea --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/builder.rs @@ -0,0 +1,169 @@ +use super::sanitize::{ + sanitize_assistant_text, sanitize_todo_snapshot, sanitize_tool_arguments, sanitize_user_text, +}; +use super::types::CompressionFallbackOptions; +use crate::agentic::core::{ + strip_prompt_markup, CompressedMessage, CompressedMessageRole, CompressedTodoSnapshot, + CompressedToolCall, CompressionEntry, Message, MessageContent, MessageRole, + MessageSemanticKind, +}; + +pub(super) fn build_entries_from_turns( + turns: Vec>, + options: &CompressionFallbackOptions, +) -> Vec { + let mut entries = Vec::new(); + + for turn in turns { + build_entries_from_messages(turn, options, &mut entries); + } + + entries +} + +fn build_entries_from_messages( + messages: Vec, + options: &CompressionFallbackOptions, + output: &mut Vec, +) { + let turn_id = messages + .first() + .and_then(|message| message.metadata.turn_id.clone()); + let mut turn_messages = Vec::new(); + let mut latest_todo = None; + + for message in messages { + if let Some(entries) = extract_nested_compression_entries(&message) { + flush_turn_entry( + output, + turn_id.clone(), + &mut turn_messages, + &mut latest_todo, + ); + output.extend(entries); + continue; + } + + match message.content { + MessageContent::Text(text) => match message.role { + MessageRole::User => { + if let Some(text) = sanitize_user_text(&text, options) { + turn_messages.push(CompressedMessage { + role: CompressedMessageRole::User, + text: Some(text), + tool_calls: Vec::new(), + }); + } + } + MessageRole::Assistant => { + if let Some(text) = sanitize_assistant_text(&text, options) { + turn_messages.push(CompressedMessage { + role: CompressedMessageRole::Assistant, + text: Some(text), + tool_calls: Vec::new(), + }); + } + } + MessageRole::System | MessageRole::Tool => {} + }, + MessageContent::Multimodal { text, images } => { + if message.role == MessageRole::User { + let mut rendered = sanitize_user_text(&text, options).unwrap_or_default(); + if !images.is_empty() { + if !rendered.is_empty() { + rendered.push('\n'); + } + rendered.push_str(&format!("[{} image(s) omitted]", images.len())); + } + if !rendered.trim().is_empty() { + turn_messages.push(CompressedMessage { + role: CompressedMessageRole::User, + text: Some(rendered), + tool_calls: Vec::new(), + }); + } + } + } + MessageContent::Mixed { + text, tool_calls, .. + } => { + if message.role != MessageRole::Assistant { + continue; + } + + let mut compressed_tool_calls = Vec::new(); + + for tool_call in tool_calls { + if tool_call.tool_name == "TodoWrite" { + latest_todo = sanitize_todo_snapshot(&tool_call.arguments); + continue; + } + + let compressed_tool_call = CompressedToolCall { + tool_name: tool_call.tool_name.clone(), + arguments: sanitize_tool_arguments( + &tool_call.tool_name, + &tool_call.arguments, + options, + ), + is_error: tool_call.is_error, + }; + compressed_tool_calls.push(compressed_tool_call); + } + + let sanitized_text = sanitize_assistant_text(&text, options); + if sanitized_text.is_some() || !compressed_tool_calls.is_empty() { + turn_messages.push(CompressedMessage { + role: CompressedMessageRole::Assistant, + text: sanitized_text, + tool_calls: compressed_tool_calls, + }); + } + } + MessageContent::ToolResult { .. } => {} + } + } + + flush_turn_entry(output, turn_id, &mut turn_messages, &mut latest_todo); +} + +fn flush_turn_entry( + output: &mut Vec, + turn_id: Option, + turn_messages: &mut Vec, + latest_todo: &mut Option, +) { + if turn_messages.is_empty() && latest_todo.is_none() { + return; + } + + output.push(CompressionEntry::Turn { + turn_id, + messages: std::mem::take(turn_messages), + todo: latest_todo.take(), + }); +} + +fn extract_nested_compression_entries(message: &Message) -> Option> { + if message.metadata.semantic_kind != Some(MessageSemanticKind::InternalReminder) { + return None; + } + + if let Some(payload) = message.metadata.compression_payload.clone() { + if !payload.is_empty() { + return Some(payload.entries); + } + } + + let raw_text = match &message.content { + MessageContent::Text(text) => text.clone(), + MessageContent::Multimodal { text, .. } => text.clone(), + _ => String::new(), + }; + let stripped = strip_prompt_markup(&raw_text); + if stripped.is_empty() { + return None; + } + + Some(vec![CompressionEntry::ModelSummary { text: stripped }]) +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/mod.rs b/src/crates/core/src/agentic/session/compression/fallback/mod.rs new file mode 100644 index 00000000..3c074f69 --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/mod.rs @@ -0,0 +1,29 @@ +mod builder; +mod payload; +mod render; +mod sanitize; +mod types; + +use builder::build_entries_from_turns; +use payload::trim_payload_to_budget; +use render::render_payload_for_model; + +pub use types::{CompressionFallbackOptions, CompressionReminder}; + +pub fn build_structured_compression_reminder( + turns: Vec>, + options: &CompressionFallbackOptions, +) -> CompressionReminder { + let entries = build_entries_from_turns(turns, options); + let trimmed_payload = trim_payload_to_budget(entries, options); + let model_text = render_payload_for_model(&trimmed_payload); + + CompressionReminder { + model_text, + payload: trimmed_payload, + used_model_summary: false, + } +} + +#[cfg(test)] +mod tests; diff --git a/src/crates/core/src/agentic/session/compression/fallback/payload.rs b/src/crates/core/src/agentic/session/compression/fallback/payload.rs new file mode 100644 index 00000000..03560abb --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/payload.rs @@ -0,0 +1,165 @@ +use super::render::render_payload_for_model; +use super::types::{CompressionFallbackOptions, CompressionUnit}; +use crate::agentic::core::{ + render_system_reminder, CompressedMessage, CompressedTodoSnapshot, CompressionEntry, + CompressionPayload, Message, +}; + +pub(super) fn trim_payload_to_budget( + entries: Vec, + options: &CompressionFallbackOptions, +) -> CompressionPayload { + if entries.is_empty() { + return CompressionPayload::default(); + } + + let units = flatten_entries_to_units(entries); + let mut selected_units = Vec::new(); + + for unit in units.into_iter().rev() { + let mut candidate_units = vec![unit.clone()]; + candidate_units.extend(selected_units.clone()); + + let candidate_payload = rebuild_payload_from_units(candidate_units); + if estimate_payload_tokens(&candidate_payload) <= options.max_tokens { + selected_units.insert(0, unit); + } + } + + rebuild_payload_from_units(selected_units) +} + +fn flatten_entries_to_units(entries: Vec) -> Vec { + let mut units = Vec::new(); + + for (entry_id, entry) in entries.into_iter().enumerate() { + match entry { + CompressionEntry::ModelSummary { text } => { + units.push(CompressionUnit::ModelSummary { text }); + } + CompressionEntry::Turn { + turn_id, + messages, + todo, + } => { + for message in messages { + units.push(CompressionUnit::TurnMessage { + entry_id, + turn_id: turn_id.clone(), + message, + }); + } + if let Some(todo) = todo { + units.push(CompressionUnit::TurnTodo { + entry_id, + turn_id, + todo, + }); + } + } + } + } + + units +} + +fn rebuild_payload_from_units(units: Vec) -> CompressionPayload { + let mut entries = Vec::new(); + let mut current_turn_entry_id: Option = None; + let mut current_turn_id: Option = None; + let mut current_messages = Vec::new(); + let mut current_todo = None; + + for unit in units { + match unit { + CompressionUnit::ModelSummary { text } => { + flush_rebuilt_turn( + &mut entries, + &mut current_turn_entry_id, + &mut current_turn_id, + &mut current_messages, + &mut current_todo, + ); + entries.push(CompressionEntry::ModelSummary { text }); + } + CompressionUnit::TurnMessage { + entry_id, + turn_id, + message, + } => { + if current_turn_entry_id != Some(entry_id) { + flush_rebuilt_turn( + &mut entries, + &mut current_turn_entry_id, + &mut current_turn_id, + &mut current_messages, + &mut current_todo, + ); + current_turn_entry_id = Some(entry_id); + current_turn_id = turn_id; + } + current_messages.push(message); + } + CompressionUnit::TurnTodo { + entry_id, + turn_id, + todo, + } => { + if current_turn_entry_id != Some(entry_id) { + flush_rebuilt_turn( + &mut entries, + &mut current_turn_entry_id, + &mut current_turn_id, + &mut current_messages, + &mut current_todo, + ); + current_turn_entry_id = Some(entry_id); + current_turn_id = turn_id; + } + current_todo = Some(todo); + } + } + } + + flush_rebuilt_turn( + &mut entries, + &mut current_turn_entry_id, + &mut current_turn_id, + &mut current_messages, + &mut current_todo, + ); + + CompressionPayload { entries } +} + +fn flush_rebuilt_turn( + entries: &mut Vec, + current_turn_entry_id: &mut Option, + current_turn_id: &mut Option, + current_messages: &mut Vec, + current_todo: &mut Option, +) { + if current_turn_entry_id.is_none() { + return; + } + + if current_messages.is_empty() && current_todo.is_none() { + *current_turn_entry_id = None; + *current_turn_id = None; + return; + } + + entries.push(CompressionEntry::Turn { + turn_id: current_turn_id.clone(), + messages: std::mem::take(current_messages), + todo: current_todo.take(), + }); + *current_turn_entry_id = None; + *current_turn_id = None; +} + +fn estimate_payload_tokens(payload: &CompressionPayload) -> usize { + let rendered = render_payload_for_model(payload); + let mut reminder = Message::user(render_system_reminder(&rendered)); + reminder.get_tokens() +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/render.rs b/src/crates/core/src/agentic/session/compression/fallback/render.rs new file mode 100644 index 00000000..f5ca3986 --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/render.rs @@ -0,0 +1,87 @@ +use crate::agentic::core::{ + CompressedMessage, CompressedMessageRole, CompressionEntry, CompressionPayload, +}; +use serde_json::{json, Value}; + +pub(super) fn render_payload_for_model(payload: &CompressionPayload) -> String { + if payload.entries.is_empty() { + return [ + "Earlier conversation has been condensed for context management.", + "The omitted history could not be kept within the available context budget.", + ] + .join("\n\n"); + } + + let mut sections = vec![ + "Earlier conversation has been condensed for context management.".to_string(), + "The history below is a partial record. Message text, tool arguments, and task lists may have been truncated or omitted during compression. All tool results have been cleared from this compressed history. Treat it as approximate historical context rather than a complete verbatim transcript.".to_string(), + ]; + + for (index, entry) in payload.entries.iter().enumerate() { + match entry { + CompressionEntry::ModelSummary { text } => { + sections.push(format!( + "Earlier summarized history {}:\n{}", + index + 1, + text + )); + } + CompressionEntry::Turn { messages, todo, .. } => { + let mut lines = vec![format!("Historical turn {}:", index + 1)]; + for message in messages { + render_compressed_message(&mut lines, message); + } + if let Some(todo) = todo { + lines.push("Latest task list for this turn:".to_string()); + if todo.todos.is_empty() { + if let Some(summary) = todo.summary.as_ref() { + lines.push(format!("- {}", summary)); + } + } else { + for todo_item in &todo.todos { + lines.push(format!("- [{}] {}", todo_item.status, todo_item.content)); + } + if let Some(summary) = todo.summary.as_ref() { + lines.push(format!("Task list note: {}", summary)); + } + } + } + sections.push(lines.join("\n")); + } + } + } + + sections.join("\n\n") +} + +fn render_compressed_message(lines: &mut Vec, message: &CompressedMessage) { + let role_label = match message.role { + CompressedMessageRole::User => "User", + CompressedMessageRole::Assistant => "Assistant", + }; + + if let Some(text) = message.text.as_ref() { + lines.push(format!("{role_label}: {text}")); + } else { + lines.push(format!("{role_label}:")); + } + + for tool_call in &message.tool_calls { + let mut rendered = tool_call.tool_name.clone(); + if let Some(arguments) = tool_call.arguments.as_ref() { + rendered.push(' '); + rendered.push_str(&render_tool_arguments(arguments)); + } + if tool_call.is_error { + rendered.push_str(" [error]"); + } + lines.push(format!("Tool call: {}", rendered)); + } +} + +fn render_tool_arguments(arguments: &Value) -> String { + if arguments.is_null() { + return "{}".to_string(); + } + serde_json::to_string(arguments).unwrap_or_else(|_| json!({}).to_string()) +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs b/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs new file mode 100644 index 00000000..c87f9e12 --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs @@ -0,0 +1,304 @@ +use super::types::CompressionFallbackOptions; +use crate::agentic::core::{CompressedTodoItem, CompressedTodoSnapshot}; +use serde_json::{Map, Value}; + +pub(super) fn sanitize_user_text( + text: &str, + options: &CompressionFallbackOptions, +) -> Option { + sanitize_text(text, options.user_chars) +} + +pub(super) fn sanitize_assistant_text( + text: &str, + options: &CompressionFallbackOptions, +) -> Option { + sanitize_text(text, options.assistant_chars) +} + +pub(super) fn sanitize_todo_snapshot(arguments: &Value) -> Option { + let todos = arguments.get("todos")?.as_array()?; + let mut compressed_todos = Vec::new(); + + for todo in todos { + let Some(todo_object) = todo.as_object() else { + continue; + }; + let Some(content) = todo_object + .get("content") + .and_then(Value::as_str) + .map(str::trim) + .filter(|content| !content.is_empty()) + else { + continue; + }; + let status = todo_object + .get("status") + .and_then(Value::as_str) + .unwrap_or("pending"); + let id = todo_object + .get("id") + .and_then(Value::as_str) + .map(str::to_string); + + compressed_todos.push(CompressedTodoItem { + id, + content: content.to_string(), + status: status.to_string(), + }); + } + + if compressed_todos.is_empty() { + return None; + } + + Some(CompressedTodoSnapshot { + todos: compressed_todos, + summary: None, + }) +} + +pub(super) fn sanitize_tool_arguments( + tool_name: &str, + arguments: &Value, + options: &CompressionFallbackOptions, +) -> Option { + let Some(object) = arguments.as_object() else { + return sanitize_generic_value(arguments, options); + }; + + let sanitized = match tool_name { + "Read" => { + let mut result = Map::new(); + copy_field(object, &mut result, "file_path"); + copy_field(object, &mut result, "start_line"); + copy_field(object, &mut result, "limit"); + result + } + "Write" => { + let mut result = Map::new(); + copy_field(object, &mut result, "file_path"); + insert_cleared_field(object, &mut result, "content"); + result + } + "Edit" => { + let mut result = Map::new(); + copy_field(object, &mut result, "file_path"); + copy_field(object, &mut result, "replace_all"); + insert_cleared_field(object, &mut result, "old_string"); + insert_cleared_field(object, &mut result, "new_string"); + result + } + "Grep" => { + let mut result = Map::new(); + for key in [ + "pattern", + "path", + "glob", + "type", + "head_limit", + "multiline", + "-A", + "-B", + "-C", + "-i", + "-n", + "output_mode", + ] { + copy_field(object, &mut result, key); + } + result + } + "Glob" => { + let mut result = Map::new(); + copy_field(object, &mut result, "pattern"); + copy_field(object, &mut result, "path"); + copy_field(object, &mut result, "limit"); + result + } + "LS" => { + let mut result = Map::new(); + copy_field(object, &mut result, "path"); + copy_field(object, &mut result, "ignore"); + copy_field(object, &mut result, "limit"); + result + } + "GetFileDiff" => { + let mut result = Map::new(); + copy_field(object, &mut result, "file_path"); + result + } + "DeleteFile" => { + let mut result = Map::new(); + copy_field(object, &mut result, "path"); + copy_field(object, &mut result, "recursive"); + result + } + "Git" => { + let mut result = Map::new(); + copy_field(object, &mut result, "operation"); + copy_field(object, &mut result, "working_directory"); + if let Some(args) = object.get("args") { + if let Some(value) = sanitize_generic_value(args, options) { + result.insert("args".to_string(), value); + } + } + result + } + "Bash" => { + let mut result = Map::new(); + insert_sanitize_text(object, &mut result, "command", options.tool_command_chars); + result + } + "TerminalControl" => { + let mut result = Map::new(); + copy_field(object, &mut result, "action"); + copy_field(object, &mut result, "terminal_session_id"); + result + } + "Skill" => { + let mut result = Map::new(); + copy_field(object, &mut result, "command"); + result + } + "CreatePlan" => { + let mut result = Map::new(); + copy_field(object, &mut result, "name"); + copy_field(object, &mut result, "overview"); + insert_cleared_field(object, &mut result, "plan"); + insert_cleared_field(object, &mut result, "todos"); + result + } + "WebSearch" => { + let mut result = Map::new(); + copy_field(object, &mut result, "query"); + result + } + "WebFetch" => { + let mut result = Map::new(); + copy_field(object, &mut result, "url"); + result + } + _ => sanitize_generic_object(object, options), + }; + + if sanitized.is_empty() { + None + } else { + Some(Value::Object(sanitized)) + } +} + +pub(super) fn sanitize_generic_object( + object: &Map, + options: &CompressionFallbackOptions, +) -> Map { + let mut sanitized = Map::new(); + + for (key, value) in object { + let heavy_string = matches!( + key.as_str(), + "content" + | "contents" + | "old_string" + | "new_string" + | "text" + | "output" + | "stdout" + | "stderr" + | "diff" + | "file_diff" + | "original_content" + | "new_content" + | "data_url" + | "data_base64" + ); + if heavy_string { + if let Some(text) = value.as_str() { + sanitized.insert( + format!("{key}_chars"), + Value::Number(serde_json::Number::from(text.chars().count() as u64)), + ); + } + continue; + } + + if let Some(value) = sanitize_generic_value(value, options) { + sanitized.insert(key.clone(), value); + } + } + + sanitized +} + +pub(super) fn sanitize_generic_value( + value: &Value, + options: &CompressionFallbackOptions, +) -> Option { + match value { + Value::Null => None, + Value::Bool(_) | Value::Number(_) => Some(value.clone()), + Value::String(text) => sanitize_text(text, options.tool_arg_chars).map(Value::String), + Value::Array(values) => { + let sanitized_values: Vec = values + .iter() + .take(5) + .filter_map(|value| sanitize_generic_value(value, options)) + .collect(); + if sanitized_values.is_empty() { + None + } else { + Some(Value::Array(sanitized_values)) + } + } + Value::Object(object) => { + let sanitized_object = sanitize_generic_object(object, options); + if sanitized_object.is_empty() { + None + } else { + Some(Value::Object(sanitized_object)) + } + } + } +} + +fn sanitize_text(text: &str, limit: usize) -> Option { + let trimmed = text.trim(); + if trimmed.is_empty() { + return None; + } + + let text_len = trimmed.chars().count(); + if text_len <= limit { + return Some(trimmed.to_string()); + } + + let mut truncated: String = trimmed.chars().take(limit).collect(); + truncated.push_str(" ... [truncated]"); + Some(truncated) +} + +fn copy_field(source: &Map, target: &mut Map, key: &str) { + if let Some(value) = source.get(key) { + target.insert(key.to_string(), value.clone()); + } +} + +fn insert_sanitize_text( + source: &Map, + target: &mut Map, + key: &str, + limit: usize, +) { + if let Some(value) = source.get(key).and_then(Value::as_str) { + if let Some(text) = sanitize_text(value, limit) { + target.insert(key.to_string(), Value::String(text)); + } + } +} + +fn insert_cleared_field(source: &Map, target: &mut Map, key: &str) { + if source.get(key).is_some() { + target.insert(key.to_string(), Value::String("[cleared]".to_string())); + } +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/tests.rs b/src/crates/core/src/agentic/session/compression/fallback/tests.rs new file mode 100644 index 00000000..c3720c04 --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/tests.rs @@ -0,0 +1,80 @@ +use super::{build_structured_compression_reminder, CompressionFallbackOptions}; +use crate::agentic::core::{ + render_system_reminder, CompressedMessageRole, CompressionEntry, CompressionPayload, Message, + MessageSemanticKind, ToolCall, ToolResult, +}; +use serde_json::json; + +fn default_options() -> CompressionFallbackOptions { + CompressionFallbackOptions { + max_tokens: 10_000, + user_chars: 120, + assistant_chars: 120, + tool_arg_chars: 80, + tool_command_chars: 80, + } +} + +#[test] +fn clears_tool_results_from_compressed_history() { + let assistant = Message::assistant_with_tools( + "Checking file".to_string(), + vec![ToolCall { + tool_id: "tool_1".to_string(), + tool_name: "Read".to_string(), + arguments: json!({ + "file_path": "/tmp/demo.rs", + "start_line": 1, + "limit": 20 + }), + is_error: false, + }], + ); + let tool_result = Message::tool_result(ToolResult { + tool_id: "tool_1".to_string(), + tool_name: "Read".to_string(), + result: json!({"content": "ignored"}), + result_for_assistant: Some("Read succeeded with file preview".to_string()), + is_error: false, + duration_ms: None, + image_attachments: None, + }); + + let reminder = build_structured_compression_reminder( + vec![vec![ + Message::user("inspect".to_string()), + assistant, + tool_result, + ]], + &default_options(), + ); + + let turn = match &reminder.payload.entries[0] { + CompressionEntry::Turn { messages, .. } => messages, + _ => panic!("expected turn entry"), + }; + let assistant_message = turn + .iter() + .find(|message| message.role == CompressedMessageRole::Assistant) + .expect("assistant message"); + + assert_eq!(assistant_message.tool_calls.len(), 1); + assert!(!reminder.model_text.contains("Tool result:")); + assert!(reminder.model_text.contains("All tool results have been cleared")); +} + +#[test] +fn reuses_existing_compression_payload_atomically() { + let prior_summary = "Previous conversation summary".to_string(); + let reminder_message = Message::user(render_system_reminder(&prior_summary)) + .with_semantic_kind(MessageSemanticKind::InternalReminder) + .with_compression_payload(CompressionPayload::from_summary(prior_summary.clone())); + + let reminder = + build_structured_compression_reminder(vec![vec![reminder_message]], &default_options()); + + assert!(matches!( + &reminder.payload.entries[0], + CompressionEntry::ModelSummary { text } if text == &prior_summary + )); +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/types.rs b/src/crates/core/src/agentic/session/compression/fallback/types.rs new file mode 100644 index 00000000..fc1be95c --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/types.rs @@ -0,0 +1,34 @@ +use crate::agentic::core::{CompressedMessage, CompressedTodoSnapshot, CompressionPayload}; + +#[derive(Debug, Clone)] +pub struct CompressionFallbackOptions { + pub max_tokens: usize, + pub user_chars: usize, + pub assistant_chars: usize, + pub tool_arg_chars: usize, + pub tool_command_chars: usize, +} + +#[derive(Debug, Clone)] +pub struct CompressionReminder { + pub model_text: String, + pub payload: CompressionPayload, + pub used_model_summary: bool, +} + +#[derive(Debug, Clone)] +pub(super) enum CompressionUnit { + ModelSummary { + text: String, + }, + TurnMessage { + entry_id: usize, + turn_id: Option, + message: CompressedMessage, + }, + TurnTodo { + entry_id: usize, + turn_id: Option, + todo: CompressedTodoSnapshot, + }, +} diff --git a/src/crates/core/src/agentic/session/compression_manager.rs b/src/crates/core/src/agentic/session/compression/manager.rs similarity index 84% rename from src/crates/core/src/agentic/session/compression_manager.rs rename to src/crates/core/src/agentic/session/compression/manager.rs index ac94e961..61c680cf 100644 --- a/src/crates/core/src/agentic/session/compression_manager.rs +++ b/src/crates/core/src/agentic/session/compression/manager.rs @@ -2,8 +2,12 @@ //! //! Responsible for managing session context compression +use super::fallback::{ + build_structured_compression_reminder, CompressionFallbackOptions, CompressionReminder, +}; use crate::agentic::core::{ - render_system_reminder, Message, MessageHelper, MessageRole, MessageSemanticKind, + render_system_reminder, CompressionPayload, Message, MessageHelper, MessageRole, + MessageSemanticKind, }; use crate::agentic::persistence::PersistenceManager; use crate::infrastructure::ai::{get_global_ai_client_factory, AIClient}; @@ -21,6 +25,11 @@ pub struct CompressionConfig { pub keep_turns_ratio: f32, pub keep_last_turn_ratio: f32, pub single_request_max_tokens_ratio: f32, + pub fallback_max_tokens_ratio: f32, + pub fallback_user_chars: usize, + pub fallback_assistant_chars: usize, + pub fallback_tool_arg_chars: usize, + pub fallback_tool_command_chars: usize, } impl Default for CompressionConfig { @@ -30,6 +39,11 @@ impl Default for CompressionConfig { keep_turns_ratio: 0.3, keep_last_turn_ratio: 0.4, single_request_max_tokens_ratio: 0.7, + fallback_max_tokens_ratio: 0.25, + fallback_user_chars: 1000, + fallback_assistant_chars: 1000, + fallback_tool_arg_chars: 100, + fallback_tool_command_chars: 100, } } } @@ -46,6 +60,12 @@ impl TurnWithTokens { } } +#[derive(Debug, Clone)] +pub struct CompressionResult { + pub messages: Vec, + pub has_model_summary: bool, +} + /// Context compression manager pub struct CompressionManager { /// Compressed message history (by session ID) @@ -206,15 +226,21 @@ impl CompressionManager { context_window: usize, turn_index_to_keep: usize, mut turns: Vec, - ) -> BitFunResult> { + ) -> BitFunResult { if turns.is_empty() { debug!("No turns need compression"); - return Ok(Vec::new()); + return Ok(CompressionResult { + messages: Vec::new(), + has_model_summary: false, + }); } let Some(last_turn_messages) = turns.last().map(|turn| &turn.messages) else { debug!("No turns available after split, skipping last-turn extraction"); - return Ok(Vec::new()); + return Ok(CompressionResult { + messages: Vec::new(), + has_model_summary: false, + }); }; let last_user_message = { last_turn_messages @@ -234,28 +260,15 @@ impl CompressionManager { let turns_to_keep = turns.split_off(turn_index_to_keep); let mut compressed_messages = Vec::new(); + let mut has_model_summary = false; if !turns.is_empty() { - // Dynamically get Agent client for generating summary - let ai_client_factory = get_global_ai_client_factory().await.map_err(|e| { - BitFunError::AIClient(format!("Failed to get AI client factory: {}", e)) - })?; - let ai_client = ai_client_factory - .get_client_by_func_agent("compression") - .await - .map_err(|e| BitFunError::AIClient(format!("Failed to get AI client: {}", e)))?; - - let summary = self - .execute_compression(ai_client, turns, context_window) + let reminder = self + .execute_compression_with_fallback(turns, context_window) .await?; - trace!("Compression summary: {}", summary); - - compressed_messages.push( - Message::user(render_system_reminder(&format!( - "Previous conversation is summarized below:\n{}", - summary - ))) - .with_semantic_kind(MessageSemanticKind::InternalReminder), - ); + trace!("Compression reminder generated"); + has_model_summary = reminder.used_model_summary; + + compressed_messages.push(self.create_reminder_message(reminder)); } if !turns_to_keep.is_empty() { @@ -305,7 +318,78 @@ impl CompressionManager { } } - Ok(compressed_messages) + Ok(CompressionResult { + messages: compressed_messages, + has_model_summary, + }) + } + + fn create_reminder_message(&self, reminder: CompressionReminder) -> Message { + Message::user(render_system_reminder(&reminder.model_text)) + .with_semantic_kind(MessageSemanticKind::InternalReminder) + .with_compression_payload(reminder.payload) + } + + async fn execute_compression_with_fallback( + &self, + turns_to_compress: Vec, + context_window: usize, + ) -> BitFunResult { + let summary_result = match get_global_ai_client_factory().await { + Ok(ai_client_factory) => match ai_client_factory + .get_client_by_func_agent("compression") + .await + { + Ok(ai_client) => { + self.execute_compression(ai_client, turns_to_compress.clone(), context_window) + .await + } + Err(err) => Err(BitFunError::AIClient(format!( + "Failed to get AI client: {}", + err + ))), + }, + Err(err) => Err(BitFunError::AIClient(format!( + "Failed to get AI client factory: {}", + err + ))), + }; + + match summary_result { + Ok(summary) => { + trace!("Compression summary: {}", summary); + Ok(CompressionReminder { + model_text: format!("Previous conversation is summarized below:\n{}", summary), + payload: CompressionPayload::from_summary(summary), + used_model_summary: true, + }) + } + Err(err) => { + warn!( + "Model-based compression failed, falling back to structured local compression: {}", + err + ); + let reminder = build_structured_compression_reminder( + turns_to_compress + .into_iter() + .map(|turn| turn.messages) + .collect(), + &self.build_fallback_options(context_window), + ); + Ok(reminder) + } + } + } + + fn build_fallback_options(&self, context_window: usize) -> CompressionFallbackOptions { + CompressionFallbackOptions { + max_tokens: ((context_window as f32 * self.config.fallback_max_tokens_ratio) as usize) + .max(256), + user_chars: self.config.fallback_user_chars, + assistant_chars: self.config.fallback_assistant_chars, + tool_arg_chars: self.config.fallback_tool_arg_chars, + tool_command_chars: self.config.fallback_tool_command_chars, + } } async fn execute_compression( @@ -450,7 +534,7 @@ Be thorough and precise. Do not lose important technical details from either the system_message_for_summary: Message, messages: Vec, ) -> BitFunResult { - self.generate_summary_with_retry(ai_client, system_message_for_summary, messages, 3) + self.generate_summary_with_retry(ai_client, system_message_for_summary, messages, 1) .await } diff --git a/src/crates/core/src/agentic/session/compression/mod.rs b/src/crates/core/src/agentic/session/compression/mod.rs new file mode 100644 index 00000000..8beb1f6b --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/mod.rs @@ -0,0 +1,7 @@ +//! Session context compression modules. + +pub mod fallback; +pub mod manager; + +pub use fallback::*; +pub use manager::*; diff --git a/src/crates/core/src/agentic/session/mod.rs b/src/crates/core/src/agentic/session/mod.rs index baac1fed..2ceb22ae 100644 --- a/src/crates/core/src/agentic/session/mod.rs +++ b/src/crates/core/src/agentic/session/mod.rs @@ -2,10 +2,10 @@ //! //! Provides session lifecycle management, message history, and context management -pub mod compression_manager; +pub mod compression; pub mod history_manager; pub mod session_manager; -pub use compression_manager::*; +pub use compression::*; pub use history_manager::*; pub use session_manager::*; From 927240cd4973c2355db4f898214c0095a3a43895 Mon Sep 17 00:00:00 2001 From: wsp1911 Date: Thu, 26 Mar 2026 21:38:00 +0800 Subject: [PATCH 2/5] refactor: remove unused legacy session history persistence - limit PersistenceManager to project-scoped session persistence - make history and compression managers in-memory only - simplify agentic system initialization across CLI, desktop, and server --- src/apps/cli/src/agent/agentic_system.rs | 16 +- src/apps/desktop/src/lib.rs | 16 +- src/apps/server/src/bootstrap.rs | 16 +- .../src/agentic/execution/execution_engine.rs | 14 +- .../core/src/agentic/persistence/manager.rs | 227 +----------------- .../agentic/session/compression/manager.rs | 49 +--- .../src/agentic/session/history_manager.rs | 80 +----- .../src/agentic/session/session_manager.rs | 12 +- 8 files changed, 33 insertions(+), 397 deletions(-) diff --git a/src/apps/cli/src/agent/agentic_system.rs b/src/apps/cli/src/agent/agentic_system.rs index d8c41baf..67108a6f 100644 --- a/src/apps/cli/src/agent/agentic_system.rs +++ b/src/apps/cli/src/agent/agentic_system.rs @@ -33,21 +33,9 @@ pub async fn init_agentic_system() -> Result { let path_manager = try_get_path_manager_arc()?; let persistence_manager = Arc::new(persistence::PersistenceManager::new(path_manager.clone())?); - let history_manager = Arc::new(session::MessageHistoryManager::new( - persistence_manager.clone(), - session::HistoryConfig { - enable_persistence: false, - ..Default::default() - }, - )); + let history_manager = Arc::new(session::MessageHistoryManager::new()); - let compression_manager = Arc::new(session::CompressionManager::new( - persistence_manager.clone(), - session::CompressionConfig { - enable_persistence: false, - ..Default::default() - }, - )); + let compression_manager = Arc::new(session::CompressionManager::new(Default::default())); let session_manager = Arc::new(session::SessionManager::new( history_manager.clone(), diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 4d3868d0..0c7d24f2 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -714,21 +714,9 @@ async fn init_agentic_system() -> anyhow::Result<( let path_manager = try_get_path_manager_arc()?; let persistence_manager = Arc::new(persistence::PersistenceManager::new(path_manager.clone())?); - let history_manager = Arc::new(session::MessageHistoryManager::new( - persistence_manager.clone(), - session::HistoryConfig { - enable_persistence: false, - ..Default::default() - }, - )); + let history_manager = Arc::new(session::MessageHistoryManager::new()); - let compression_manager = Arc::new(session::CompressionManager::new( - persistence_manager.clone(), - session::CompressionConfig { - enable_persistence: false, - ..Default::default() - }, - )); + let compression_manager = Arc::new(session::CompressionManager::new(Default::default())); let session_manager = Arc::new(session::SessionManager::new( history_manager, diff --git a/src/apps/server/src/bootstrap.rs b/src/apps/server/src/bootstrap.rs index 8e470db7..b37c3823 100644 --- a/src/apps/server/src/bootstrap.rs +++ b/src/apps/server/src/bootstrap.rs @@ -53,21 +53,9 @@ pub async fn initialize(workspace: Option) -> anyhow::Result PathBuf { - self.path_manager.user_data_dir().join("legacy-sessions") - } - - fn legacy_session_dir(&self, session_id: &str) -> PathBuf { - self.legacy_sessions_dir().join(session_id) - } - - async fn ensure_legacy_session_dir(&self, session_id: &str) -> BitFunResult { - let dir = self.legacy_session_dir(session_id); - fs::create_dir_all(&dir).await.map_err(|e| { - BitFunError::io(format!("Failed to create legacy session directory: {}", e)) - })?; - Ok(dir) - } - - /// Append message (JSONL format) - pub async fn append_message(&self, session_id: &str, message: &Message) -> BitFunResult<()> { - let dir = self.ensure_legacy_session_dir(session_id).await?; - let messages_path = dir.join("messages.jsonl"); - - let sanitized_message = Self::sanitize_message_for_persistence(message); - let json = serde_json::to_string(&sanitized_message).map_err(|e| { - BitFunError::serialization(format!("Failed to serialize message: {}", e)) - })?; - - let mut file = fs::OpenOptions::new() - .create(true) - .append(true) - .open(&messages_path) - .await - .map_err(|e| BitFunError::io(format!("Failed to open message file: {}", e)))?; - - file.write_all(json.as_bytes()) - .await - .map_err(|e| BitFunError::io(format!("Failed to write message: {}", e)))?; - file.write_all(b"\n") - .await - .map_err(|e| BitFunError::io(format!("Failed to write newline: {}", e)))?; - - Ok(()) - } - - /// Load all messages - pub async fn load_messages(&self, session_id: &str) -> BitFunResult> { - let messages_path = self.legacy_session_dir(session_id).join("messages.jsonl"); - if !messages_path.exists() { - return Ok(vec![]); - } - - let file = fs::File::open(&messages_path) - .await - .map_err(|e| BitFunError::io(format!("Failed to open message file: {}", e)))?; - - let reader = BufReader::new(file); - let mut lines = reader.lines(); - let mut messages = Vec::new(); - - while let Some(line) = lines - .next_line() - .await - .map_err(|e| BitFunError::io(format!("Failed to read message line: {}", e)))? - { - if line.trim().is_empty() { - continue; - } - - match serde_json::from_str::(&line) { - Ok(message) => messages.push(message), - Err(e) => warn!("Failed to deserialize message: {}", e), - } - } - - Ok(messages) - } - - /// Clear messages - pub async fn clear_messages(&self, session_id: &str) -> BitFunResult<()> { - let messages_path = self.legacy_session_dir(session_id).join("messages.jsonl"); - if messages_path.exists() { - fs::remove_file(&messages_path) - .await - .map_err(|e| BitFunError::io(format!("Failed to delete message file: {}", e)))?; - } - Ok(()) - } - - /// Delete messages - pub async fn delete_messages(&self, session_id: &str) -> BitFunResult<()> { - self.clear_messages(session_id).await - } - - // ============ Legacy compressed history persistence ============ - - pub async fn append_compressed_message( - &self, - session_id: &str, - message: &Message, - ) -> BitFunResult<()> { - let dir = self.ensure_legacy_session_dir(session_id).await?; - let compressed_path = dir.join("compressed_messages.jsonl"); - - let sanitized_message = Self::sanitize_message_for_persistence(message); - let json = serde_json::to_string(&sanitized_message).map_err(|e| { - BitFunError::serialization(format!("Failed to serialize compressed message: {}", e)) - })?; - - let mut file = fs::OpenOptions::new() - .create(true) - .append(true) - .open(&compressed_path) - .await - .map_err(|e| { - BitFunError::io(format!("Failed to open compressed message file: {}", e)) - })?; - - file.write_all(json.as_bytes()) - .await - .map_err(|e| BitFunError::io(format!("Failed to write compressed message: {}", e)))?; - file.write_all(b"\n") - .await - .map_err(|e| BitFunError::io(format!("Failed to write newline: {}", e)))?; - - Ok(()) - } - - pub async fn save_compressed_messages( - &self, - session_id: &str, - messages: &[Message], - ) -> BitFunResult<()> { - let dir = self.ensure_legacy_session_dir(session_id).await?; - let compressed_path = dir.join("compressed_messages.jsonl"); - - let mut file = fs::OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .open(&compressed_path) - .await - .map_err(|e| { - BitFunError::io(format!("Failed to open compressed message file: {}", e)) - })?; - - let sanitized_messages = Self::sanitize_messages_for_persistence(messages); - for message in &sanitized_messages { - let json = serde_json::to_string(message).map_err(|e| { - BitFunError::serialization(format!("Failed to serialize compressed message: {}", e)) - })?; - - file.write_all(json.as_bytes()).await.map_err(|e| { - BitFunError::io(format!("Failed to write compressed message: {}", e)) - })?; - file.write_all(b"\n") - .await - .map_err(|e| BitFunError::io(format!("Failed to write newline: {}", e)))?; - } - - debug!( - "Legacy compressed history persisted: session_id={}, message_count={}", - session_id, - messages.len() - ); - Ok(()) - } - - pub async fn load_compressed_messages( - &self, - session_id: &str, - ) -> BitFunResult>> { - let compressed_path = self - .legacy_session_dir(session_id) - .join("compressed_messages.jsonl"); - - if !compressed_path.exists() { - return Ok(None); - } - - let file = fs::File::open(&compressed_path).await.map_err(|e| { - BitFunError::io(format!("Failed to open compressed message file: {}", e)) - })?; - - let reader = BufReader::new(file); - let mut lines = reader.lines(); - let mut messages = Vec::new(); - - while let Some(line) = lines.next_line().await.map_err(|e| { - BitFunError::io(format!("Failed to read compressed message line: {}", e)) - })? { - if line.trim().is_empty() { - continue; - } - - match serde_json::from_str::(&line) { - Ok(message) => messages.push(message), - Err(e) => warn!("Failed to deserialize compressed message: {}", e), - } - } - - if messages.is_empty() { - return Ok(None); - } - - Ok(Some(messages)) - } - - pub async fn delete_compressed_messages(&self, session_id: &str) -> BitFunResult<()> { - let compressed_path = self - .legacy_session_dir(session_id) - .join("compressed_messages.jsonl"); - - if compressed_path.exists() { - fs::remove_file(&compressed_path).await.map_err(|e| { - BitFunError::io(format!("Failed to delete compressed message file: {}", e)) - })?; - } - - Ok(()) - } } #[cfg(test)] diff --git a/src/crates/core/src/agentic/session/compression/manager.rs b/src/crates/core/src/agentic/session/compression/manager.rs index 61c680cf..868250d4 100644 --- a/src/crates/core/src/agentic/session/compression/manager.rs +++ b/src/crates/core/src/agentic/session/compression/manager.rs @@ -1,6 +1,6 @@ //! Context Compression Manager //! -//! Responsible for managing session context compression +//! Responsible for managing in-memory session context compression. use super::fallback::{ build_structured_compression_reminder, CompressionFallbackOptions, CompressionReminder, @@ -9,7 +9,6 @@ use crate::agentic::core::{ render_system_reminder, CompressionPayload, Message, MessageHelper, MessageRole, MessageSemanticKind, }; -use crate::agentic::persistence::PersistenceManager; use crate::infrastructure::ai::{get_global_ai_client_factory, AIClient}; use crate::util::errors::{BitFunError, BitFunResult}; use crate::util::types::Message as AIMessage; @@ -21,7 +20,6 @@ use std::sync::Arc; /// Compression manager configuration #[derive(Debug, Clone)] pub struct CompressionConfig { - pub enable_persistence: bool, pub keep_turns_ratio: f32, pub keep_last_turn_ratio: f32, pub single_request_max_tokens_ratio: f32, @@ -35,7 +33,6 @@ pub struct CompressionConfig { impl Default for CompressionConfig { fn default() -> Self { Self { - enable_persistence: true, keep_turns_ratio: 0.3, keep_last_turn_ratio: 0.4, single_request_max_tokens_ratio: 0.7, @@ -70,17 +67,14 @@ pub struct CompressionResult { pub struct CompressionManager { /// Compressed message history (by session ID) compressed_histories: Arc>>, - /// Persistence manager - persistence: Arc, /// Configuration config: CompressionConfig, } impl CompressionManager { - pub fn new(persistence: Arc, config: CompressionConfig) -> Self { + pub fn new(config: CompressionConfig) -> Self { Self { compressed_histories: Arc::new(DashMap::new()), - persistence, config, } } @@ -95,27 +89,18 @@ impl CompressionManager { ); } - /// Add message (async, supports persistence) + /// Add message to the in-memory context history. pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - // 1. Add to memory if let Some(mut compressed) = self.compressed_histories.get_mut(session_id) { - compressed.push(message.clone()); + compressed.push(message); } else { self.compressed_histories - .insert(session_id.to_string(), vec![message.clone()]); + .insert(session_id.to_string(), vec![message]); } - - // 2. Persist (append single message, similar to MessageHistoryManager) - if self.config.enable_persistence { - self.persistence - .append_compressed_message(session_id, &message) - .await?; - } - Ok(()) } - /// Batch restore messages (doesn't trigger persistence, used for session restore) + /// Batch restore messages into the in-memory compression cache. pub fn restore_session(&self, session_id: &str, messages: Vec) { self.compressed_histories .insert(session_id.to_string(), messages); @@ -296,28 +281,6 @@ impl CompressionManager { self.compressed_histories .insert(session_id.to_string(), compressed_messages.clone()); - // Persist compression history (similar to MessageHistoryManager pattern). - // Persistence is intentionally off until the storage contract is finalized. - #[allow(clippy::overly_complex_bool_expr)] - if false && self.config.enable_persistence { - if let Err(e) = self - .persistence - .save_compressed_messages(session_id, &compressed_messages) - .await - { - warn!( - "Failed to persist compressed history: session_id={}, error={}", - session_id, e - ); - } else { - debug!( - "Compressed history persisted: session_id={}, message_count={}", - session_id, - compressed_messages.len() - ); - } - } - Ok(CompressionResult { messages: compressed_messages, has_model_summary, diff --git a/src/crates/core/src/agentic/session/history_manager.rs b/src/crates/core/src/agentic/session/history_manager.rs index f83ac119..6c50cb8f 100644 --- a/src/crates/core/src/agentic/session/history_manager.rs +++ b/src/crates/core/src/agentic/session/history_manager.rs @@ -1,46 +1,23 @@ //! Message History Manager //! -//! Manages session message history, supports memory caching and persistence +//! Manages in-memory session message history. use crate::agentic::core::Message; -use crate::agentic::persistence::PersistenceManager; use crate::util::errors::BitFunResult; use dashmap::DashMap; use log::debug; use std::sync::Arc; -/// Message history configuration -#[derive(Debug, Clone)] -pub struct HistoryConfig { - pub enable_persistence: bool, -} - -impl Default for HistoryConfig { - fn default() -> Self { - Self { - enable_persistence: true, - } - } -} - /// Message history manager pub struct MessageHistoryManager { /// Message history in memory (by session ID) histories: Arc>>, - - /// Persistence manager - persistence: Arc, - - /// Configuration - config: HistoryConfig, } impl MessageHistoryManager { - pub fn new(persistence: Arc, config: HistoryConfig) -> Self { + pub fn new() -> Self { Self { histories: Arc::new(DashMap::new()), - persistence, - config, } } @@ -53,46 +30,20 @@ impl MessageHistoryManager { /// Add message pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - // 1. Add to memory if let Some(mut messages) = self.histories.get_mut(session_id) { - messages.push(message.clone()); + messages.push(message); } else { - // Session doesn't exist, create and add - self.histories - .insert(session_id.to_string(), vec![message.clone()]); - } - - // 2. Persist - if self.config.enable_persistence { - self.persistence - .append_message(session_id, &message) - .await?; + self.histories.insert(session_id.to_string(), vec![message]); } - Ok(()) } /// Get message history pub async fn get_messages(&self, session_id: &str) -> BitFunResult> { - // First try to get from memory if let Some(messages) = self.histories.get(session_id) { return Ok(messages.clone()); } - - // Load from persistence - if self.config.enable_persistence { - let messages = self.persistence.load_messages(session_id).await?; - - // Cache to memory - if !messages.is_empty() { - self.histories - .insert(session_id.to_string(), messages.clone()); - } - - Ok(messages) - } else { - Ok(vec![]) - } + Ok(vec![]) } /// Get paginated message history @@ -139,13 +90,6 @@ impl MessageHistoryManager { pub async fn count_messages(&self, session_id: &str) -> usize { if let Some(messages) = self.histories.get(session_id) { messages.len() - } else if self.config.enable_persistence { - // Load from persistence - self.persistence - .load_messages(session_id) - .await - .map(|msgs| msgs.len()) - .unwrap_or(0) } else { 0 } @@ -153,35 +97,23 @@ impl MessageHistoryManager { /// Clear message history pub async fn clear_messages(&self, session_id: &str) -> BitFunResult<()> { - // Clear memory if let Some(mut messages) = self.histories.get_mut(session_id) { messages.clear(); } - // Clear persistence - if self.config.enable_persistence { - self.persistence.clear_messages(session_id).await?; - } - debug!("Cleared session message history: session_id={}", session_id); Ok(()) } /// Delete session pub async fn delete_session(&self, session_id: &str) -> BitFunResult<()> { - // Remove from memory self.histories.remove(session_id); - // Delete from persistence - if self.config.enable_persistence { - self.persistence.delete_messages(session_id).await?; - } - debug!("Deleted session history: session_id={}", session_id); Ok(()) } - /// Restore session (load from persistence) + /// Restore session into the in-memory cache. pub async fn restore_session( &self, session_id: &str, diff --git a/src/crates/core/src/agentic/session/session_manager.rs b/src/crates/core/src/agentic/session/session_manager.rs index 8a9b8fc9..a34a4c16 100644 --- a/src/crates/core/src/agentic/session/session_manager.rs +++ b/src/crates/core/src/agentic/session/session_manager.rs @@ -581,7 +581,7 @@ impl SessionManager { .restore_session(session_id, messages.clone()) .await?; - // 3. Restore compression manager - batch restore, don't trigger persistence + // 3. Restore the in-memory compression manager state from the recovered messages. // If session already exists, delete old one first then create (ensure clean state) if session_already_in_memory { self.compression_manager.delete_session(session_id); @@ -1074,14 +1074,14 @@ impl SessionManager { .save_dialog_turn(workspace_path, &turn) .await?; - // Sync messages to in-memory caches so subsequent对话 can access context + // Sync messages to the in-memory caches so subsequent turns can access context. let user_message = Message::user(question.to_string()) .with_turn_id(turn_id.clone()) .with_semantic_kind(MessageSemanticKind::ActualUserInput); let assistant_message = Message::assistant(full_text.to_string()) .with_turn_id(turn_id.clone()); - // Add to MessageHistoryManager + // Add to the in-memory history cache. self.history_manager .add_message(child_session_id, user_message.clone()) .await?; @@ -1089,7 +1089,7 @@ impl SessionManager { .add_message(child_session_id, assistant_message.clone()) .await?; - // Add to CompressionManager + // Add to the in-memory compression/context cache. self.compression_manager .add_message(child_session_id, user_message) .await?; @@ -1141,11 +1141,11 @@ impl SessionManager { /// Add message to session pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - // Add to history manager + // Add to the in-memory history cache. self.history_manager .add_message(session_id, message.clone()) .await?; - // Also add to compression manager + // Also add to the in-memory compression/context cache. self.compression_manager .add_message(session_id, message) .await?; From f6841538aa853013182ee96706322963d6f33bf7 Mon Sep 17 00:00:00 2001 From: wsp1911 Date: Thu, 26 Mar 2026 21:38:00 +0800 Subject: [PATCH 3/5] refactor: remove redundant in-memory session history manager - use the compression manager as the runtime session context cache - rebuild canonical message history from persisted turns when needed - add paginated session message retrieval in the server RPC layer - remove obsolete desktop and web APIs that depended on the old history cache --- src/apps/cli/src/agent/agentic_system.rs | 3 - src/apps/desktop/src/api/agentic_api.rs | 100 ------------ src/apps/desktop/src/lib.rs | 4 - src/apps/server/src/bootstrap.rs | 3 - src/apps/server/src/rpc_dispatcher.rs | 145 ++++++++++++----- .../src/agentic/coordination/coordinator.rs | 73 +++++---- .../src/agentic/execution/execution_engine.rs | 22 ++- .../agentic/session/compression/manager.rs | 50 +++--- .../src/agentic/session/history_manager.rs | 126 --------------- src/crates/core/src/agentic/session/mod.rs | 4 +- .../src/agentic/session/session_manager.rs | 149 ++++++++++-------- .../api/service-api/AgentAPI.ts | 22 --- 12 files changed, 260 insertions(+), 441 deletions(-) delete mode 100644 src/crates/core/src/agentic/session/history_manager.rs diff --git a/src/apps/cli/src/agent/agentic_system.rs b/src/apps/cli/src/agent/agentic_system.rs index 67108a6f..2f6317ea 100644 --- a/src/apps/cli/src/agent/agentic_system.rs +++ b/src/apps/cli/src/agent/agentic_system.rs @@ -33,12 +33,9 @@ pub async fn init_agentic_system() -> Result { let path_manager = try_get_path_manager_arc()?; let persistence_manager = Arc::new(persistence::PersistenceManager::new(path_manager.clone())?); - let history_manager = Arc::new(session::MessageHistoryManager::new()); - let compression_manager = Arc::new(session::CompressionManager::new(Default::default())); let session_manager = Arc::new(session::SessionManager::new( - history_manager.clone(), compression_manager, persistence_manager.clone(), Default::default(), diff --git a/src/apps/desktop/src/api/agentic_api.rs b/src/apps/desktop/src/api/agentic_api.rs index 32ad50f5..cfbe2fda 100644 --- a/src/apps/desktop/src/api/agentic_api.rs +++ b/src/apps/desktop/src/api/agentic_api.rs @@ -125,22 +125,6 @@ pub struct SessionResponse { pub created_at: u64, } -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct GetMessagesRequest { - pub session_id: String, - pub limit: Option, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct MessageDTO { - pub id: String, - pub role: String, - pub content: serde_json::Value, - pub timestamp: u64, -} - #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CancelDialogTurnRequest { @@ -583,21 +567,6 @@ pub async fn list_sessions( Ok(responses) } -#[tauri::command] -pub async fn get_session_messages( - coordinator: State<'_, Arc>, - request: GetMessagesRequest, -) -> Result, String> { - let messages = coordinator - .get_messages(&request.session_id) - .await - .map_err(|e| format!("Failed to get messages: {}", e))?; - - let message_dtos = messages.into_iter().map(message_to_dto).collect(); - - Ok(message_dtos) -} - #[tauri::command] pub async fn confirm_tool_execution( coordinator: State<'_, Arc>, @@ -733,75 +702,6 @@ fn session_to_response(session: Session) -> SessionResponse { } } -fn message_to_dto(message: Message) -> MessageDTO { - let role = match message.role { - MessageRole::User => "user", - MessageRole::Assistant => "assistant", - MessageRole::Tool => "tool", - MessageRole::System => "system", - }; - - let content = match message.content { - MessageContent::Text(text) => serde_json::json!({ "type": "text", "text": text }), - MessageContent::Multimodal { text, images } => { - let images: Vec = images - .into_iter() - .map(|img| { - serde_json::json!({ - "id": img.id, - "image_path": img.image_path, - "mime_type": img.mime_type, - "metadata": img.metadata, - "has_data_url": img.data_url.as_ref().is_some_and(|s| !s.is_empty()), - }) - }) - .collect(); - - serde_json::json!({ - "type": "multimodal", - "text": text, - "images": images, - }) - } - MessageContent::ToolResult { - tool_id, - tool_name, - result, - result_for_assistant, - is_error: _, - image_attachments, - } => { - serde_json::json!({ - "type": "tool_result", - "tool_id": tool_id, - "tool_name": tool_name, - "result": result, - "result_for_assistant": result_for_assistant, - "has_image_attachments": image_attachments.as_ref().is_some_and(|a| !a.is_empty()), - }) - } - MessageContent::Mixed { - reasoning_content, - text, - tool_calls, - } => { - serde_json::json!({ - "type": "mixed", - "reasoning_content": reasoning_content, - "text": text, - "tool_calls": tool_calls, - }) - } - }; - - MessageDTO { - id: message.id, - role: role.to_string(), - content, - timestamp: system_time_to_unix_secs(message.timestamp), - } -} - fn system_time_to_unix_secs(time: std::time::SystemTime) -> u64 { match time.duration_since(std::time::UNIX_EPOCH) { Ok(duration) => duration.as_secs(), diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 0c7d24f2..974abab7 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -321,7 +321,6 @@ pub async fn run() { api::agentic_api::restore_session, webdriver_bridge_result, api::agentic_api::list_sessions, - api::agentic_api::get_session_messages, api::agentic_api::confirm_tool_execution, api::agentic_api::reject_tool_execution, api::agentic_api::cancel_tool, @@ -714,12 +713,9 @@ async fn init_agentic_system() -> anyhow::Result<( let path_manager = try_get_path_manager_arc()?; let persistence_manager = Arc::new(persistence::PersistenceManager::new(path_manager.clone())?); - let history_manager = Arc::new(session::MessageHistoryManager::new()); - let compression_manager = Arc::new(session::CompressionManager::new(Default::default())); let session_manager = Arc::new(session::SessionManager::new( - history_manager, compression_manager, persistence_manager, Default::default(), diff --git a/src/apps/server/src/bootstrap.rs b/src/apps/server/src/bootstrap.rs index b37c3823..62708c76 100644 --- a/src/apps/server/src/bootstrap.rs +++ b/src/apps/server/src/bootstrap.rs @@ -53,12 +53,9 @@ pub async fn initialize(workspace: Option) -> anyhow::Result { let request = extract_request(¶ms)?; let path: String = serde_json::from_value( - request.get("path").cloned().ok_or_else(|| anyhow!("Missing path"))?, + request + .get("path") + .cloned() + .ok_or_else(|| anyhow!("Missing path"))?, )?; - let info = state.workspace_service.open_workspace(path.into()).await + let info = state + .workspace_service + .open_workspace(path.into()) + .await .map_err(|e| anyhow!("{}", e))?; *state.workspace_path.write().await = Some(info.root_path.clone()); Ok(serde_json::to_value(&info).unwrap_or_default()) @@ -72,7 +78,10 @@ pub async fn dispatch( "read_file_content" => { let request = extract_request(¶ms)?; let file_path = get_string(&request, "filePath")?; - let result = state.filesystem_service.read_file(&file_path).await + let result = state + .filesystem_service + .read_file(&file_path) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::json!(result.content)) } @@ -80,7 +89,10 @@ pub async fn dispatch( let request = extract_request(¶ms)?; let file_path = get_string(&request, "filePath")?; let content = get_string(&request, "content")?; - state.filesystem_service.write_file(&file_path, &content).await + state + .filesystem_service + .write_file(&file_path, &content) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::Value::Null) } @@ -96,7 +108,10 @@ pub async fn dispatch( "get_file_tree" => { let request = extract_request(¶ms)?; let path = get_string(&request, "path")?; - let nodes = state.filesystem_service.build_file_tree(&path).await + let nodes = state + .filesystem_service + .build_file_tree(&path) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::to_value(&nodes).unwrap_or_default()) } @@ -110,21 +125,32 @@ pub async fn dispatch( "get_config" => { let request = extract_request(¶ms)?; let key = request.get("key").and_then(|v| v.as_str()); - let config: serde_json::Value = state.config_service - .get_config(key).await + let config: serde_json::Value = state + .config_service + .get_config(key) + .await .map_err(|e| anyhow!("{}", e))?; Ok(config) } "set_config" => { let request = extract_request(¶ms)?; let key = get_string(&request, "key")?; - let value = request.get("value").cloned().ok_or_else(|| anyhow!("Missing value"))?; - state.config_service.set_config(&key, value).await + let value = request + .get("value") + .cloned() + .ok_or_else(|| anyhow!("Missing value"))?; + state + .config_service + .set_config(&key, value) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::json!("ok")) } "get_model_configs" => { - let models = state.config_service.get_ai_models().await + let models = state + .config_service + .get_ai_models() + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::to_value(&models).unwrap_or_default()) } @@ -135,7 +161,8 @@ pub async fn dispatch( let session_name = get_string(&request, "sessionName")?; let agent_type = get_string(&request, "agentType")?; let workspace_path = get_string(&request, "workspacePath")?; - let session_id = request.get("sessionId") + let session_id = request + .get("sessionId") .and_then(|v| v.as_str()) .map(|s| s.to_string()); @@ -144,7 +171,8 @@ pub async fn dispatch( ..Default::default() }; - let session = state.coordinator + let session = state + .coordinator .create_session_with_workspace( session_id, session_name, @@ -164,7 +192,8 @@ pub async fn dispatch( "list_sessions" => { let request = extract_request(¶ms)?; let workspace_path = get_string(&request, "workspacePath")?; - let sessions = state.coordinator + let sessions = state + .coordinator .list_sessions(&PathBuf::from(workspace_path)) .await .map_err(|e| anyhow!("{}", e))?; @@ -174,7 +203,8 @@ pub async fn dispatch( let request = extract_request(¶ms)?; let session_id = get_string(&request, "sessionId")?; let workspace_path = get_string(&request, "workspacePath")?; - state.coordinator + state + .coordinator .delete_session(&PathBuf::from(workspace_path), &session_id) .await .map_err(|e| anyhow!("{}", e))?; @@ -184,18 +214,22 @@ pub async fn dispatch( let request = extract_request(¶ms)?; let session_id = get_string(&request, "sessionId")?; let user_input = get_string(&request, "userInput")?; - let original_user_input = request.get("originalUserInput") + let original_user_input = request + .get("originalUserInput") .and_then(|v| v.as_str()) .map(|s| s.to_string()); let agent_type = get_string(&request, "agentType")?; - let workspace_path = request.get("workspacePath") + let workspace_path = request + .get("workspacePath") .and_then(|v| v.as_str()) .map(|s| s.to_string()); - let turn_id = request.get("turnId") + let turn_id = request + .get("turnId") .and_then(|v| v.as_str()) .map(|s| s.to_string()); - state.scheduler + state + .scheduler .submit( session_id, user_input, @@ -215,26 +249,47 @@ pub async fn dispatch( let request = extract_request(¶ms)?; let session_id = get_string(&request, "sessionId")?; let dialog_turn_id = get_string(&request, "dialogTurnId")?; - state.coordinator + state + .coordinator .cancel_dialog_turn(&session_id, &dialog_turn_id) .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::json!({ "success": true })) } "get_session_messages" => { - let request = extract_request(¶ms)?; - let session_id = get_string(&request, "sessionId")?; - let messages = state.coordinator - .get_messages(&session_id) + let request = params.get("request").unwrap_or(¶ms); + let session_id = request + .get("sessionId") + .or_else(|| request.get("session_id")) + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow!("Missing or invalid 'sessionId'/'session_id' field"))? + .to_string(); + let limit = request + .get("limit") + .and_then(|v| v.as_u64()) + .map(|v| v as usize) + .unwrap_or(50); + let before_message_id = request + .get("beforeMessageId") + .or_else(|| request.get("before_message_id")) + .and_then(|v| v.as_str()); + + let (messages, has_more) = state + .coordinator + .get_messages_paginated(&session_id, limit, before_message_id) .await .map_err(|e| anyhow!("{}", e))?; - Ok(serde_json::to_value(&messages).unwrap_or_default()) + Ok(serde_json::json!({ + "messages": messages, + "has_more": has_more, + })) } "confirm_tool_execution" => { let request = extract_request(¶ms)?; let tool_id = get_string(&request, "toolId")?; let updated_input = request.get("updatedInput").cloned(); - state.coordinator + state + .coordinator .confirm_tool(&tool_id, updated_input) .await .map_err(|e| anyhow!("{}", e))?; @@ -243,11 +298,13 @@ pub async fn dispatch( "reject_tool_execution" => { let request = extract_request(¶ms)?; let tool_id = get_string(&request, "toolId")?; - let reason = request.get("reason") + let reason = request + .get("reason") .and_then(|v| v.as_str()) .unwrap_or("User rejected") .to_string(); - state.coordinator + state + .coordinator .reject_tool(&tool_id, reason) .await .map_err(|e| anyhow!("{}", e))?; @@ -256,32 +313,38 @@ pub async fn dispatch( // ── I18n ───────────────────────────────────────────── "i18n_get_current_language" => { - let lang: String = state.config_service - .get_config(Some("app.language")).await + let lang: String = state + .config_service + .get_config(Some("app.language")) + .await .unwrap_or_else(|_| "zh-CN".to_string()); Ok(serde_json::json!(lang)) } "i18n_set_language" => { let request = extract_request(¶ms)?; let language = get_string(&request, "language")?; - state.config_service.set_config("app.language", language.clone()).await + state + .config_service + .set_config("app.language", language.clone()) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::json!(language)) } - "i18n_get_supported_languages" => { - Ok(serde_json::json!([ - {"id": "zh-CN", "name": "Chinese (Simplified)", "englishName": "Chinese (Simplified)", "nativeName": "简体中文", "rtl": false}, - {"id": "en-US", "name": "English", "englishName": "English", "nativeName": "English", "rtl": false} - ])) - } + "i18n_get_supported_languages" => Ok(serde_json::json!([ + {"id": "zh-CN", "name": "Chinese (Simplified)", "englishName": "Chinese (Simplified)", "nativeName": "简体中文", "rtl": false}, + {"id": "en-US", "name": "English", "englishName": "English", "nativeName": "English", "rtl": false} + ])), // ── Tools ──────────────────────────────────────────── "get_all_tools_info" => { - let tools: Vec = state.tool_registry_snapshot + let tools: Vec = state + .tool_registry_snapshot .iter() - .map(|t| serde_json::json!({ - "name": t.name().to_string(), - })) + .map(|t| { + serde_json::json!({ + "name": t.name().to_string(), + }) + }) .collect(); Ok(serde_json::json!(tools)) } diff --git a/src/crates/core/src/agentic/coordination/coordinator.rs b/src/crates/core/src/agentic/coordination/coordinator.rs index e3d8422b..d0107781 100644 --- a/src/crates/core/src/agentic/coordination/coordinator.rs +++ b/src/crates/core/src/agentic/coordination/coordinator.rs @@ -12,8 +12,8 @@ use crate::agentic::events::{ AgenticEvent, EventPriority, EventQueue, EventRouter, EventSubscriber, }; use crate::agentic::execution::{ExecutionContext, ExecutionEngine}; -use crate::agentic::round_preempt::DialogRoundPreemptSource; use crate::agentic::image_analysis::ImageContextData; +use crate::agentic::round_preempt::DialogRoundPreemptSource; use crate::agentic::session::SessionManager; use crate::agentic::tools::pipeline::{SubagentParentInfo, ToolPipeline}; use crate::agentic::WorkspaceBinding; @@ -151,27 +151,26 @@ impl ConversationCoordinator { ) .await; - let local_session_path = - if let Some(ref e) = entry { - if !e.ssh_host.trim().is_empty() { - crate::service::remote_ssh::workspace_state::remote_workspace_session_mirror_dir( - &e.ssh_host, - &e.remote_root, - ) - } else { - crate::service::remote_ssh::workspace_state::unresolved_remote_session_storage_dir( - rid, &path_norm, - ) - } - } else if let Some(h) = host_from_config { + let local_session_path = if let Some(ref e) = entry { + if !e.ssh_host.trim().is_empty() { crate::service::remote_ssh::workspace_state::remote_workspace_session_mirror_dir( - h, &path_norm, + &e.ssh_host, + &e.remote_root, ) } else { crate::service::remote_ssh::workspace_state::unresolved_remote_session_storage_dir( rid, &path_norm, ) - }; + } + } else if let Some(h) = host_from_config { + crate::service::remote_ssh::workspace_state::remote_workspace_session_mirror_dir( + h, &path_norm, + ) + } else { + crate::service::remote_ssh::workspace_state::unresolved_remote_session_storage_dir( + rid, &path_norm, + ) + }; let connection_name = entry .map(|e| e.connection_name) @@ -195,24 +194,31 @@ impl ConversationCoordinator { let binding = binding.as_ref()?; if binding.is_remote() { - let manager = match crate::service::remote_ssh::workspace_state::get_remote_workspace_manager() { - Some(m) => m, - None => { - log::warn!("build_workspace_services: RemoteWorkspaceStateManager not initialized"); - return None; - } - }; + let manager = + match crate::service::remote_ssh::workspace_state::get_remote_workspace_manager() { + Some(m) => m, + None => { + log::warn!( + "build_workspace_services: RemoteWorkspaceStateManager not initialized" + ); + return None; + } + }; let ssh_manager = match manager.get_ssh_manager().await { Some(m) => m, None => { - log::warn!("build_workspace_services: SSH manager not available in state manager"); + log::warn!( + "build_workspace_services: SSH manager not available in state manager" + ); return None; } }; let file_service = match manager.get_file_service().await { Some(f) => f, None => { - log::warn!("build_workspace_services: File service not available in state manager"); + log::warn!( + "build_workspace_services: File service not available in state manager" + ); return None; } }; @@ -223,7 +229,10 @@ impl ConversationCoordinator { return None; } }; - log::info!("build_workspace_services: Built remote services for connection_id={}", connection_id); + log::info!( + "build_workspace_services: Built remote services for connection_id={}", + connection_id + ); Some(crate::agentic::workspace::remote_workspace_services( connection_id, file_service, @@ -571,7 +580,8 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet let binding = Self::build_workspace_binding(&SessionConfig { workspace_path: Some(workspace_path.to_string()), ..Default::default() - }).await; + }) + .await; binding .as_ref() .map(|b| b.session_storage_path().to_path_buf()) @@ -1514,7 +1524,8 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet return Ok(None); }; - self.cancel_dialog_turn(session_id, ¤t_turn_id).await?; + self.cancel_dialog_turn(session_id, ¤t_turn_id) + .await?; let deadline = Instant::now() + wait_timeout; while self.execution_engine.has_active_turn(¤t_turn_id) { @@ -1565,12 +1576,12 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet self.session_manager.list_sessions(workspace_path).await } - /// Get session messages + /// Get a best-effort message view for a session. pub async fn get_messages(&self, session_id: &str) -> BitFunResult> { self.session_manager.get_messages(session_id).await } - /// Get session messages paginated + /// Get a paginated best-effort message view for a session. pub async fn get_messages_paginated( &self, session_id: &str, @@ -1818,7 +1829,7 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet } } - // Delete subagent session itself (including message history, persistence data, etc.) + // Delete the subagent session itself, including runtime context and persisted turn data. let workspace_path = self .session_manager .get_session(session_id) diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index 75a117c7..ffad749b 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -276,8 +276,7 @@ impl ExecutionEngine { if Self::skip_message_for_model_send(msg) { continue; } - let keep_this_message_images = - attach_images && keep_image_messages.contains(&msg_idx); + let keep_this_message_images = attach_images && keep_image_messages.contains(&msg_idx); match &msg.content { MessageContent::Multimodal { text, images } => { if !attach_images { @@ -317,7 +316,7 @@ impl ExecutionEngine { provider, workspace_path, ) - .await + .await { Ok(processed) => { let next_count = attached_image_count + processed.len(); @@ -397,10 +396,7 @@ impl ExecutionEngine { Ok(result) } - fn render_multimodal_as_text( - text: &str, - images: &[ImageContextData], - ) -> String { + fn render_multimodal_as_text(text: &str, images: &[ImageContextData]) -> String { let mut content = text.to_string(); if images.is_empty() { @@ -716,7 +712,7 @@ impl ExecutionEngine { // Save the last token usage statistics let mut last_usage: Option = None; - // Add detailed logging showing received message history + // Add detailed logging showing the execution context messages. debug!( "Executing dialog turn: dialog_turn_id={}, mode={}, agent={}, initial_messages={}, messages_len={}", dialog_turn_id, @@ -726,7 +722,7 @@ impl ExecutionEngine { messages.len() ); trace!( - "Message history details: dialog_turn_id={}, session_id={}, roles={:?}", + "Context message details: dialog_turn_id={}, session_id={}, roles={:?}", dialog_turn_id, context.session_id, messages @@ -761,7 +757,7 @@ impl ExecutionEngine { context.workspace.as_ref(), &agent_type, ) - .await + .await } else { (vec![], None) }; @@ -843,8 +839,7 @@ impl ExecutionEngine { let original_images = images.clone(); // Replace multimodal messages with text-only versions to avoid provider errors. - let next_text = - Self::render_multimodal_as_text(&original_text, &original_images); + let next_text = Self::render_multimodal_as_text(&original_text, &original_images); msg.content = MessageContent::Text(next_text); msg.metadata.tokens = None; @@ -1270,7 +1265,8 @@ impl ExecutionEngine { .collect(); tool_definitions.sort_by_key(|tool| tool_ordering.get(&tool.name).unwrap_or(&100)); - let enabled_tool_names: Vec = tool_definitions.iter().map(|d| d.name.clone()).collect(); + let enabled_tool_names: Vec = + tool_definitions.iter().map(|d| d.name.clone()).collect(); (enabled_tool_names, Some(tool_definitions)) } diff --git a/src/crates/core/src/agentic/session/compression/manager.rs b/src/crates/core/src/agentic/session/compression/manager.rs index 868250d4..a163bb47 100644 --- a/src/crates/core/src/agentic/session/compression/manager.rs +++ b/src/crates/core/src/agentic/session/compression/manager.rs @@ -65,8 +65,10 @@ pub struct CompressionResult { /// Context compression manager pub struct CompressionManager { - /// Compressed message history (by session ID) - compressed_histories: Arc>>, + /// In-memory session context cache (by session ID). + /// The cache stores the current model context, which may contain + /// compressed reminders plus the most recent turn messages. + session_contexts: Arc>>, /// Configuration config: CompressionConfig, } @@ -74,45 +76,38 @@ pub struct CompressionManager { impl CompressionManager { pub fn new(config: CompressionConfig) -> Self { Self { - compressed_histories: Arc::new(DashMap::new()), + session_contexts: Arc::new(DashMap::new()), config, } } - /// Create session compression history + /// Initialize an empty in-memory context cache for a session. pub fn create_session(&self, session_id: &str) { - self.compressed_histories - .insert(session_id.to_string(), vec![]); - debug!( - "Created session compression history: session_id={}", - session_id - ); + self.session_contexts.insert(session_id.to_string(), vec![]); + debug!("Created session context cache: session_id={}", session_id); } - /// Add message to the in-memory context history. + /// Add a message to the in-memory context cache. pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - if let Some(mut compressed) = self.compressed_histories.get_mut(session_id) { - compressed.push(message); + if let Some(mut cached_messages) = self.session_contexts.get_mut(session_id) { + cached_messages.push(message); } else { - self.compressed_histories + self.session_contexts .insert(session_id.to_string(), vec![message]); } Ok(()) } - /// Batch restore messages into the in-memory compression cache. + /// Batch restore messages into the in-memory context cache. pub fn restore_session(&self, session_id: &str, messages: Vec) { - self.compressed_histories + self.session_contexts .insert(session_id.to_string(), messages); - debug!( - "Restored session compression history: session_id={}", - session_id - ); + debug!("Restored session context cache: session_id={}", session_id); } /// Get copy of messages for sending to model (may be compressed) pub fn get_context_messages(&self, session_id: &str) -> Vec { - self.compressed_histories + self.session_contexts .get(session_id) .map(|h| h.clone()) .unwrap_or_default() @@ -277,8 +272,8 @@ impl CompressionManager { } } - // Update compression history - self.compressed_histories + // Replace the runtime context cache with the newly compressed context. + self.session_contexts .insert(session_id.to_string(), compressed_messages.clone()); Ok(CompressionResult { @@ -565,13 +560,10 @@ Be thorough and precise. Do not lose important technical details from either the Err(BitFunError::AIClient(error_msg)) } - /// Delete session compression history + /// Delete the in-memory context cache for a session. pub fn delete_session(&self, session_id: &str) { - self.compressed_histories.remove(session_id); - debug!( - "Deleted session compression history: session_id={}", - session_id - ); + self.session_contexts.remove(session_id); + debug!("Deleted session context cache: session_id={}", session_id); } fn get_compact_prompt(&self) -> String { diff --git a/src/crates/core/src/agentic/session/history_manager.rs b/src/crates/core/src/agentic/session/history_manager.rs deleted file mode 100644 index 6c50cb8f..00000000 --- a/src/crates/core/src/agentic/session/history_manager.rs +++ /dev/null @@ -1,126 +0,0 @@ -//! Message History Manager -//! -//! Manages in-memory session message history. - -use crate::agentic::core::Message; -use crate::util::errors::BitFunResult; -use dashmap::DashMap; -use log::debug; -use std::sync::Arc; - -/// Message history manager -pub struct MessageHistoryManager { - /// Message history in memory (by session ID) - histories: Arc>>, -} - -impl MessageHistoryManager { - pub fn new() -> Self { - Self { - histories: Arc::new(DashMap::new()), - } - } - - /// Create session history - pub async fn create_session(&self, session_id: &str) -> BitFunResult<()> { - self.histories.insert(session_id.to_string(), vec![]); - debug!("Created session history: session_id={}", session_id); - Ok(()) - } - - /// Add message - pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - if let Some(mut messages) = self.histories.get_mut(session_id) { - messages.push(message); - } else { - self.histories.insert(session_id.to_string(), vec![message]); - } - Ok(()) - } - - /// Get message history - pub async fn get_messages(&self, session_id: &str) -> BitFunResult> { - if let Some(messages) = self.histories.get(session_id) { - return Ok(messages.clone()); - } - Ok(vec![]) - } - - /// Get paginated message history - pub async fn get_messages_paginated( - &self, - session_id: &str, - limit: usize, - before_message_id: Option<&str>, - ) -> BitFunResult<(Vec, bool)> { - let messages = self.get_messages(session_id).await?; - - if messages.is_empty() { - return Ok((vec![], false)); - } - - let end_idx = if let Some(before_id) = before_message_id { - messages.iter().position(|m| m.id == before_id).unwrap_or(0) - } else { - messages.len() - }; - - if end_idx == 0 { - return Ok((vec![], false)); - } - - let start_idx = end_idx.saturating_sub(limit); - let has_more = start_idx > 0; - - Ok((messages[start_idx..end_idx].to_vec(), has_more)) - } - - /// Get recent N messages - pub async fn get_recent_messages( - &self, - session_id: &str, - count: usize, - ) -> BitFunResult> { - let messages = self.get_messages(session_id).await?; - let start = messages.len().saturating_sub(count); - Ok(messages[start..].to_vec()) - } - - /// Get message count - pub async fn count_messages(&self, session_id: &str) -> usize { - if let Some(messages) = self.histories.get(session_id) { - messages.len() - } else { - 0 - } - } - - /// Clear message history - pub async fn clear_messages(&self, session_id: &str) -> BitFunResult<()> { - if let Some(mut messages) = self.histories.get_mut(session_id) { - messages.clear(); - } - - debug!("Cleared session message history: session_id={}", session_id); - Ok(()) - } - - /// Delete session - pub async fn delete_session(&self, session_id: &str) -> BitFunResult<()> { - self.histories.remove(session_id); - - debug!("Deleted session history: session_id={}", session_id); - Ok(()) - } - - /// Restore session into the in-memory cache. - pub async fn restore_session( - &self, - session_id: &str, - messages: Vec, - ) -> BitFunResult<()> { - self.histories.insert(session_id.to_string(), messages); - debug!("Restored session history: session_id={}", session_id); - Ok(()) - } -} diff --git a/src/crates/core/src/agentic/session/mod.rs b/src/crates/core/src/agentic/session/mod.rs index 2ceb22ae..97128342 100644 --- a/src/crates/core/src/agentic/session/mod.rs +++ b/src/crates/core/src/agentic/session/mod.rs @@ -1,11 +1,9 @@ //! Session Management Layer //! -//! Provides session lifecycle management, message history, and context management +//! Provides session lifecycle management and context management. pub mod compression; -pub mod history_manager; pub mod session_manager; pub use compression::*; -pub use history_manager::*; pub use session_manager::*; diff --git a/src/crates/core/src/agentic/session/session_manager.rs b/src/crates/core/src/agentic/session/session_manager.rs index a34a4c16..9582945c 100644 --- a/src/crates/core/src/agentic/session/session_manager.rs +++ b/src/crates/core/src/agentic/session/session_manager.rs @@ -8,7 +8,7 @@ use crate::agentic::core::{ }; use crate::agentic::image_analysis::ImageContextData; use crate::agentic::persistence::PersistenceManager; -use crate::agentic::session::{CompressionManager, MessageHistoryManager}; +use crate::agentic::session::CompressionManager; use crate::infrastructure::ai::get_global_ai_client_factory; use crate::service::session::{ DialogTurnData, ModelRoundData, TextItemData, TurnStatus, UserMessageData, @@ -49,7 +49,6 @@ pub struct SessionManager { sessions: Arc>, /// Sub-components - history_manager: Arc, compression_manager: Arc, persistence_manager: Arc, @@ -58,6 +57,31 @@ pub struct SessionManager { } impl SessionManager { + fn paginate_messages( + messages: &[Message], + limit: usize, + before_message_id: Option<&str>, + ) -> (Vec, bool) { + if messages.is_empty() { + return (vec![], false); + } + + let end_idx = if let Some(before_id) = before_message_id { + messages.iter().position(|m| m.id == before_id).unwrap_or(0) + } else { + messages.len() + }; + + if end_idx == 0 { + return (vec![], false); + } + + let start_idx = end_idx.saturating_sub(limit); + let has_more = start_idx > 0; + + (messages[start_idx..end_idx].to_vec(), has_more) + } + fn session_workspace_from_config(config: &SessionConfig) -> Option { config.workspace_path.as_ref().map(PathBuf::from) } @@ -185,7 +209,6 @@ impl SessionManager { } pub fn new( - history_manager: Arc, compression_manager: Arc, persistence_manager: Arc, config: SessionManagerConfig, @@ -194,7 +217,6 @@ impl SessionManager { let manager = Self { sessions: Arc::new(DashMap::new()), - history_manager, compression_manager, persistence_manager, config, @@ -272,15 +294,10 @@ impl SessionManager { // 1. Add to memory self.sessions.insert(session_id.clone(), session.clone()); - // 2. Initialize message history - self.history_manager - .create_session(&session_id) - .await?; - - // 3. Initialize compression manager + // 2. Initialize the in-memory compression/context cache. self.compression_manager.create_session(&session_id); - // 4. Persist to local path (handles remote workspaces correctly) + // 3. Persist to local path (handles remote workspaces correctly) if self.config.enable_persistence { if let Some(session) = self.sessions.get(&session_id) { self.persistence_manager @@ -382,10 +399,9 @@ impl SessionManager { if self.config.enable_persistence { let effective_path = self.effective_session_workspace_path(session_id).await; - if let (Some(workspace_path), Some(session)) = ( - effective_path, - self.sessions.get(session_id), - ) { + if let (Some(workspace_path), Some(session)) = + (effective_path, self.sessions.get(session_id)) + { self.persistence_manager .save_session(&workspace_path, &session) .await?; @@ -419,10 +435,9 @@ impl SessionManager { if self.config.enable_persistence { let effective_path = self.effective_session_workspace_path(session_id).await; - if let (Some(workspace_path), Some(session)) = ( - effective_path, - self.sessions.get(session_id), - ) { + if let (Some(workspace_path), Some(session)) = + (effective_path, self.sessions.get(session_id)) + { self.persistence_manager .save_session(&workspace_path, &session) .await?; @@ -464,10 +479,9 @@ impl SessionManager { } } - // 2. Delete message history - self.history_manager.delete_session(session_id).await?; + self.compression_manager.delete_session(session_id); - // 3. Delete persisted data + // 2. Delete persisted data if self.config.enable_persistence { self.persistence_manager .delete_session(workspace_path, session_id) @@ -492,7 +506,7 @@ impl SessionManager { } } - // 4. Clean up associated Terminal session + // 3. Clean up associated Terminal session use crate::service::terminal::TerminalApi; if let Ok(terminal_api) = TerminalApi::from_singleton() { let binding = terminal_api.session_manager().binding(); @@ -508,7 +522,7 @@ impl SessionManager { } } - // 5. Remove from memory + // 4. Remove from memory self.sessions.remove(session_id); info!("Session deletion completed: session_id={}", session_id); @@ -553,7 +567,8 @@ impl SessionManager { ); } - // 2. Load message history - full list by turn, may already be compressed + // 2. Rebuild the runtime context cache from the latest persisted snapshot when + // available; otherwise reconstruct it from persisted turns. let mut latest_turn_index: Option = None; let messages = match self .persistence_manager @@ -577,10 +592,6 @@ impl SessionManager { ); } - self.history_manager - .restore_session(session_id, messages.clone()) - .await?; - // 3. Restore the in-memory compression manager state from the recovered messages. // If session already exists, delete old one first then create (ensure clean state) if session_already_in_memory { @@ -659,10 +670,7 @@ impl SessionManager { })? }; - // 2) Restore history/compression context in memory - self.history_manager - .restore_session(session_id, messages.clone()) - .await?; + // 2) Restore the in-memory context cache. self.compression_manager .restore_session(session_id, messages); @@ -735,8 +743,9 @@ impl SessionManager { let session = self .get_session(session_id) .ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", session_id)))?; - let workspace_path = - Self::effective_workspace_path_from_config(&session.config).await.ok_or_else(|| { + let workspace_path = Self::effective_workspace_path_from_config(&session.config) + .await + .ok_or_else(|| { BitFunError::Validation(format!( "Session workspace_path is missing: {}", session_id @@ -764,7 +773,7 @@ impl SessionManager { session.last_activity_at = SystemTime::now(); } - // 2. Add user message to history and compression managers + // 2. Add the user message to the in-memory context cache. let user_message = if let Some(images) = image_contexts.as_ref().filter(|v| !v.is_empty()).cloned() { Message::user_multimodal(user_input.clone(), images) @@ -775,9 +784,6 @@ impl SessionManager { .with_turn_id(turn_id.clone()) .with_semantic_kind(MessageSemanticKind::ActualUserInput) }; - self.history_manager - .add_message(session_id, user_message.clone()) - .await?; self.compression_manager .add_message(session_id, user_message) .await?; @@ -825,9 +831,15 @@ impl SessionManager { final_response: String, stats: TurnStats, ) -> BitFunResult<()> { - let workspace_path = self.effective_session_workspace_path(session_id).await.ok_or_else(|| { - BitFunError::Validation(format!("Session workspace_path is missing: {}", session_id)) - })?; + let workspace_path = self + .effective_session_workspace_path(session_id) + .await + .ok_or_else(|| { + BitFunError::Validation(format!( + "Session workspace_path is missing: {}", + session_id + )) + })?; let turn_index = self .sessions .get(session_id) @@ -935,9 +947,15 @@ impl SessionManager { turn_id: &str, error: String, ) -> BitFunResult<()> { - let workspace_path = self.effective_session_workspace_path(session_id).await.ok_or_else(|| { - BitFunError::Validation(format!("Session workspace_path is missing: {}", session_id)) - })?; + let workspace_path = self + .effective_session_workspace_path(session_id) + .await + .ok_or_else(|| { + BitFunError::Validation(format!( + "Session workspace_path is missing: {}", + session_id + )) + })?; let turn_index = self .sessions .get(session_id) @@ -1078,16 +1096,8 @@ impl SessionManager { let user_message = Message::user(question.to_string()) .with_turn_id(turn_id.clone()) .with_semantic_kind(MessageSemanticKind::ActualUserInput); - let assistant_message = Message::assistant(full_text.to_string()) - .with_turn_id(turn_id.clone()); - - // Add to the in-memory history cache. - self.history_manager - .add_message(child_session_id, user_message.clone()) - .await?; - self.history_manager - .add_message(child_session_id, assistant_message.clone()) - .await?; + let assistant_message = + Message::assistant(full_text.to_string()).with_turn_id(turn_id.clone()); // Add to the in-memory compression/context cache. self.compression_manager @@ -1114,21 +1124,33 @@ impl SessionManager { // ============ Helper Methods ============ - /// Get session's message history (complete) + /// Get a best-effort message view for the session. + /// When persistence is enabled, rebuild from persisted turns so callers see the + /// canonical turn history instead of the runtime context cache. pub async fn get_messages(&self, session_id: &str) -> BitFunResult> { - self.history_manager.get_messages(session_id).await + if self.config.enable_persistence { + if let Some(workspace_path) = self.effective_session_workspace_path(session_id).await { + let messages = self + .rebuild_messages_from_turns(&workspace_path, session_id) + .await?; + if !messages.is_empty() { + return Ok(messages); + } + } + } + + Ok(self.compression_manager.get_context_messages(session_id)) } - /// Get session's message history (paginated) + /// Get a paginated best-effort message view for the session. pub async fn get_messages_paginated( &self, session_id: &str, limit: usize, before_message_id: Option<&str>, ) -> BitFunResult<(Vec, bool)> { - self.history_manager - .get_messages_paginated(session_id, limit, before_message_id) - .await + let messages = self.get_messages(session_id).await?; + Ok(Self::paginate_messages(&messages, limit, before_message_id)) } /// Get session's context messages (may be compressed) @@ -1141,11 +1163,6 @@ impl SessionManager { /// Add message to session pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - // Add to the in-memory history cache. - self.history_manager - .add_message(session_id, message.clone()) - .await?; - // Also add to the in-memory compression/context cache. self.compression_manager .add_message(session_id, message) .await?; diff --git a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts index 4628cca6..a56129e3 100644 --- a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts +++ b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts @@ -95,13 +95,6 @@ export interface UpdateSessionModelRequest { } -export interface Message { - id: string; - role: 'user' | 'assistant' | 'tool' | 'system'; - content: any; - timestamp: number; -} - export interface ModeInfo { id: string; name: string; @@ -286,21 +279,6 @@ export class AgentAPI { } } - - async getSessionMessages(sessionId: string, limit?: number): Promise { - try { - return await api.invoke('get_session_messages', { - request: { - sessionId, - limit - } - }); - } catch (error) { - throw createTauriCommandError('get_session_messages', error, { sessionId, limit }); - } - } - - async confirmToolExecution(sessionId: string, toolId: string): Promise { try { await api.invoke('confirm_tool_execution', { From 9f857b601f4e3e4e656e573944d3f41a1b92b27e Mon Sep 17 00:00:00 2001 From: wsp1911 Date: Thu, 26 Mar 2026 21:38:00 +0800 Subject: [PATCH 4/5] refactor(session): split runtime context store from context compression - replace CompressionManager with SessionContextStore and ContextCompressor - move runtime context state handling into SessionManager + SessionContextStore - make ExecutionEngine depend on ContextCompressor explicitly - remove the old mixed-responsibility manager and update wiring --- src/apps/cli/src/agent/agentic_system.rs | 6 +- src/apps/desktop/src/lib.rs | 6 +- src/apps/server/src/bootstrap.rs | 23 ++- .../src/agentic/execution/execution_engine.rs | 15 +- .../compression/{manager.rs => compressor.rs} | 180 ++++++------------ .../src/agentic/session/compression/mod.rs | 4 +- .../core/src/agentic/session/context_store.rs | 53 ++++++ src/crates/core/src/agentic/session/mod.rs | 2 + .../src/agentic/session/session_manager.rs | 70 +++---- 9 files changed, 176 insertions(+), 183 deletions(-) rename src/crates/core/src/agentic/session/compression/{manager.rs => compressor.rs} (78%) create mode 100644 src/crates/core/src/agentic/session/context_store.rs diff --git a/src/apps/cli/src/agent/agentic_system.rs b/src/apps/cli/src/agent/agentic_system.rs index 2f6317ea..64b9d7b6 100644 --- a/src/apps/cli/src/agent/agentic_system.rs +++ b/src/apps/cli/src/agent/agentic_system.rs @@ -33,10 +33,11 @@ pub async fn init_agentic_system() -> Result { let path_manager = try_get_path_manager_arc()?; let persistence_manager = Arc::new(persistence::PersistenceManager::new(path_manager.clone())?); - let compression_manager = Arc::new(session::CompressionManager::new(Default::default())); + let context_store = Arc::new(session::SessionContextStore::new()); + let context_compressor = Arc::new(session::ContextCompressor::new(Default::default())); let session_manager = Arc::new(session::SessionManager::new( - compression_manager, + context_store, persistence_manager.clone(), Default::default(), )); @@ -60,6 +61,7 @@ pub async fn init_agentic_system() -> Result { round_executor, event_queue.clone(), session_manager.clone(), + context_compressor, Default::default(), )); diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 974abab7..2ee6beff 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -713,10 +713,11 @@ async fn init_agentic_system() -> anyhow::Result<( let path_manager = try_get_path_manager_arc()?; let persistence_manager = Arc::new(persistence::PersistenceManager::new(path_manager.clone())?); - let compression_manager = Arc::new(session::CompressionManager::new(Default::default())); + let context_store = Arc::new(session::SessionContextStore::new()); + let context_compressor = Arc::new(session::ContextCompressor::new(Default::default())); let session_manager = Arc::new(session::SessionManager::new( - compression_manager, + context_store, persistence_manager, Default::default(), )); @@ -746,6 +747,7 @@ async fn init_agentic_system() -> anyhow::Result<( round_executor, event_queue.clone(), session_manager.clone(), + context_compressor, Default::default(), )); diff --git a/src/apps/server/src/bootstrap.rs b/src/apps/server/src/bootstrap.rs index 62708c76..861ac6e9 100644 --- a/src/apps/server/src/bootstrap.rs +++ b/src/apps/server/src/bootstrap.rs @@ -5,9 +5,7 @@ use bitfun_core::agentic::*; use bitfun_core::infrastructure::ai::AIClientFactory; use bitfun_core::infrastructure::try_get_path_manager_arc; -use bitfun_core::service::{ - ai_rules, config, filesystem, mcp, token_usage, workspace, -}; +use bitfun_core::service::{ai_rules, config, filesystem, mcp, token_usage, workspace}; use std::sync::Arc; use tokio::sync::RwLock; @@ -50,13 +48,13 @@ pub async fn initialize(workspace: Option) -> anyhow::Result) -> anyhow::Result) -> anyhow::Result, event_queue: Arc, session_manager: Arc, + context_compressor: Arc, config: ExecutionEngineConfig, } @@ -52,12 +53,14 @@ impl ExecutionEngine { round_executor: Arc, event_queue: Arc, session_manager: Arc, + context_compressor: Arc, config: ExecutionEngineConfig, ) -> Self { Self { round_executor, event_queue, session_manager, + context_compressor, config, } } @@ -445,14 +448,13 @@ impl ExecutionEngine { .get_session(session_id) .ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", session_id)))?; - let compression_manager = self.session_manager.get_compression_manager(); - // Record start time let start_time = std::time::Instant::now(); let old_messages_len = messages.len(); // Preprocess turns - let (turn_index_to_keep, turns) = compression_manager + let (turn_index_to_keep, turns) = self + .context_compressor .preprocess_turns(session_id, context_window, messages) .await?; if turn_index_to_keep == 0 { @@ -479,11 +481,14 @@ impl ExecutionEngine { .await; // Execute compression - match compression_manager + match self + .context_compressor .compress_turns(session_id, context_window, turn_index_to_keep, turns) .await { Ok(compression_result) => { + self.session_manager + .replace_context_messages(session_id, compression_result.messages.clone()); let mut new_messages = vec![system_prompt_message]; new_messages.extend(compression_result.messages); // Update session compression state diff --git a/src/crates/core/src/agentic/session/compression/manager.rs b/src/crates/core/src/agentic/session/compression/compressor.rs similarity index 78% rename from src/crates/core/src/agentic/session/compression/manager.rs rename to src/crates/core/src/agentic/session/compression/compressor.rs index a163bb47..e144456b 100644 --- a/src/crates/core/src/agentic/session/compression/manager.rs +++ b/src/crates/core/src/agentic/session/compression/compressor.rs @@ -1,6 +1,6 @@ -//! Context Compression Manager +//! Context compressor //! -//! Responsible for managing in-memory session context compression. +//! Responsible only for transforming a session context into a compressed one. use super::fallback::{ build_structured_compression_reminder, CompressionFallbackOptions, CompressionReminder, @@ -13,11 +13,10 @@ use crate::infrastructure::ai::{get_global_ai_client_factory, AIClient}; use crate::util::errors::{BitFunError, BitFunResult}; use crate::util::types::Message as AIMessage; use anyhow; -use dashmap::DashMap; use log::{debug, trace, warn}; use std::sync::Arc; -/// Compression manager configuration +/// Context compressor configuration #[derive(Debug, Clone)] pub struct CompressionConfig { pub keep_turns_ratio: f32, @@ -63,54 +62,14 @@ pub struct CompressionResult { pub has_model_summary: bool, } -/// Context compression manager -pub struct CompressionManager { - /// In-memory session context cache (by session ID). - /// The cache stores the current model context, which may contain - /// compressed reminders plus the most recent turn messages. - session_contexts: Arc>>, - /// Configuration +/// Stateless context compression service. +pub struct ContextCompressor { config: CompressionConfig, } -impl CompressionManager { +impl ContextCompressor { pub fn new(config: CompressionConfig) -> Self { - Self { - session_contexts: Arc::new(DashMap::new()), - config, - } - } - - /// Initialize an empty in-memory context cache for a session. - pub fn create_session(&self, session_id: &str) { - self.session_contexts.insert(session_id.to_string(), vec![]); - debug!("Created session context cache: session_id={}", session_id); - } - - /// Add a message to the in-memory context cache. - pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - if let Some(mut cached_messages) = self.session_contexts.get_mut(session_id) { - cached_messages.push(message); - } else { - self.session_contexts - .insert(session_id.to_string(), vec![message]); - } - Ok(()) - } - - /// Batch restore messages into the in-memory context cache. - pub fn restore_session(&self, session_id: &str, messages: Vec) { - self.session_contexts - .insert(session_id.to_string(), messages); - debug!("Restored session context cache: session_id={}", session_id); - } - - /// Get copy of messages for sending to model (may be compressed) - pub fn get_context_messages(&self, session_id: &str) -> Vec { - self.session_contexts - .get(session_id) - .map(|h| h.clone()) - .unwrap_or_default() + Self { config } } fn get_turn_index_to_keep(&self, turns_tokens: &[usize], token_limit: usize) -> usize { @@ -127,8 +86,8 @@ impl CompressionManager { result } - /// Returns (turn_index_to_keep, turns) - /// If turn_index_to_keep is 0, no compression is needed + /// Returns `(turn_index_to_keep, turns)`. + /// If `turn_index_to_keep` is 0, no compression is needed. pub async fn preprocess_turns( &self, session_id: &str, @@ -136,11 +95,10 @@ impl CompressionManager { mut messages: Vec, ) -> BitFunResult<(usize, Vec)> { debug!( - "Starting session context compression: session_id={}", + "Starting session context compression analysis: session_id={}", session_id ); - // Remove system messages let message_start = { let mut start_idx = messages.len(); for (idx, msg) in messages.iter().enumerate() { @@ -155,7 +113,7 @@ impl CompressionManager { if all_messages.is_empty() { debug!( - "Session history is empty, no compression needed: session_id={}", + "Session context is empty, no compression needed: session_id={}", session_id ); return Ok((0, Vec::new())); @@ -167,7 +125,6 @@ impl CompressionManager { .iter_mut() .map(|turn| turn.iter_mut().map(|m| m.get_tokens()).sum::()) .collect(); - // Print message count and token count for each turn { let turns_msg_num: Vec = turns_messages.iter().map(|t| t.len()).collect(); debug!( @@ -181,7 +138,6 @@ impl CompressionManager { let mut turn_index_to_keep = self.get_turn_index_to_keep(&turns_tokens, token_limit_keep_turns); if turn_index_to_keep == turns_count { - // If the last turn exceeds 30% but not 40%, keep the last turn let token_limit_last_turn = (context_window as f32 * self.config.keep_last_turn_ratio) as usize; if let Some(last_turn_tokens) = turns_tokens.last() { @@ -190,7 +146,10 @@ impl CompressionManager { } } } - debug!("Turn index to keep: {}", turn_index_to_keep); + debug!( + "Turn index to keep after compression analysis: session_id={}, keep_from_turn={}", + session_id, turn_index_to_keep + ); let turns: Vec = turns_messages .into_iter() @@ -208,7 +167,7 @@ impl CompressionManager { mut turns: Vec, ) -> BitFunResult { if turns.is_empty() { - debug!("No turns need compression"); + debug!("No turns need compression: session_id={}", session_id); return Ok(CompressionResult { messages: Vec::new(), has_model_summary: false, @@ -216,7 +175,10 @@ impl CompressionManager { } let Some(last_turn_messages) = turns.last().map(|turn| &turn.messages) else { - debug!("No turns available after split, skipping last-turn extraction"); + debug!( + "No turns available after split, skipping compression: session_id={}", + session_id + ); return Ok(CompressionResult { messages: Vec::new(), has_model_summary: false, @@ -234,7 +196,7 @@ impl CompressionManager { } }) }; - let last_todo = MessageHelper::get_last_todo(&last_turn_messages); + let last_todo = MessageHelper::get_last_todo(last_turn_messages); trace!("Last user message: {:?}", last_user_message); trace!("Last todo: {:?}", last_todo); let turns_to_keep = turns.split_off(turn_index_to_keep); @@ -247,7 +209,6 @@ impl CompressionManager { .await?; trace!("Compression reminder generated"); has_model_summary = reminder.used_model_summary; - compressed_messages.push(self.create_reminder_message(reminder)); } @@ -256,11 +217,9 @@ impl CompressionManager { compressed_messages.extend(turn.messages); } } else { - // All turns compressed, append last user message if let Some(last_user_message) = last_user_message { compressed_messages.push(last_user_message); } - // Append last todo if let Some(last_todo) = last_todo { compressed_messages.push( Message::user(render_system_reminder(&format!( @@ -272,9 +231,11 @@ impl CompressionManager { } } - // Replace the runtime context cache with the newly compressed context. - self.session_contexts - .insert(session_id.to_string(), compressed_messages.clone()); + debug!( + "Compression completed: session_id={}, compressed_messages={}", + session_id, + compressed_messages.len() + ); Ok(CompressionResult { messages: compressed_messages, @@ -403,11 +364,9 @@ Be thorough and precise. Do not lose important technical details from either the let mut request_cnt = 0; for (idx, turn) in turns_to_compress.into_iter().enumerate() { if current_tokens + turn.tokens <= max_tokens_in_one_request { - // Add current turn's messages to accumulated messages cur_messages.extend(turn.messages); current_tokens += turn.tokens; } else { - // Compress accumulated messages if !cur_messages.is_empty() { summary = self .generate_summary( @@ -416,7 +375,7 @@ Be thorough and precise. Do not lose important technical details from either the cur_messages, ) .await?; - cur_messages = Vec::new(); // cur_messages has been consumed, need to reassign + cur_messages = Vec::new(); current_tokens = 0; request_cnt += 1; trace!( @@ -427,50 +386,44 @@ Be thorough and precise. Do not lose important technical details from either the } if turn.tokens <= max_tokens_in_one_request { - // Add current turn's messages to accumulated messages cur_messages.extend(turn.messages); current_tokens = turn.tokens; + } else if let Some((messages_part1, messages_part2)) = + MessageHelper::split_messages_in_middle(turn.messages) + { + summary = self + .generate_summary( + ai_client.clone(), + gen_system_message_for_summary(&summary), + messages_part1, + ) + .await?; + request_cnt += 1; + debug!( + "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", + request_cnt, idx, summary + ); + summary = self + .generate_summary( + ai_client.clone(), + gen_system_message_for_summary(&summary), + messages_part2, + ) + .await?; + request_cnt += 1; + debug!( + "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", + request_cnt, idx, summary + ); } else { - // Single turn too long - if let Some((messages_part1, messages_part2)) = - MessageHelper::split_messages_in_middle(turn.messages) - { - // Compress first half and second half separately - summary = self - .generate_summary( - ai_client.clone(), - gen_system_message_for_summary(&summary), - messages_part1, - ) - .await?; - request_cnt += 1; - debug!( - "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", - request_cnt, idx, summary - ); - summary = self - .generate_summary( - ai_client.clone(), - gen_system_message_for_summary(&summary), - messages_part2, - ) - .await?; - request_cnt += 1; - debug!( - "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", - request_cnt, idx, summary - ); - } else { - return Err(BitFunError::Service(format!( - "Compression Failed, turn {} cannot be split in middle", - idx - ))); - } + return Err(BitFunError::Service(format!( + "Compression Failed, turn {} cannot be split in middle", + idx + ))); } } } - // Compress remaining messages if !cur_messages.is_empty() { summary = self .generate_summary( @@ -485,7 +438,6 @@ Be thorough and precise. Do not lose important technical details from either the Ok(summary) } - /// Generate summary for dialog turns, messages need to remove system prompt async fn generate_summary( &self, ai_client: Arc, @@ -496,7 +448,6 @@ Be thorough and precise. Do not lose important technical details from either the .await } - /// Generate summary for dialog turns, supports retry async fn generate_summary_with_retry( &self, ai_client: Arc, @@ -504,9 +455,7 @@ Be thorough and precise. Do not lose important technical details from either the messages: Vec, max_tries: usize, ) -> BitFunResult { - // Call AI to generate summary let mut summary_messages = vec![AIMessage::from(system_message_for_summary)]; - // Remove thinking process when summarizing summary_messages.extend(messages.iter().map(|m| { let mut ai_msg = AIMessage::from(m); ai_msg.reasoning_content = None; @@ -540,9 +489,8 @@ Be thorough and precise. Do not lose important technical details from either the ); last_error = Some(e); - // If not the last attempt, wait before retrying if attempt < max_tries - 1 { - let delay_ms = base_wait_time_ms * (1 << attempt.min(3)); // Exponential backoff + let delay_ms = base_wait_time_ms * (1 << attempt.min(3)); debug!("Waiting {}ms before retry {}...", delay_ms, attempt + 2); tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; } @@ -550,7 +498,6 @@ Be thorough and precise. Do not lose important technical details from either the } } - // All attempts failed let error_msg = format!( "Summary generation failed after {} attempts: {}", max_tries, @@ -560,12 +507,6 @@ Be thorough and precise. Do not lose important technical details from either the Err(BitFunError::AIClient(error_msg)) } - /// Delete the in-memory context cache for a session. - pub fn delete_session(&self, session_id: &str) { - self.session_contexts.remove(session_id); - debug!("Deleted session context cache: session_id={}", session_id); - } - fn get_compact_prompt(&self) -> String { r#"Your task is to create a detailed summary of the conversation so far, paying close attention to the user's explicit requests and your previous actions. This summary should be thorough in capturing technical details, code patterns, and architectural decisions that would be essential for continuing development work without losing context. @@ -651,6 +592,7 @@ Here's an example of how your output should be structured: Please provide your summary based on the conversation so far, following this structure and ensuring precision and thoroughness in your response. -"#.to_string() +"# + .to_string() } } diff --git a/src/crates/core/src/agentic/session/compression/mod.rs b/src/crates/core/src/agentic/session/compression/mod.rs index 8beb1f6b..9437095a 100644 --- a/src/crates/core/src/agentic/session/compression/mod.rs +++ b/src/crates/core/src/agentic/session/compression/mod.rs @@ -1,7 +1,7 @@ //! Session context compression modules. +pub mod compressor; pub mod fallback; -pub mod manager; +pub use compressor::*; pub use fallback::*; -pub use manager::*; diff --git a/src/crates/core/src/agentic/session/context_store.rs b/src/crates/core/src/agentic/session/context_store.rs new file mode 100644 index 00000000..9023ccdf --- /dev/null +++ b/src/crates/core/src/agentic/session/context_store.rs @@ -0,0 +1,53 @@ +//! Runtime session context store. +//! +//! Holds the in-memory model context for each active session. + +use crate::agentic::core::Message; +use dashmap::DashMap; +use log::debug; +use std::sync::Arc; + +/// In-memory runtime context store for active sessions. +pub struct SessionContextStore { + session_contexts: Arc>>, +} + +impl SessionContextStore { + pub fn new() -> Self { + Self { + session_contexts: Arc::new(DashMap::new()), + } + } + + pub fn create_session(&self, session_id: &str) { + self.session_contexts.insert(session_id.to_string(), vec![]); + debug!("Created session context cache: session_id={}", session_id); + } + + pub fn add_message(&self, session_id: &str, message: Message) { + if let Some(mut cached_messages) = self.session_contexts.get_mut(session_id) { + cached_messages.push(message); + } else { + self.session_contexts + .insert(session_id.to_string(), vec![message]); + } + } + + pub fn replace_context(&self, session_id: &str, messages: Vec) { + self.session_contexts + .insert(session_id.to_string(), messages); + debug!("Replaced session context cache: session_id={}", session_id); + } + + pub fn get_context_messages(&self, session_id: &str) -> Vec { + self.session_contexts + .get(session_id) + .map(|messages| messages.clone()) + .unwrap_or_default() + } + + pub fn delete_session(&self, session_id: &str) { + self.session_contexts.remove(session_id); + debug!("Deleted session context cache: session_id={}", session_id); + } +} diff --git a/src/crates/core/src/agentic/session/mod.rs b/src/crates/core/src/agentic/session/mod.rs index 97128342..1b0b22a9 100644 --- a/src/crates/core/src/agentic/session/mod.rs +++ b/src/crates/core/src/agentic/session/mod.rs @@ -3,7 +3,9 @@ //! Provides session lifecycle management and context management. pub mod compression; +pub mod context_store; pub mod session_manager; pub use compression::*; +pub use context_store::*; pub use session_manager::*; diff --git a/src/crates/core/src/agentic/session/session_manager.rs b/src/crates/core/src/agentic/session/session_manager.rs index 9582945c..d2721a5e 100644 --- a/src/crates/core/src/agentic/session/session_manager.rs +++ b/src/crates/core/src/agentic/session/session_manager.rs @@ -8,7 +8,7 @@ use crate::agentic::core::{ }; use crate::agentic::image_analysis::ImageContextData; use crate::agentic::persistence::PersistenceManager; -use crate::agentic::session::CompressionManager; +use crate::agentic::session::SessionContextStore; use crate::infrastructure::ai::get_global_ai_client_factory; use crate::service::session::{ DialogTurnData, ModelRoundData, TextItemData, TurnStatus, UserMessageData, @@ -49,7 +49,7 @@ pub struct SessionManager { sessions: Arc>, /// Sub-components - compression_manager: Arc, + context_store: Arc, persistence_manager: Arc, /// Configuration @@ -209,7 +209,7 @@ impl SessionManager { } pub fn new( - compression_manager: Arc, + context_store: Arc, persistence_manager: Arc, config: SessionManagerConfig, ) -> Self { @@ -217,7 +217,7 @@ impl SessionManager { let manager = Self { sessions: Arc::new(DashMap::new()), - compression_manager, + context_store, persistence_manager, config, }; @@ -294,8 +294,8 @@ impl SessionManager { // 1. Add to memory self.sessions.insert(session_id.clone(), session.clone()); - // 2. Initialize the in-memory compression/context cache. - self.compression_manager.create_session(&session_id); + // 2. Initialize the in-memory context cache. + self.context_store.create_session(&session_id); // 3. Persist to local path (handles remote workspaces correctly) if self.config.enable_persistence { @@ -479,7 +479,7 @@ impl SessionManager { } } - self.compression_manager.delete_session(session_id); + self.context_store.delete_session(session_id); // 2. Delete persisted data if self.config.enable_persistence { @@ -592,15 +592,14 @@ impl SessionManager { ); } - // 3. Restore the in-memory compression manager state from the recovered messages. + // 3. Restore the in-memory context cache from the recovered messages. // If session already exists, delete old one first then create (ensure clean state) if session_already_in_memory { - self.compression_manager.delete_session(session_id); + self.context_store.delete_session(session_id); } - // Use restore_session for batch restore, avoid triggering persistence for each add_message - self.compression_manager - .restore_session(session_id, messages.clone()); + self.context_store + .replace_context(session_id, messages.clone()); // If session's recorded turn count doesn't match snapshot, truncate to snapshot's turn if let Some(latest_turn_index) = latest_turn_index { @@ -622,10 +621,7 @@ impl SessionManager { session.dialog_turn_ids.clear(); } - let context_msg_count = self - .compression_manager - .get_context_messages(session_id) - .len(); + let context_msg_count = self.context_store.get_context_messages(session_id).len(); info!( "Session restored: session_id={}, session_name={}, messages={}, context_messages={}", @@ -671,8 +667,7 @@ impl SessionManager { }; // 2) Restore the in-memory context cache. - self.compression_manager - .restore_session(session_id, messages); + self.context_store.replace_context(session_id, messages); // 3) Truncate session turn list & persist if let Some(mut session) = self.sessions.get_mut(session_id) { @@ -784,9 +779,7 @@ impl SessionManager { .with_turn_id(turn_id.clone()) .with_semantic_kind(MessageSemanticKind::ActualUserInput) }; - self.compression_manager - .add_message(session_id, user_message) - .await?; + self.context_store.add_message(session_id, user_message); // 3. Persist if self.config.enable_persistence { @@ -1099,13 +1092,11 @@ impl SessionManager { let assistant_message = Message::assistant(full_text.to_string()).with_turn_id(turn_id.clone()); - // Add to the in-memory compression/context cache. - self.compression_manager - .add_message(child_session_id, user_message) - .await?; - self.compression_manager - .add_message(child_session_id, assistant_message) - .await?; + // Add to the in-memory runtime context cache. + self.context_store + .add_message(child_session_id, user_message); + self.context_store + .add_message(child_session_id, assistant_message); if let Some(mut session) = self.sessions.get_mut(child_session_id) { if !session @@ -1139,7 +1130,7 @@ impl SessionManager { } } - Ok(self.compression_manager.get_context_messages(session_id)) + Ok(self.context_store.get_context_messages(session_id)) } /// Get a paginated best-effort message view for the session. @@ -1153,22 +1144,24 @@ impl SessionManager { Ok(Self::paginate_messages(&messages, limit, before_message_id)) } - /// Get session's context messages (may be compressed) + /// Get session's runtime context messages (may already include compressed reminders). pub async fn get_context_messages(&self, session_id: &str) -> BitFunResult> { - // Get context messages from compression manager (may be compressed) - let context_messages = self.compression_manager.get_context_messages(session_id); + let context_messages = self.context_store.get_context_messages(session_id); Ok(context_messages) } - /// Add message to session + /// Add a message to the runtime context cache. pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - self.compression_manager - .add_message(session_id, message) - .await?; + self.context_store.add_message(session_id, message); Ok(()) } + /// Replace the runtime context cache for a session. + pub fn replace_context_messages(&self, session_id: &str, messages: Vec) { + self.context_store.replace_context(session_id, messages); + } + /// Get dialog turn count pub fn get_turn_count(&self, session_id: &str) -> usize { self.sessions @@ -1184,11 +1177,6 @@ impl SessionManager { .map(|s| s.compression_state.clone()) } - /// Get compression manager (for ExecutionEngine use) - pub fn get_compression_manager(&self) -> Arc { - self.compression_manager.clone() - } - /// Update session's compression state pub async fn update_compression_state( &self, From 32fcdd7d174c483926a9951802b3c228352af77f Mon Sep 17 00:00:00 2001 From: wsp1911 Date: Thu, 26 Mar 2026 21:38:00 +0800 Subject: [PATCH 5/5] feat(session): persist runtime context snapshots on semantic updates - save context snapshots at turn start, context append, compression replace, and turn end - await snapshot persistence when compression rewrites runtime context - document best-effort, non-transactional snapshot persistence - restore by merging snapshot state with newer persisted turns - fix `/btw` turn indexing and session metadata persistence --- .../src/agentic/execution/execution_engine.rs | 3 +- .../agentic/session/compression/compressor.rs | 2 +- .../src/agentic/session/session_manager.rs | 274 ++++++++++++------ 3 files changed, 187 insertions(+), 92 deletions(-) diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index 28f0c5a2..7c892d95 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -488,7 +488,8 @@ impl ExecutionEngine { { Ok(compression_result) => { self.session_manager - .replace_context_messages(session_id, compression_result.messages.clone()); + .replace_context_messages(session_id, compression_result.messages.clone()) + .await; let mut new_messages = vec![system_prompt_message]; new_messages.extend(compression_result.messages); // Update session compression state diff --git a/src/crates/core/src/agentic/session/compression/compressor.rs b/src/crates/core/src/agentic/session/compression/compressor.rs index e144456b..c74e8b2d 100644 --- a/src/crates/core/src/agentic/session/compression/compressor.rs +++ b/src/crates/core/src/agentic/session/compression/compressor.rs @@ -444,7 +444,7 @@ Be thorough and precise. Do not lose important technical details from either the system_message_for_summary: Message, messages: Vec, ) -> BitFunResult { - self.generate_summary_with_retry(ai_client, system_message_for_summary, messages, 1) + self.generate_summary_with_retry(ai_client, system_message_for_summary, messages, 2) .await } diff --git a/src/crates/core/src/agentic/session/session_manager.rs b/src/crates/core/src/agentic/session/session_manager.rs index d2721a5e..2253f918 100644 --- a/src/crates/core/src/agentic/session/session_manager.rs +++ b/src/crates/core/src/agentic/session/session_manager.rs @@ -132,15 +132,7 @@ impl SessionManager { Self::effective_workspace_path_from_config(&config).await } - async fn rebuild_messages_from_turns( - &self, - workspace_path: &Path, - session_id: &str, - ) -> BitFunResult> { - let turns = self - .persistence_manager - .load_session_turns(workspace_path, session_id) - .await?; + fn build_messages_from_turns(turns: &[DialogTurnData]) -> Vec { let mut messages = Vec::new(); for turn in turns { @@ -205,7 +197,82 @@ impl SessionManager { } } - Ok(messages) + messages + } + + async fn rebuild_messages_from_turns( + &self, + workspace_path: &Path, + session_id: &str, + ) -> BitFunResult> { + let turns = self + .persistence_manager + .load_session_turns(workspace_path, session_id) + .await?; + Ok(Self::build_messages_from_turns(&turns)) + } + + /// Persist the current runtime context by overwriting `snapshots/context-{turn_index}.json`. + /// + /// Save timing is intentionally tied to semantic context changes rather than token chunks: + /// - after a turn starts and the user message enters runtime context + /// - after assistant/tool messages are appended to runtime context + /// - after compression replaces runtime context + /// - once more when a turn completes or fails + /// + /// This is still a best-effort multi-file persistence flow, not a transactional commit. + /// `session.json`, `turns/turn-*.json`, and `snapshots/context-*.json` may be briefly out of + /// sync if the process crashes between writes, so restore logic must tolerate partial updates. + async fn persist_context_snapshot_for_turn_best_effort( + &self, + session_id: &str, + turn_index: usize, + reason: &str, + ) { + if !self.config.enable_persistence { + return; + } + + let Some(workspace_path) = self.effective_session_workspace_path(session_id).await else { + debug!( + "Skipping context snapshot persistence because workspace path is unavailable: session_id={}, turn_index={}, reason={}", + session_id, turn_index, reason + ); + return; + }; + + let context_messages = self.context_store.get_context_messages(session_id); + if let Err(err) = self + .persistence_manager + .save_turn_context_snapshot(&workspace_path, session_id, turn_index, &context_messages) + .await + { + warn!( + "failed to persist context snapshot: session_id={}, turn_index={}, reason={}, err={}", + session_id, turn_index, reason, err + ); + } + } + + async fn persist_current_turn_context_snapshot_best_effort( + &self, + session_id: &str, + reason: &str, + ) { + let Some(turn_index) = self + .sessions + .get(session_id) + .and_then(|session| session.dialog_turn_ids.len().checked_sub(1)) + else { + debug!( + "Skipping current-turn context snapshot because no turn is active: session_id={}, reason={}", + session_id, reason + ); + return; + }; + + self.persist_context_snapshot_for_turn_best_effort(session_id, turn_index, reason) + .await; } pub fn new( @@ -567,10 +634,22 @@ impl SessionManager { ); } - // 2. Rebuild the runtime context cache from the latest persisted snapshot when - // available; otherwise reconstruct it from persisted turns. + // 2. Restore runtime context with snapshot-first semantics. + // If the latest snapshot lags behind turn persistence, append the missing turn delta + // instead of truncating session history. + // + // This compensates for the fact that persistence is not transactional across + // `session.json`, `turns/*.json`, and `snapshots/context-*.json`. + let persisted_turns = self + .persistence_manager + .load_session_turns(&session_storage_path, session_id) + .await?; + let persisted_turn_ids: Vec = persisted_turns + .iter() + .map(|turn| turn.turn_id.clone()) + .collect(); let mut latest_turn_index: Option = None; - let messages = match self + let mut messages = match self .persistence_manager .load_latest_turn_context_snapshot(&session_storage_path, session_id) .await? @@ -579,9 +658,21 @@ impl SessionManager { latest_turn_index = Some(turn_index); msgs } - None => { - self.rebuild_messages_from_turns(&session_storage_path, session_id) - .await? + None => Self::build_messages_from_turns(&persisted_turns), + }; + + if let Some(snapshot_turn_index) = latest_turn_index { + let delta_start = snapshot_turn_index.saturating_add(1); + if delta_start < persisted_turns.len() { + warn!( + "Context snapshot is behind persisted turns, rebuilding delta: session_id={}, snapshot_turn_index={}, persisted_turn_count={}", + session_id, + snapshot_turn_index, + persisted_turns.len() + ); + messages.extend(Self::build_messages_from_turns( + &persisted_turns[delta_start..], + )); } }; @@ -601,19 +692,39 @@ impl SessionManager { self.context_store .replace_context(session_id, messages.clone()); - // If session's recorded turn count doesn't match snapshot, truncate to snapshot's turn - if let Some(latest_turn_index) = latest_turn_index { - let expected_turn_count = latest_turn_index + 1; - if session.dialog_turn_ids.len() > expected_turn_count { - warn!( - "Session turn count exceeds snapshot, truncating: session_id={}, turns={} -> {}", - session_id, - session.dialog_turn_ids.len(), - expected_turn_count - ); - session.dialog_turn_ids.truncate(expected_turn_count); - } - } else if !session.dialog_turn_ids.is_empty() && messages.is_empty() { + let recoverable_turn_count = latest_turn_index + .map(|turn_index| turn_index + 1) + .unwrap_or(0) + .max(persisted_turns.len()); + + if session.dialog_turn_ids.len() < persisted_turns.len() { + warn!( + "Session metadata is behind persisted turns, rebuilding dialog_turn_ids: session_id={}, session_turn_count={}, persisted_turn_count={}", + session_id, + session.dialog_turn_ids.len(), + persisted_turns.len() + ); + session.dialog_turn_ids = persisted_turn_ids; + } else if session.dialog_turn_ids.len() > recoverable_turn_count { + warn!( + "Session metadata exceeds recoverable history, truncating: session_id={}, session_turn_count={}, recoverable_turn_count={}", + session_id, + session.dialog_turn_ids.len(), + recoverable_turn_count + ); + session.dialog_turn_ids.truncate(recoverable_turn_count); + } else if persisted_turns.len() == session.dialog_turn_ids.len() + && session.dialog_turn_ids != persisted_turn_ids + { + warn!( + "Session metadata turn ids diverge from persisted turns, normalizing order: session_id={}", + session_id + ); + session.dialog_turn_ids = persisted_turn_ids; + } + + if recoverable_turn_count == 0 && !session.dialog_turn_ids.is_empty() && messages.is_empty() + { warn!( "Session has no available context snapshot and messages are empty, clearing turns: session_id={}", session_id @@ -808,6 +919,9 @@ impl SessionManager { .await?; } + self.persist_context_snapshot_for_turn_best_effort(session_id, turn_index, "turn_started") + .await; + debug!( "Starting dialog turn: turn_id={}, turn_index={}", turn_id, turn_index @@ -885,37 +999,12 @@ impl SessionManager { turn.duration_ms = Some(stats.duration_ms); turn.end_time = Some(completion_timestamp); - if self.config.enable_persistence { - match self.get_context_messages(session_id).await { - Ok(context_messages) => { - if let Err(err) = self - .persistence_manager - .save_turn_context_snapshot( - &workspace_path, - session_id, - turn.turn_index, - &context_messages, - ) - .await - { - warn!( - "failed to save turn context snapshot: session_id={}, turn_index={}, err={}", - session_id, - turn.turn_index, - err - ); - } - } - Err(err) => { - warn!( - "failed to build context messages for snapshot: session_id={}, turn_index={}, err={}", - session_id, - turn.turn_index, - err - ); - } - } - } + self.persist_context_snapshot_for_turn_best_effort( + session_id, + turn.turn_index, + "turn_completed", + ) + .await; // Persist if self.config.enable_persistence { @@ -968,32 +1057,13 @@ impl SessionManager { .as_millis() as u64, ); + self.persist_context_snapshot_for_turn_best_effort( + session_id, + turn.turn_index, + "turn_failed", + ) + .await; if self.config.enable_persistence { - match self.get_context_messages(session_id).await { - Ok(context_messages) => { - if let Err(err) = self - .persistence_manager - .save_turn_context_snapshot( - &workspace_path, - session_id, - turn.turn_index, - &context_messages, - ) - .await - { - warn!( - "failed to save turn context snapshot on failure: session_id={}, turn_index={}, err={}", - session_id, turn.turn_index, err - ); - } - } - Err(err) => { - warn!( - "failed to build context messages for snapshot on failure: session_id={}, turn_index={}, err={}", - session_id, turn.turn_index, err - ); - } - } self.persistence_manager .save_dialog_turn(&workspace_path, &turn) .await?; @@ -1022,8 +1092,13 @@ impl SessionManager { let session = self.sessions.get(child_session_id).ok_or_else(|| { BitFunError::NotFound(format!("Session not found: {}", child_session_id)) })?; - let turn_id = format!("btw-turn-{}", request_id); + let turn_index = session + .dialog_turn_ids + .iter() + .position(|existing| existing == &turn_id) + .unwrap_or(session.dialog_turn_ids.len()); + let user_message_id = format!("btw-user-{}", request_id); let round_id = format!("btw-round-{}", request_id); let text_id = format!("btw-text-{}", request_id); @@ -1034,7 +1109,7 @@ impl SessionManager { let mut turn = DialogTurnData::new( turn_id.clone(), - 0, + turn_index, child_session_id.to_string(), UserMessageData { id: user_message_id, @@ -1108,8 +1183,21 @@ impl SessionManager { } session.updated_at = SystemTime::now(); session.last_activity_at = SystemTime::now(); + + if self.config.enable_persistence { + self.persistence_manager + .save_session(workspace_path, &session) + .await?; + } } + self.persist_context_snapshot_for_turn_best_effort( + child_session_id, + turn_index, + "btw_turn_persisted", + ) + .await; + Ok(()) } @@ -1151,15 +1239,21 @@ impl SessionManager { Ok(context_messages) } - /// Add a message to the runtime context cache. + /// Add a semantic message to the runtime context cache and immediately refresh the current + /// turn snapshot so crashes do not lose the latest in-memory context change. pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { self.context_store.add_message(session_id, message); + self.persist_current_turn_context_snapshot_best_effort(session_id, "context_message_added") + .await; Ok(()) } - /// Replace the runtime context cache for a session. - pub fn replace_context_messages(&self, session_id: &str, messages: Vec) { + /// Replace the runtime context cache for a session and immediately refresh the current turn + /// snapshot. This is primarily used after compression rewrites the model-visible context. + pub async fn replace_context_messages(&self, session_id: &str, messages: Vec) { self.context_store.replace_context(session_id, messages); + self.persist_current_turn_context_snapshot_best_effort(session_id, "context_replaced") + .await; } /// Get dialog turn count