diff --git a/src/apps/cli/src/agent/agentic_system.rs b/src/apps/cli/src/agent/agentic_system.rs index d8c41baf..64b9d7b6 100644 --- a/src/apps/cli/src/agent/agentic_system.rs +++ b/src/apps/cli/src/agent/agentic_system.rs @@ -33,25 +33,11 @@ pub async fn init_agentic_system() -> Result { let path_manager = try_get_path_manager_arc()?; let persistence_manager = Arc::new(persistence::PersistenceManager::new(path_manager.clone())?); - let history_manager = Arc::new(session::MessageHistoryManager::new( - persistence_manager.clone(), - session::HistoryConfig { - enable_persistence: false, - ..Default::default() - }, - )); - - let compression_manager = Arc::new(session::CompressionManager::new( - persistence_manager.clone(), - session::CompressionConfig { - enable_persistence: false, - ..Default::default() - }, - )); + let context_store = Arc::new(session::SessionContextStore::new()); + let context_compressor = Arc::new(session::ContextCompressor::new(Default::default())); let session_manager = Arc::new(session::SessionManager::new( - history_manager.clone(), - compression_manager, + context_store, persistence_manager.clone(), Default::default(), )); @@ -75,6 +61,7 @@ pub async fn init_agentic_system() -> Result { round_executor, event_queue.clone(), session_manager.clone(), + context_compressor, Default::default(), )); diff --git a/src/apps/desktop/src/api/agentic_api.rs b/src/apps/desktop/src/api/agentic_api.rs index 32ad50f5..cfbe2fda 100644 --- a/src/apps/desktop/src/api/agentic_api.rs +++ b/src/apps/desktop/src/api/agentic_api.rs @@ -125,22 +125,6 @@ pub struct SessionResponse { pub created_at: u64, } -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct GetMessagesRequest { - pub session_id: String, - pub limit: Option, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct MessageDTO { - pub id: String, - pub role: String, - pub content: serde_json::Value, - pub timestamp: u64, -} - #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CancelDialogTurnRequest { @@ -583,21 +567,6 @@ pub async fn list_sessions( Ok(responses) } -#[tauri::command] -pub async fn get_session_messages( - coordinator: State<'_, Arc>, - request: GetMessagesRequest, -) -> Result, String> { - let messages = coordinator - .get_messages(&request.session_id) - .await - .map_err(|e| format!("Failed to get messages: {}", e))?; - - let message_dtos = messages.into_iter().map(message_to_dto).collect(); - - Ok(message_dtos) -} - #[tauri::command] pub async fn confirm_tool_execution( coordinator: State<'_, Arc>, @@ -733,75 +702,6 @@ fn session_to_response(session: Session) -> SessionResponse { } } -fn message_to_dto(message: Message) -> MessageDTO { - let role = match message.role { - MessageRole::User => "user", - MessageRole::Assistant => "assistant", - MessageRole::Tool => "tool", - MessageRole::System => "system", - }; - - let content = match message.content { - MessageContent::Text(text) => serde_json::json!({ "type": "text", "text": text }), - MessageContent::Multimodal { text, images } => { - let images: Vec = images - .into_iter() - .map(|img| { - serde_json::json!({ - "id": img.id, - "image_path": img.image_path, - "mime_type": img.mime_type, - "metadata": img.metadata, - "has_data_url": img.data_url.as_ref().is_some_and(|s| !s.is_empty()), - }) - }) - .collect(); - - serde_json::json!({ - "type": "multimodal", - "text": text, - "images": images, - }) - } - MessageContent::ToolResult { - tool_id, - tool_name, - result, - result_for_assistant, - is_error: _, - image_attachments, - } => { - serde_json::json!({ - "type": "tool_result", - "tool_id": tool_id, - "tool_name": tool_name, - "result": result, - "result_for_assistant": result_for_assistant, - "has_image_attachments": image_attachments.as_ref().is_some_and(|a| !a.is_empty()), - }) - } - MessageContent::Mixed { - reasoning_content, - text, - tool_calls, - } => { - serde_json::json!({ - "type": "mixed", - "reasoning_content": reasoning_content, - "text": text, - "tool_calls": tool_calls, - }) - } - }; - - MessageDTO { - id: message.id, - role: role.to_string(), - content, - timestamp: system_time_to_unix_secs(message.timestamp), - } -} - fn system_time_to_unix_secs(time: std::time::SystemTime) -> u64 { match time.duration_since(std::time::UNIX_EPOCH) { Ok(duration) => duration.as_secs(), diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 4d3868d0..2ee6beff 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -321,7 +321,6 @@ pub async fn run() { api::agentic_api::restore_session, webdriver_bridge_result, api::agentic_api::list_sessions, - api::agentic_api::get_session_messages, api::agentic_api::confirm_tool_execution, api::agentic_api::reject_tool_execution, api::agentic_api::cancel_tool, @@ -714,25 +713,11 @@ async fn init_agentic_system() -> anyhow::Result<( let path_manager = try_get_path_manager_arc()?; let persistence_manager = Arc::new(persistence::PersistenceManager::new(path_manager.clone())?); - let history_manager = Arc::new(session::MessageHistoryManager::new( - persistence_manager.clone(), - session::HistoryConfig { - enable_persistence: false, - ..Default::default() - }, - )); - - let compression_manager = Arc::new(session::CompressionManager::new( - persistence_manager.clone(), - session::CompressionConfig { - enable_persistence: false, - ..Default::default() - }, - )); + let context_store = Arc::new(session::SessionContextStore::new()); + let context_compressor = Arc::new(session::ContextCompressor::new(Default::default())); let session_manager = Arc::new(session::SessionManager::new( - history_manager, - compression_manager, + context_store, persistence_manager, Default::default(), )); @@ -762,6 +747,7 @@ async fn init_agentic_system() -> anyhow::Result<( round_executor, event_queue.clone(), session_manager.clone(), + context_compressor, Default::default(), )); diff --git a/src/apps/server/src/bootstrap.rs b/src/apps/server/src/bootstrap.rs index 8e470db7..861ac6e9 100644 --- a/src/apps/server/src/bootstrap.rs +++ b/src/apps/server/src/bootstrap.rs @@ -5,9 +5,7 @@ use bitfun_core::agentic::*; use bitfun_core::infrastructure::ai::AIClientFactory; use bitfun_core::infrastructure::try_get_path_manager_arc; -use bitfun_core::service::{ - ai_rules, config, filesystem, mcp, token_usage, workspace, -}; +use bitfun_core::service::{ai_rules, config, filesystem, mcp, token_usage, workspace}; use std::sync::Arc; use tokio::sync::RwLock; @@ -50,28 +48,13 @@ pub async fn initialize(workspace: Option) -> anyhow::Result) -> anyhow::Result) -> anyhow::Result { let request = extract_request(¶ms)?; let path: String = serde_json::from_value( - request.get("path").cloned().ok_or_else(|| anyhow!("Missing path"))?, + request + .get("path") + .cloned() + .ok_or_else(|| anyhow!("Missing path"))?, )?; - let info = state.workspace_service.open_workspace(path.into()).await + let info = state + .workspace_service + .open_workspace(path.into()) + .await .map_err(|e| anyhow!("{}", e))?; *state.workspace_path.write().await = Some(info.root_path.clone()); Ok(serde_json::to_value(&info).unwrap_or_default()) @@ -72,7 +78,10 @@ pub async fn dispatch( "read_file_content" => { let request = extract_request(¶ms)?; let file_path = get_string(&request, "filePath")?; - let result = state.filesystem_service.read_file(&file_path).await + let result = state + .filesystem_service + .read_file(&file_path) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::json!(result.content)) } @@ -80,7 +89,10 @@ pub async fn dispatch( let request = extract_request(¶ms)?; let file_path = get_string(&request, "filePath")?; let content = get_string(&request, "content")?; - state.filesystem_service.write_file(&file_path, &content).await + state + .filesystem_service + .write_file(&file_path, &content) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::Value::Null) } @@ -96,7 +108,10 @@ pub async fn dispatch( "get_file_tree" => { let request = extract_request(¶ms)?; let path = get_string(&request, "path")?; - let nodes = state.filesystem_service.build_file_tree(&path).await + let nodes = state + .filesystem_service + .build_file_tree(&path) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::to_value(&nodes).unwrap_or_default()) } @@ -110,21 +125,32 @@ pub async fn dispatch( "get_config" => { let request = extract_request(¶ms)?; let key = request.get("key").and_then(|v| v.as_str()); - let config: serde_json::Value = state.config_service - .get_config(key).await + let config: serde_json::Value = state + .config_service + .get_config(key) + .await .map_err(|e| anyhow!("{}", e))?; Ok(config) } "set_config" => { let request = extract_request(¶ms)?; let key = get_string(&request, "key")?; - let value = request.get("value").cloned().ok_or_else(|| anyhow!("Missing value"))?; - state.config_service.set_config(&key, value).await + let value = request + .get("value") + .cloned() + .ok_or_else(|| anyhow!("Missing value"))?; + state + .config_service + .set_config(&key, value) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::json!("ok")) } "get_model_configs" => { - let models = state.config_service.get_ai_models().await + let models = state + .config_service + .get_ai_models() + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::to_value(&models).unwrap_or_default()) } @@ -135,7 +161,8 @@ pub async fn dispatch( let session_name = get_string(&request, "sessionName")?; let agent_type = get_string(&request, "agentType")?; let workspace_path = get_string(&request, "workspacePath")?; - let session_id = request.get("sessionId") + let session_id = request + .get("sessionId") .and_then(|v| v.as_str()) .map(|s| s.to_string()); @@ -144,7 +171,8 @@ pub async fn dispatch( ..Default::default() }; - let session = state.coordinator + let session = state + .coordinator .create_session_with_workspace( session_id, session_name, @@ -164,7 +192,8 @@ pub async fn dispatch( "list_sessions" => { let request = extract_request(¶ms)?; let workspace_path = get_string(&request, "workspacePath")?; - let sessions = state.coordinator + let sessions = state + .coordinator .list_sessions(&PathBuf::from(workspace_path)) .await .map_err(|e| anyhow!("{}", e))?; @@ -174,7 +203,8 @@ pub async fn dispatch( let request = extract_request(¶ms)?; let session_id = get_string(&request, "sessionId")?; let workspace_path = get_string(&request, "workspacePath")?; - state.coordinator + state + .coordinator .delete_session(&PathBuf::from(workspace_path), &session_id) .await .map_err(|e| anyhow!("{}", e))?; @@ -184,18 +214,22 @@ pub async fn dispatch( let request = extract_request(¶ms)?; let session_id = get_string(&request, "sessionId")?; let user_input = get_string(&request, "userInput")?; - let original_user_input = request.get("originalUserInput") + let original_user_input = request + .get("originalUserInput") .and_then(|v| v.as_str()) .map(|s| s.to_string()); let agent_type = get_string(&request, "agentType")?; - let workspace_path = request.get("workspacePath") + let workspace_path = request + .get("workspacePath") .and_then(|v| v.as_str()) .map(|s| s.to_string()); - let turn_id = request.get("turnId") + let turn_id = request + .get("turnId") .and_then(|v| v.as_str()) .map(|s| s.to_string()); - state.scheduler + state + .scheduler .submit( session_id, user_input, @@ -215,26 +249,47 @@ pub async fn dispatch( let request = extract_request(¶ms)?; let session_id = get_string(&request, "sessionId")?; let dialog_turn_id = get_string(&request, "dialogTurnId")?; - state.coordinator + state + .coordinator .cancel_dialog_turn(&session_id, &dialog_turn_id) .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::json!({ "success": true })) } "get_session_messages" => { - let request = extract_request(¶ms)?; - let session_id = get_string(&request, "sessionId")?; - let messages = state.coordinator - .get_messages(&session_id) + let request = params.get("request").unwrap_or(¶ms); + let session_id = request + .get("sessionId") + .or_else(|| request.get("session_id")) + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow!("Missing or invalid 'sessionId'/'session_id' field"))? + .to_string(); + let limit = request + .get("limit") + .and_then(|v| v.as_u64()) + .map(|v| v as usize) + .unwrap_or(50); + let before_message_id = request + .get("beforeMessageId") + .or_else(|| request.get("before_message_id")) + .and_then(|v| v.as_str()); + + let (messages, has_more) = state + .coordinator + .get_messages_paginated(&session_id, limit, before_message_id) .await .map_err(|e| anyhow!("{}", e))?; - Ok(serde_json::to_value(&messages).unwrap_or_default()) + Ok(serde_json::json!({ + "messages": messages, + "has_more": has_more, + })) } "confirm_tool_execution" => { let request = extract_request(¶ms)?; let tool_id = get_string(&request, "toolId")?; let updated_input = request.get("updatedInput").cloned(); - state.coordinator + state + .coordinator .confirm_tool(&tool_id, updated_input) .await .map_err(|e| anyhow!("{}", e))?; @@ -243,11 +298,13 @@ pub async fn dispatch( "reject_tool_execution" => { let request = extract_request(¶ms)?; let tool_id = get_string(&request, "toolId")?; - let reason = request.get("reason") + let reason = request + .get("reason") .and_then(|v| v.as_str()) .unwrap_or("User rejected") .to_string(); - state.coordinator + state + .coordinator .reject_tool(&tool_id, reason) .await .map_err(|e| anyhow!("{}", e))?; @@ -256,32 +313,38 @@ pub async fn dispatch( // ── I18n ───────────────────────────────────────────── "i18n_get_current_language" => { - let lang: String = state.config_service - .get_config(Some("app.language")).await + let lang: String = state + .config_service + .get_config(Some("app.language")) + .await .unwrap_or_else(|_| "zh-CN".to_string()); Ok(serde_json::json!(lang)) } "i18n_set_language" => { let request = extract_request(¶ms)?; let language = get_string(&request, "language")?; - state.config_service.set_config("app.language", language.clone()).await + state + .config_service + .set_config("app.language", language.clone()) + .await .map_err(|e| anyhow!("{}", e))?; Ok(serde_json::json!(language)) } - "i18n_get_supported_languages" => { - Ok(serde_json::json!([ - {"id": "zh-CN", "name": "Chinese (Simplified)", "englishName": "Chinese (Simplified)", "nativeName": "简体中文", "rtl": false}, - {"id": "en-US", "name": "English", "englishName": "English", "nativeName": "English", "rtl": false} - ])) - } + "i18n_get_supported_languages" => Ok(serde_json::json!([ + {"id": "zh-CN", "name": "Chinese (Simplified)", "englishName": "Chinese (Simplified)", "nativeName": "简体中文", "rtl": false}, + {"id": "en-US", "name": "English", "englishName": "English", "nativeName": "English", "rtl": false} + ])), // ── Tools ──────────────────────────────────────────── "get_all_tools_info" => { - let tools: Vec = state.tool_registry_snapshot + let tools: Vec = state + .tool_registry_snapshot .iter() - .map(|t| serde_json::json!({ - "name": t.name().to_string(), - })) + .map(|t| { + serde_json::json!({ + "name": t.name().to_string(), + }) + }) .collect(); Ok(serde_json::json!(tools)) } diff --git a/src/crates/core/src/agentic/coordination/coordinator.rs b/src/crates/core/src/agentic/coordination/coordinator.rs index e3d8422b..d0107781 100644 --- a/src/crates/core/src/agentic/coordination/coordinator.rs +++ b/src/crates/core/src/agentic/coordination/coordinator.rs @@ -12,8 +12,8 @@ use crate::agentic::events::{ AgenticEvent, EventPriority, EventQueue, EventRouter, EventSubscriber, }; use crate::agentic::execution::{ExecutionContext, ExecutionEngine}; -use crate::agentic::round_preempt::DialogRoundPreemptSource; use crate::agentic::image_analysis::ImageContextData; +use crate::agentic::round_preempt::DialogRoundPreemptSource; use crate::agentic::session::SessionManager; use crate::agentic::tools::pipeline::{SubagentParentInfo, ToolPipeline}; use crate::agentic::WorkspaceBinding; @@ -151,27 +151,26 @@ impl ConversationCoordinator { ) .await; - let local_session_path = - if let Some(ref e) = entry { - if !e.ssh_host.trim().is_empty() { - crate::service::remote_ssh::workspace_state::remote_workspace_session_mirror_dir( - &e.ssh_host, - &e.remote_root, - ) - } else { - crate::service::remote_ssh::workspace_state::unresolved_remote_session_storage_dir( - rid, &path_norm, - ) - } - } else if let Some(h) = host_from_config { + let local_session_path = if let Some(ref e) = entry { + if !e.ssh_host.trim().is_empty() { crate::service::remote_ssh::workspace_state::remote_workspace_session_mirror_dir( - h, &path_norm, + &e.ssh_host, + &e.remote_root, ) } else { crate::service::remote_ssh::workspace_state::unresolved_remote_session_storage_dir( rid, &path_norm, ) - }; + } + } else if let Some(h) = host_from_config { + crate::service::remote_ssh::workspace_state::remote_workspace_session_mirror_dir( + h, &path_norm, + ) + } else { + crate::service::remote_ssh::workspace_state::unresolved_remote_session_storage_dir( + rid, &path_norm, + ) + }; let connection_name = entry .map(|e| e.connection_name) @@ -195,24 +194,31 @@ impl ConversationCoordinator { let binding = binding.as_ref()?; if binding.is_remote() { - let manager = match crate::service::remote_ssh::workspace_state::get_remote_workspace_manager() { - Some(m) => m, - None => { - log::warn!("build_workspace_services: RemoteWorkspaceStateManager not initialized"); - return None; - } - }; + let manager = + match crate::service::remote_ssh::workspace_state::get_remote_workspace_manager() { + Some(m) => m, + None => { + log::warn!( + "build_workspace_services: RemoteWorkspaceStateManager not initialized" + ); + return None; + } + }; let ssh_manager = match manager.get_ssh_manager().await { Some(m) => m, None => { - log::warn!("build_workspace_services: SSH manager not available in state manager"); + log::warn!( + "build_workspace_services: SSH manager not available in state manager" + ); return None; } }; let file_service = match manager.get_file_service().await { Some(f) => f, None => { - log::warn!("build_workspace_services: File service not available in state manager"); + log::warn!( + "build_workspace_services: File service not available in state manager" + ); return None; } }; @@ -223,7 +229,10 @@ impl ConversationCoordinator { return None; } }; - log::info!("build_workspace_services: Built remote services for connection_id={}", connection_id); + log::info!( + "build_workspace_services: Built remote services for connection_id={}", + connection_id + ); Some(crate::agentic::workspace::remote_workspace_services( connection_id, file_service, @@ -571,7 +580,8 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet let binding = Self::build_workspace_binding(&SessionConfig { workspace_path: Some(workspace_path.to_string()), ..Default::default() - }).await; + }) + .await; binding .as_ref() .map(|b| b.session_storage_path().to_path_buf()) @@ -1514,7 +1524,8 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet return Ok(None); }; - self.cancel_dialog_turn(session_id, ¤t_turn_id).await?; + self.cancel_dialog_turn(session_id, ¤t_turn_id) + .await?; let deadline = Instant::now() + wait_timeout; while self.execution_engine.has_active_turn(¤t_turn_id) { @@ -1565,12 +1576,12 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet self.session_manager.list_sessions(workspace_path).await } - /// Get session messages + /// Get a best-effort message view for a session. pub async fn get_messages(&self, session_id: &str) -> BitFunResult> { self.session_manager.get_messages(session_id).await } - /// Get session messages paginated + /// Get a paginated best-effort message view for a session. pub async fn get_messages_paginated( &self, session_id: &str, @@ -1818,7 +1829,7 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet } } - // Delete subagent session itself (including message history, persistence data, etc.) + // Delete the subagent session itself, including runtime context and persisted turn data. let workspace_path = self .session_manager .get_session(session_id) diff --git a/src/crates/core/src/agentic/core/message.rs b/src/crates/core/src/agentic/core/message.rs index d6f84ee7..6612237c 100644 --- a/src/crates/core/src/agentic/core/message.rs +++ b/src/crates/core/src/agentic/core/message.rs @@ -62,6 +62,8 @@ pub struct MessageMetadata { pub thinking_signature: Option, #[serde(skip_serializing_if = "Option::is_none")] pub semantic_kind: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub compression_payload: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -76,6 +78,85 @@ pub enum MessageSemanticKind { ComputerUsePostActionSnapshot, } +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct CompressionPayload { + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub entries: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum CompressionEntry { + ModelSummary { + text: String, + }, + Turn { + #[serde(skip_serializing_if = "Option::is_none")] + turn_id: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + messages: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + todo: Option, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressedMessage { + pub role: CompressedMessageRole, + #[serde(skip_serializing_if = "Option::is_none")] + pub text: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub tool_calls: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CompressedMessageRole { + User, + Assistant, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressedToolCall { + pub tool_name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub arguments: Option, + #[serde(default, skip_serializing_if = "is_false")] + pub is_error: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressedTodoSnapshot { + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub todos: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub summary: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressedTodoItem { + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + pub content: String, + pub status: String, +} + +impl CompressionPayload { + pub fn from_summary(text: String) -> Self { + Self { + entries: vec![CompressionEntry::ModelSummary { text }], + } + } + + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } +} + +fn is_false(value: &bool) -> bool { + !*value +} + impl From for AIMessage { fn from(msg: Message) -> Self { let role = match msg.role { @@ -378,6 +459,12 @@ impl Message { self } + pub fn with_compression_payload(mut self, compression_payload: CompressionPayload) -> Self { + self.metadata.compression_payload = Some(compression_payload); + self.metadata.tokens = None; + self + } + /// Set message's thinking_signature (for Anthropic extended thinking multi-turn conversations) pub fn with_thinking_signature(mut self, signature: Option) -> Self { self.metadata.thinking_signature = signature; diff --git a/src/crates/core/src/agentic/core/mod.rs b/src/crates/core/src/agentic/core/mod.rs index d85ba603..5bd0c55f 100644 --- a/src/crates/core/src/agentic/core/mod.rs +++ b/src/crates/core/src/agentic/core/mod.rs @@ -11,7 +11,9 @@ pub mod session; pub mod state; pub use dialog_turn::{DialogTurn, DialogTurnState, TurnStats}; pub use message::{ - Message, MessageContent, MessageRole, MessageSemanticKind, ToolCall, ToolResult, + CompressedMessage, CompressedMessageRole, CompressedTodoItem, CompressedTodoSnapshot, + CompressedToolCall, CompressionEntry, CompressionPayload, Message, MessageContent, MessageRole, + MessageSemanticKind, ToolCall, ToolResult, }; pub use messages_helper::MessageHelper; pub use model_round::ModelRound; diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index 46e20461..7c892d95 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -11,7 +11,7 @@ use crate::agentic::image_analysis::{ build_multimodal_message_with_images, process_image_contexts_for_provider, ImageContextData, ImageLimits, }; -use crate::agentic::session::SessionManager; +use crate::agentic::session::{ContextCompressor, SessionManager}; use crate::agentic::tools::{get_all_registered_tools, SubagentParentInfo}; use crate::agentic::WorkspaceBinding; use crate::infrastructure::ai::get_global_ai_client_factory; @@ -44,6 +44,7 @@ pub struct ExecutionEngine { round_executor: Arc, event_queue: Arc, session_manager: Arc, + context_compressor: Arc, config: ExecutionEngineConfig, } @@ -52,12 +53,14 @@ impl ExecutionEngine { round_executor: Arc, event_queue: Arc, session_manager: Arc, + context_compressor: Arc, config: ExecutionEngineConfig, ) -> Self { Self { round_executor, event_queue, session_manager, + context_compressor, config, } } @@ -276,8 +279,7 @@ impl ExecutionEngine { if Self::skip_message_for_model_send(msg) { continue; } - let keep_this_message_images = - attach_images && keep_image_messages.contains(&msg_idx); + let keep_this_message_images = attach_images && keep_image_messages.contains(&msg_idx); match &msg.content { MessageContent::Multimodal { text, images } => { if !attach_images { @@ -317,7 +319,7 @@ impl ExecutionEngine { provider, workspace_path, ) - .await + .await { Ok(processed) => { let next_count = attached_image_count + processed.len(); @@ -397,10 +399,7 @@ impl ExecutionEngine { Ok(result) } - fn render_multimodal_as_text( - text: &str, - images: &[ImageContextData], - ) -> String { + fn render_multimodal_as_text(text: &str, images: &[ImageContextData]) -> String { let mut content = text.to_string(); if images.is_empty() { @@ -449,14 +448,13 @@ impl ExecutionEngine { .get_session(session_id) .ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", session_id)))?; - let compression_manager = self.session_manager.get_compression_manager(); - // Record start time let start_time = std::time::Instant::now(); let old_messages_len = messages.len(); // Preprocess turns - let (turn_index_to_keep, turns) = compression_manager + let (turn_index_to_keep, turns) = self + .context_compressor .preprocess_turns(session_id, context_window, messages) .await?; if turn_index_to_keep == 0 { @@ -483,13 +481,17 @@ impl ExecutionEngine { .await; // Execute compression - match compression_manager + match self + .context_compressor .compress_turns(session_id, context_window, turn_index_to_keep, turns) .await { - Ok(compressed_messages) => { + Ok(compression_result) => { + self.session_manager + .replace_context_messages(session_id, compression_result.messages.clone()) + .await; let mut new_messages = vec![system_prompt_message]; - new_messages.extend(compressed_messages); + new_messages.extend(compression_result.messages); // Update session compression state session.compression_state.increment_compression_count(); @@ -526,7 +528,7 @@ impl ExecutionEngine { tokens_after: compressed_tokens, compression_ratio: (compressed_tokens as f64) / (current_tokens as f64), duration_ms, - has_summary: true, + has_summary: compression_result.has_model_summary, subagent_parent_info: event_subagent_parent_info.clone(), }, EventPriority::Normal, @@ -716,7 +718,7 @@ impl ExecutionEngine { // Save the last token usage statistics let mut last_usage: Option = None; - // Add detailed logging showing received message history + // Add detailed logging showing the execution context messages. debug!( "Executing dialog turn: dialog_turn_id={}, mode={}, agent={}, initial_messages={}, messages_len={}", dialog_turn_id, @@ -726,7 +728,7 @@ impl ExecutionEngine { messages.len() ); trace!( - "Message history details: dialog_turn_id={}, session_id={}, roles={:?}", + "Context message details: dialog_turn_id={}, session_id={}, roles={:?}", dialog_turn_id, context.session_id, messages @@ -761,7 +763,7 @@ impl ExecutionEngine { context.workspace.as_ref(), &agent_type, ) - .await + .await } else { (vec![], None) }; @@ -843,8 +845,7 @@ impl ExecutionEngine { let original_images = images.clone(); // Replace multimodal messages with text-only versions to avoid provider errors. - let next_text = - Self::render_multimodal_as_text(&original_text, &original_images); + let next_text = Self::render_multimodal_as_text(&original_text, &original_images); msg.content = MessageContent::Text(next_text); msg.metadata.tokens = None; @@ -1002,31 +1003,31 @@ impl ExecutionEngine { // Add assistant message to history messages.push(round_result.assistant_message.clone()); - // Immediately save assistant message (prevent loss on cancellation) + // Update the in-memory message caches immediately so subsequent rounds see it. if let Err(e) = self .session_manager .add_message(&context.session_id, round_result.assistant_message.clone()) .await { - warn!("Failed to save assistant message in real-time: {}", e); + warn!("Failed to update assistant message in memory: {}", e); } // Add tool result messages to history for tool_result_msg in round_result.tool_result_messages.iter() { messages.push(tool_result_msg.clone()); - // Immediately save tool result message + // Update the in-memory message caches immediately so subsequent rounds see it. if let Err(e) = self .session_manager .add_message(&context.session_id, tool_result_msg.clone()) .await { - warn!("Failed to save tool result message in real-time: {}", e); + warn!("Failed to update tool result message in memory: {}", e); } } debug!( - "Saved round messages in real-time: round_index={}, assistant + {} tool results", + "Updated round messages in memory: round_index={}, assistant + {} tool results", round_index, round_result.tool_result_messages.len() ); @@ -1042,8 +1043,8 @@ impl ExecutionEngine { break; } - // Queued user message while this turn was running: stop after a full model round - // (AI response + tool execution for this round are already persisted). + // Queued user message while this turn was running: stop after a full model round. + // The round output has already been reflected in the in-memory message caches. // No special deferral for tool-confirmation phases: we do not require the user to // finish confirming before this boundary check runs; the check applies as soon as // this `execute_round` completes (same as any other round). @@ -1270,7 +1271,8 @@ impl ExecutionEngine { .collect(); tool_definitions.sort_by_key(|tool| tool_ordering.get(&tool.name).unwrap_or(&100)); - let enabled_tool_names: Vec = tool_definitions.iter().map(|d| d.name.clone()).collect(); + let enabled_tool_names: Vec = + tool_definitions.iter().map(|d| d.name.clone()).collect(); (enabled_tool_names, Some(tool_definitions)) } diff --git a/src/crates/core/src/agentic/persistence/manager.rs b/src/crates/core/src/agentic/persistence/manager.rs index 4e03174c..e415deb1 100644 --- a/src/crates/core/src/agentic/persistence/manager.rs +++ b/src/crates/core/src/agentic/persistence/manager.rs @@ -1,7 +1,6 @@ //! Persistence Manager //! -//! Responsible for project-scoped session persistence and legacy -//! message/compression persistence used by in-memory managers. +//! Responsible for project-scoped session persistence. use crate::agentic::core::{ strip_prompt_markup, CompressionState, Message, MessageContent, Session, SessionConfig, @@ -13,7 +12,7 @@ use crate::service::session::{ SessionTranscriptExportOptions, SessionTranscriptIndexEntry, ToolItemData, TranscriptLineRange, }; use crate::util::errors::{BitFunError, BitFunResult}; -use log::{debug, info, warn}; +use log::{info, warn}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::collections::HashMap; @@ -22,7 +21,6 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, OnceLock}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::fs; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::Mutex; const SESSION_SCHEMA_VERSION: u32 = 2; @@ -2043,227 +2041,6 @@ impl PersistenceManager { Ok(()) } - // ============ Legacy message persistence ============ - - fn legacy_sessions_dir(&self) -> PathBuf { - self.path_manager.user_data_dir().join("legacy-sessions") - } - - fn legacy_session_dir(&self, session_id: &str) -> PathBuf { - self.legacy_sessions_dir().join(session_id) - } - - async fn ensure_legacy_session_dir(&self, session_id: &str) -> BitFunResult { - let dir = self.legacy_session_dir(session_id); - fs::create_dir_all(&dir).await.map_err(|e| { - BitFunError::io(format!("Failed to create legacy session directory: {}", e)) - })?; - Ok(dir) - } - - /// Append message (JSONL format) - pub async fn append_message(&self, session_id: &str, message: &Message) -> BitFunResult<()> { - let dir = self.ensure_legacy_session_dir(session_id).await?; - let messages_path = dir.join("messages.jsonl"); - - let sanitized_message = Self::sanitize_message_for_persistence(message); - let json = serde_json::to_string(&sanitized_message).map_err(|e| { - BitFunError::serialization(format!("Failed to serialize message: {}", e)) - })?; - - let mut file = fs::OpenOptions::new() - .create(true) - .append(true) - .open(&messages_path) - .await - .map_err(|e| BitFunError::io(format!("Failed to open message file: {}", e)))?; - - file.write_all(json.as_bytes()) - .await - .map_err(|e| BitFunError::io(format!("Failed to write message: {}", e)))?; - file.write_all(b"\n") - .await - .map_err(|e| BitFunError::io(format!("Failed to write newline: {}", e)))?; - - Ok(()) - } - - /// Load all messages - pub async fn load_messages(&self, session_id: &str) -> BitFunResult> { - let messages_path = self.legacy_session_dir(session_id).join("messages.jsonl"); - if !messages_path.exists() { - return Ok(vec![]); - } - - let file = fs::File::open(&messages_path) - .await - .map_err(|e| BitFunError::io(format!("Failed to open message file: {}", e)))?; - - let reader = BufReader::new(file); - let mut lines = reader.lines(); - let mut messages = Vec::new(); - - while let Some(line) = lines - .next_line() - .await - .map_err(|e| BitFunError::io(format!("Failed to read message line: {}", e)))? - { - if line.trim().is_empty() { - continue; - } - - match serde_json::from_str::(&line) { - Ok(message) => messages.push(message), - Err(e) => warn!("Failed to deserialize message: {}", e), - } - } - - Ok(messages) - } - - /// Clear messages - pub async fn clear_messages(&self, session_id: &str) -> BitFunResult<()> { - let messages_path = self.legacy_session_dir(session_id).join("messages.jsonl"); - if messages_path.exists() { - fs::remove_file(&messages_path) - .await - .map_err(|e| BitFunError::io(format!("Failed to delete message file: {}", e)))?; - } - Ok(()) - } - - /// Delete messages - pub async fn delete_messages(&self, session_id: &str) -> BitFunResult<()> { - self.clear_messages(session_id).await - } - - // ============ Legacy compressed history persistence ============ - - pub async fn append_compressed_message( - &self, - session_id: &str, - message: &Message, - ) -> BitFunResult<()> { - let dir = self.ensure_legacy_session_dir(session_id).await?; - let compressed_path = dir.join("compressed_messages.jsonl"); - - let sanitized_message = Self::sanitize_message_for_persistence(message); - let json = serde_json::to_string(&sanitized_message).map_err(|e| { - BitFunError::serialization(format!("Failed to serialize compressed message: {}", e)) - })?; - - let mut file = fs::OpenOptions::new() - .create(true) - .append(true) - .open(&compressed_path) - .await - .map_err(|e| { - BitFunError::io(format!("Failed to open compressed message file: {}", e)) - })?; - - file.write_all(json.as_bytes()) - .await - .map_err(|e| BitFunError::io(format!("Failed to write compressed message: {}", e)))?; - file.write_all(b"\n") - .await - .map_err(|e| BitFunError::io(format!("Failed to write newline: {}", e)))?; - - Ok(()) - } - - pub async fn save_compressed_messages( - &self, - session_id: &str, - messages: &[Message], - ) -> BitFunResult<()> { - let dir = self.ensure_legacy_session_dir(session_id).await?; - let compressed_path = dir.join("compressed_messages.jsonl"); - - let mut file = fs::OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .open(&compressed_path) - .await - .map_err(|e| { - BitFunError::io(format!("Failed to open compressed message file: {}", e)) - })?; - - let sanitized_messages = Self::sanitize_messages_for_persistence(messages); - for message in &sanitized_messages { - let json = serde_json::to_string(message).map_err(|e| { - BitFunError::serialization(format!("Failed to serialize compressed message: {}", e)) - })?; - - file.write_all(json.as_bytes()).await.map_err(|e| { - BitFunError::io(format!("Failed to write compressed message: {}", e)) - })?; - file.write_all(b"\n") - .await - .map_err(|e| BitFunError::io(format!("Failed to write newline: {}", e)))?; - } - - debug!( - "Legacy compressed history persisted: session_id={}, message_count={}", - session_id, - messages.len() - ); - Ok(()) - } - - pub async fn load_compressed_messages( - &self, - session_id: &str, - ) -> BitFunResult>> { - let compressed_path = self - .legacy_session_dir(session_id) - .join("compressed_messages.jsonl"); - - if !compressed_path.exists() { - return Ok(None); - } - - let file = fs::File::open(&compressed_path).await.map_err(|e| { - BitFunError::io(format!("Failed to open compressed message file: {}", e)) - })?; - - let reader = BufReader::new(file); - let mut lines = reader.lines(); - let mut messages = Vec::new(); - - while let Some(line) = lines.next_line().await.map_err(|e| { - BitFunError::io(format!("Failed to read compressed message line: {}", e)) - })? { - if line.trim().is_empty() { - continue; - } - - match serde_json::from_str::(&line) { - Ok(message) => messages.push(message), - Err(e) => warn!("Failed to deserialize compressed message: {}", e), - } - } - - if messages.is_empty() { - return Ok(None); - } - - Ok(Some(messages)) - } - - pub async fn delete_compressed_messages(&self, session_id: &str) -> BitFunResult<()> { - let compressed_path = self - .legacy_session_dir(session_id) - .join("compressed_messages.jsonl"); - - if compressed_path.exists() { - fs::remove_file(&compressed_path).await.map_err(|e| { - BitFunError::io(format!("Failed to delete compressed message file: {}", e)) - })?; - } - - Ok(()) - } } #[cfg(test)] diff --git a/src/crates/core/src/agentic/session/compression_manager.rs b/src/crates/core/src/agentic/session/compression/compressor.rs similarity index 67% rename from src/crates/core/src/agentic/session/compression_manager.rs rename to src/crates/core/src/agentic/session/compression/compressor.rs index ac94e961..c74e8b2d 100644 --- a/src/crates/core/src/agentic/session/compression_manager.rs +++ b/src/crates/core/src/agentic/session/compression/compressor.rs @@ -1,35 +1,45 @@ -//! Context Compression Manager +//! Context compressor //! -//! Responsible for managing session context compression +//! Responsible only for transforming a session context into a compressed one. +use super::fallback::{ + build_structured_compression_reminder, CompressionFallbackOptions, CompressionReminder, +}; use crate::agentic::core::{ - render_system_reminder, Message, MessageHelper, MessageRole, MessageSemanticKind, + render_system_reminder, CompressionPayload, Message, MessageHelper, MessageRole, + MessageSemanticKind, }; -use crate::agentic::persistence::PersistenceManager; use crate::infrastructure::ai::{get_global_ai_client_factory, AIClient}; use crate::util::errors::{BitFunError, BitFunResult}; use crate::util::types::Message as AIMessage; use anyhow; -use dashmap::DashMap; use log::{debug, trace, warn}; use std::sync::Arc; -/// Compression manager configuration +/// Context compressor configuration #[derive(Debug, Clone)] pub struct CompressionConfig { - pub enable_persistence: bool, pub keep_turns_ratio: f32, pub keep_last_turn_ratio: f32, pub single_request_max_tokens_ratio: f32, + pub fallback_max_tokens_ratio: f32, + pub fallback_user_chars: usize, + pub fallback_assistant_chars: usize, + pub fallback_tool_arg_chars: usize, + pub fallback_tool_command_chars: usize, } impl Default for CompressionConfig { fn default() -> Self { Self { - enable_persistence: true, keep_turns_ratio: 0.3, keep_last_turn_ratio: 0.4, single_request_max_tokens_ratio: 0.7, + fallback_max_tokens_ratio: 0.25, + fallback_user_chars: 1000, + fallback_assistant_chars: 1000, + fallback_tool_arg_chars: 100, + fallback_tool_command_chars: 100, } } } @@ -46,71 +56,20 @@ impl TurnWithTokens { } } -/// Context compression manager -pub struct CompressionManager { - /// Compressed message history (by session ID) - compressed_histories: Arc>>, - /// Persistence manager - persistence: Arc, - /// Configuration - config: CompressionConfig, +#[derive(Debug, Clone)] +pub struct CompressionResult { + pub messages: Vec, + pub has_model_summary: bool, } -impl CompressionManager { - pub fn new(persistence: Arc, config: CompressionConfig) -> Self { - Self { - compressed_histories: Arc::new(DashMap::new()), - persistence, - config, - } - } - - /// Create session compression history - pub fn create_session(&self, session_id: &str) { - self.compressed_histories - .insert(session_id.to_string(), vec![]); - debug!( - "Created session compression history: session_id={}", - session_id - ); - } - - /// Add message (async, supports persistence) - pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - // 1. Add to memory - if let Some(mut compressed) = self.compressed_histories.get_mut(session_id) { - compressed.push(message.clone()); - } else { - self.compressed_histories - .insert(session_id.to_string(), vec![message.clone()]); - } - - // 2. Persist (append single message, similar to MessageHistoryManager) - if self.config.enable_persistence { - self.persistence - .append_compressed_message(session_id, &message) - .await?; - } - - Ok(()) - } - - /// Batch restore messages (doesn't trigger persistence, used for session restore) - pub fn restore_session(&self, session_id: &str, messages: Vec) { - self.compressed_histories - .insert(session_id.to_string(), messages); - debug!( - "Restored session compression history: session_id={}", - session_id - ); - } +/// Stateless context compression service. +pub struct ContextCompressor { + config: CompressionConfig, +} - /// Get copy of messages for sending to model (may be compressed) - pub fn get_context_messages(&self, session_id: &str) -> Vec { - self.compressed_histories - .get(session_id) - .map(|h| h.clone()) - .unwrap_or_default() +impl ContextCompressor { + pub fn new(config: CompressionConfig) -> Self { + Self { config } } fn get_turn_index_to_keep(&self, turns_tokens: &[usize], token_limit: usize) -> usize { @@ -127,8 +86,8 @@ impl CompressionManager { result } - /// Returns (turn_index_to_keep, turns) - /// If turn_index_to_keep is 0, no compression is needed + /// Returns `(turn_index_to_keep, turns)`. + /// If `turn_index_to_keep` is 0, no compression is needed. pub async fn preprocess_turns( &self, session_id: &str, @@ -136,11 +95,10 @@ impl CompressionManager { mut messages: Vec, ) -> BitFunResult<(usize, Vec)> { debug!( - "Starting session context compression: session_id={}", + "Starting session context compression analysis: session_id={}", session_id ); - // Remove system messages let message_start = { let mut start_idx = messages.len(); for (idx, msg) in messages.iter().enumerate() { @@ -155,7 +113,7 @@ impl CompressionManager { if all_messages.is_empty() { debug!( - "Session history is empty, no compression needed: session_id={}", + "Session context is empty, no compression needed: session_id={}", session_id ); return Ok((0, Vec::new())); @@ -167,7 +125,6 @@ impl CompressionManager { .iter_mut() .map(|turn| turn.iter_mut().map(|m| m.get_tokens()).sum::()) .collect(); - // Print message count and token count for each turn { let turns_msg_num: Vec = turns_messages.iter().map(|t| t.len()).collect(); debug!( @@ -181,7 +138,6 @@ impl CompressionManager { let mut turn_index_to_keep = self.get_turn_index_to_keep(&turns_tokens, token_limit_keep_turns); if turn_index_to_keep == turns_count { - // If the last turn exceeds 30% but not 40%, keep the last turn let token_limit_last_turn = (context_window as f32 * self.config.keep_last_turn_ratio) as usize; if let Some(last_turn_tokens) = turns_tokens.last() { @@ -190,7 +146,10 @@ impl CompressionManager { } } } - debug!("Turn index to keep: {}", turn_index_to_keep); + debug!( + "Turn index to keep after compression analysis: session_id={}, keep_from_turn={}", + session_id, turn_index_to_keep + ); let turns: Vec = turns_messages .into_iter() @@ -206,15 +165,24 @@ impl CompressionManager { context_window: usize, turn_index_to_keep: usize, mut turns: Vec, - ) -> BitFunResult> { + ) -> BitFunResult { if turns.is_empty() { - debug!("No turns need compression"); - return Ok(Vec::new()); + debug!("No turns need compression: session_id={}", session_id); + return Ok(CompressionResult { + messages: Vec::new(), + has_model_summary: false, + }); } let Some(last_turn_messages) = turns.last().map(|turn| &turn.messages) else { - debug!("No turns available after split, skipping last-turn extraction"); - return Ok(Vec::new()); + debug!( + "No turns available after split, skipping compression: session_id={}", + session_id + ); + return Ok(CompressionResult { + messages: Vec::new(), + has_model_summary: false, + }); }; let last_user_message = { last_turn_messages @@ -228,34 +196,20 @@ impl CompressionManager { } }) }; - let last_todo = MessageHelper::get_last_todo(&last_turn_messages); + let last_todo = MessageHelper::get_last_todo(last_turn_messages); trace!("Last user message: {:?}", last_user_message); trace!("Last todo: {:?}", last_todo); let turns_to_keep = turns.split_off(turn_index_to_keep); let mut compressed_messages = Vec::new(); + let mut has_model_summary = false; if !turns.is_empty() { - // Dynamically get Agent client for generating summary - let ai_client_factory = get_global_ai_client_factory().await.map_err(|e| { - BitFunError::AIClient(format!("Failed to get AI client factory: {}", e)) - })?; - let ai_client = ai_client_factory - .get_client_by_func_agent("compression") - .await - .map_err(|e| BitFunError::AIClient(format!("Failed to get AI client: {}", e)))?; - - let summary = self - .execute_compression(ai_client, turns, context_window) + let reminder = self + .execute_compression_with_fallback(turns, context_window) .await?; - trace!("Compression summary: {}", summary); - - compressed_messages.push( - Message::user(render_system_reminder(&format!( - "Previous conversation is summarized below:\n{}", - summary - ))) - .with_semantic_kind(MessageSemanticKind::InternalReminder), - ); + trace!("Compression reminder generated"); + has_model_summary = reminder.used_model_summary; + compressed_messages.push(self.create_reminder_message(reminder)); } if !turns_to_keep.is_empty() { @@ -263,11 +217,9 @@ impl CompressionManager { compressed_messages.extend(turn.messages); } } else { - // All turns compressed, append last user message if let Some(last_user_message) = last_user_message { compressed_messages.push(last_user_message); } - // Append last todo if let Some(last_todo) = last_todo { compressed_messages.push( Message::user(render_system_reminder(&format!( @@ -279,33 +231,84 @@ impl CompressionManager { } } - // Update compression history - self.compressed_histories - .insert(session_id.to_string(), compressed_messages.clone()); - - // Persist compression history (similar to MessageHistoryManager pattern). - // Persistence is intentionally off until the storage contract is finalized. - #[allow(clippy::overly_complex_bool_expr)] - if false && self.config.enable_persistence { - if let Err(e) = self - .persistence - .save_compressed_messages(session_id, &compressed_messages) + debug!( + "Compression completed: session_id={}, compressed_messages={}", + session_id, + compressed_messages.len() + ); + + Ok(CompressionResult { + messages: compressed_messages, + has_model_summary, + }) + } + + fn create_reminder_message(&self, reminder: CompressionReminder) -> Message { + Message::user(render_system_reminder(&reminder.model_text)) + .with_semantic_kind(MessageSemanticKind::InternalReminder) + .with_compression_payload(reminder.payload) + } + + async fn execute_compression_with_fallback( + &self, + turns_to_compress: Vec, + context_window: usize, + ) -> BitFunResult { + let summary_result = match get_global_ai_client_factory().await { + Ok(ai_client_factory) => match ai_client_factory + .get_client_by_func_agent("compression") .await { + Ok(ai_client) => { + self.execute_compression(ai_client, turns_to_compress.clone(), context_window) + .await + } + Err(err) => Err(BitFunError::AIClient(format!( + "Failed to get AI client: {}", + err + ))), + }, + Err(err) => Err(BitFunError::AIClient(format!( + "Failed to get AI client factory: {}", + err + ))), + }; + + match summary_result { + Ok(summary) => { + trace!("Compression summary: {}", summary); + Ok(CompressionReminder { + model_text: format!("Previous conversation is summarized below:\n{}", summary), + payload: CompressionPayload::from_summary(summary), + used_model_summary: true, + }) + } + Err(err) => { warn!( - "Failed to persist compressed history: session_id={}, error={}", - session_id, e + "Model-based compression failed, falling back to structured local compression: {}", + err ); - } else { - debug!( - "Compressed history persisted: session_id={}, message_count={}", - session_id, - compressed_messages.len() + let reminder = build_structured_compression_reminder( + turns_to_compress + .into_iter() + .map(|turn| turn.messages) + .collect(), + &self.build_fallback_options(context_window), ); + Ok(reminder) } } + } - Ok(compressed_messages) + fn build_fallback_options(&self, context_window: usize) -> CompressionFallbackOptions { + CompressionFallbackOptions { + max_tokens: ((context_window as f32 * self.config.fallback_max_tokens_ratio) as usize) + .max(256), + user_chars: self.config.fallback_user_chars, + assistant_chars: self.config.fallback_assistant_chars, + tool_arg_chars: self.config.fallback_tool_arg_chars, + tool_command_chars: self.config.fallback_tool_command_chars, + } } async fn execute_compression( @@ -361,11 +364,9 @@ Be thorough and precise. Do not lose important technical details from either the let mut request_cnt = 0; for (idx, turn) in turns_to_compress.into_iter().enumerate() { if current_tokens + turn.tokens <= max_tokens_in_one_request { - // Add current turn's messages to accumulated messages cur_messages.extend(turn.messages); current_tokens += turn.tokens; } else { - // Compress accumulated messages if !cur_messages.is_empty() { summary = self .generate_summary( @@ -374,7 +375,7 @@ Be thorough and precise. Do not lose important technical details from either the cur_messages, ) .await?; - cur_messages = Vec::new(); // cur_messages has been consumed, need to reassign + cur_messages = Vec::new(); current_tokens = 0; request_cnt += 1; trace!( @@ -385,50 +386,44 @@ Be thorough and precise. Do not lose important technical details from either the } if turn.tokens <= max_tokens_in_one_request { - // Add current turn's messages to accumulated messages cur_messages.extend(turn.messages); current_tokens = turn.tokens; + } else if let Some((messages_part1, messages_part2)) = + MessageHelper::split_messages_in_middle(turn.messages) + { + summary = self + .generate_summary( + ai_client.clone(), + gen_system_message_for_summary(&summary), + messages_part1, + ) + .await?; + request_cnt += 1; + debug!( + "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", + request_cnt, idx, summary + ); + summary = self + .generate_summary( + ai_client.clone(), + gen_system_message_for_summary(&summary), + messages_part2, + ) + .await?; + request_cnt += 1; + debug!( + "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", + request_cnt, idx, summary + ); } else { - // Single turn too long - if let Some((messages_part1, messages_part2)) = - MessageHelper::split_messages_in_middle(turn.messages) - { - // Compress first half and second half separately - summary = self - .generate_summary( - ai_client.clone(), - gen_system_message_for_summary(&summary), - messages_part1, - ) - .await?; - request_cnt += 1; - debug!( - "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", - request_cnt, idx, summary - ); - summary = self - .generate_summary( - ai_client.clone(), - gen_system_message_for_summary(&summary), - messages_part2, - ) - .await?; - request_cnt += 1; - debug!( - "[execute_compression] request_cnt={}, turn_idx={}, summary: \n{}", - request_cnt, idx, summary - ); - } else { - return Err(BitFunError::Service(format!( - "Compression Failed, turn {} cannot be split in middle", - idx - ))); - } + return Err(BitFunError::Service(format!( + "Compression Failed, turn {} cannot be split in middle", + idx + ))); } } } - // Compress remaining messages if !cur_messages.is_empty() { summary = self .generate_summary( @@ -443,18 +438,16 @@ Be thorough and precise. Do not lose important technical details from either the Ok(summary) } - /// Generate summary for dialog turns, messages need to remove system prompt async fn generate_summary( &self, ai_client: Arc, system_message_for_summary: Message, messages: Vec, ) -> BitFunResult { - self.generate_summary_with_retry(ai_client, system_message_for_summary, messages, 3) + self.generate_summary_with_retry(ai_client, system_message_for_summary, messages, 2) .await } - /// Generate summary for dialog turns, supports retry async fn generate_summary_with_retry( &self, ai_client: Arc, @@ -462,9 +455,7 @@ Be thorough and precise. Do not lose important technical details from either the messages: Vec, max_tries: usize, ) -> BitFunResult { - // Call AI to generate summary let mut summary_messages = vec![AIMessage::from(system_message_for_summary)]; - // Remove thinking process when summarizing summary_messages.extend(messages.iter().map(|m| { let mut ai_msg = AIMessage::from(m); ai_msg.reasoning_content = None; @@ -498,9 +489,8 @@ Be thorough and precise. Do not lose important technical details from either the ); last_error = Some(e); - // If not the last attempt, wait before retrying if attempt < max_tries - 1 { - let delay_ms = base_wait_time_ms * (1 << attempt.min(3)); // Exponential backoff + let delay_ms = base_wait_time_ms * (1 << attempt.min(3)); debug!("Waiting {}ms before retry {}...", delay_ms, attempt + 2); tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; } @@ -508,7 +498,6 @@ Be thorough and precise. Do not lose important technical details from either the } } - // All attempts failed let error_msg = format!( "Summary generation failed after {} attempts: {}", max_tries, @@ -518,15 +507,6 @@ Be thorough and precise. Do not lose important technical details from either the Err(BitFunError::AIClient(error_msg)) } - /// Delete session compression history - pub fn delete_session(&self, session_id: &str) { - self.compressed_histories.remove(session_id); - debug!( - "Deleted session compression history: session_id={}", - session_id - ); - } - fn get_compact_prompt(&self) -> String { r#"Your task is to create a detailed summary of the conversation so far, paying close attention to the user's explicit requests and your previous actions. This summary should be thorough in capturing technical details, code patterns, and architectural decisions that would be essential for continuing development work without losing context. @@ -612,6 +592,7 @@ Here's an example of how your output should be structured: Please provide your summary based on the conversation so far, following this structure and ensuring precision and thoroughness in your response. -"#.to_string() +"# + .to_string() } } diff --git a/src/crates/core/src/agentic/session/compression/fallback/builder.rs b/src/crates/core/src/agentic/session/compression/fallback/builder.rs new file mode 100644 index 00000000..0376f4ea --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/builder.rs @@ -0,0 +1,169 @@ +use super::sanitize::{ + sanitize_assistant_text, sanitize_todo_snapshot, sanitize_tool_arguments, sanitize_user_text, +}; +use super::types::CompressionFallbackOptions; +use crate::agentic::core::{ + strip_prompt_markup, CompressedMessage, CompressedMessageRole, CompressedTodoSnapshot, + CompressedToolCall, CompressionEntry, Message, MessageContent, MessageRole, + MessageSemanticKind, +}; + +pub(super) fn build_entries_from_turns( + turns: Vec>, + options: &CompressionFallbackOptions, +) -> Vec { + let mut entries = Vec::new(); + + for turn in turns { + build_entries_from_messages(turn, options, &mut entries); + } + + entries +} + +fn build_entries_from_messages( + messages: Vec, + options: &CompressionFallbackOptions, + output: &mut Vec, +) { + let turn_id = messages + .first() + .and_then(|message| message.metadata.turn_id.clone()); + let mut turn_messages = Vec::new(); + let mut latest_todo = None; + + for message in messages { + if let Some(entries) = extract_nested_compression_entries(&message) { + flush_turn_entry( + output, + turn_id.clone(), + &mut turn_messages, + &mut latest_todo, + ); + output.extend(entries); + continue; + } + + match message.content { + MessageContent::Text(text) => match message.role { + MessageRole::User => { + if let Some(text) = sanitize_user_text(&text, options) { + turn_messages.push(CompressedMessage { + role: CompressedMessageRole::User, + text: Some(text), + tool_calls: Vec::new(), + }); + } + } + MessageRole::Assistant => { + if let Some(text) = sanitize_assistant_text(&text, options) { + turn_messages.push(CompressedMessage { + role: CompressedMessageRole::Assistant, + text: Some(text), + tool_calls: Vec::new(), + }); + } + } + MessageRole::System | MessageRole::Tool => {} + }, + MessageContent::Multimodal { text, images } => { + if message.role == MessageRole::User { + let mut rendered = sanitize_user_text(&text, options).unwrap_or_default(); + if !images.is_empty() { + if !rendered.is_empty() { + rendered.push('\n'); + } + rendered.push_str(&format!("[{} image(s) omitted]", images.len())); + } + if !rendered.trim().is_empty() { + turn_messages.push(CompressedMessage { + role: CompressedMessageRole::User, + text: Some(rendered), + tool_calls: Vec::new(), + }); + } + } + } + MessageContent::Mixed { + text, tool_calls, .. + } => { + if message.role != MessageRole::Assistant { + continue; + } + + let mut compressed_tool_calls = Vec::new(); + + for tool_call in tool_calls { + if tool_call.tool_name == "TodoWrite" { + latest_todo = sanitize_todo_snapshot(&tool_call.arguments); + continue; + } + + let compressed_tool_call = CompressedToolCall { + tool_name: tool_call.tool_name.clone(), + arguments: sanitize_tool_arguments( + &tool_call.tool_name, + &tool_call.arguments, + options, + ), + is_error: tool_call.is_error, + }; + compressed_tool_calls.push(compressed_tool_call); + } + + let sanitized_text = sanitize_assistant_text(&text, options); + if sanitized_text.is_some() || !compressed_tool_calls.is_empty() { + turn_messages.push(CompressedMessage { + role: CompressedMessageRole::Assistant, + text: sanitized_text, + tool_calls: compressed_tool_calls, + }); + } + } + MessageContent::ToolResult { .. } => {} + } + } + + flush_turn_entry(output, turn_id, &mut turn_messages, &mut latest_todo); +} + +fn flush_turn_entry( + output: &mut Vec, + turn_id: Option, + turn_messages: &mut Vec, + latest_todo: &mut Option, +) { + if turn_messages.is_empty() && latest_todo.is_none() { + return; + } + + output.push(CompressionEntry::Turn { + turn_id, + messages: std::mem::take(turn_messages), + todo: latest_todo.take(), + }); +} + +fn extract_nested_compression_entries(message: &Message) -> Option> { + if message.metadata.semantic_kind != Some(MessageSemanticKind::InternalReminder) { + return None; + } + + if let Some(payload) = message.metadata.compression_payload.clone() { + if !payload.is_empty() { + return Some(payload.entries); + } + } + + let raw_text = match &message.content { + MessageContent::Text(text) => text.clone(), + MessageContent::Multimodal { text, .. } => text.clone(), + _ => String::new(), + }; + let stripped = strip_prompt_markup(&raw_text); + if stripped.is_empty() { + return None; + } + + Some(vec![CompressionEntry::ModelSummary { text: stripped }]) +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/mod.rs b/src/crates/core/src/agentic/session/compression/fallback/mod.rs new file mode 100644 index 00000000..3c074f69 --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/mod.rs @@ -0,0 +1,29 @@ +mod builder; +mod payload; +mod render; +mod sanitize; +mod types; + +use builder::build_entries_from_turns; +use payload::trim_payload_to_budget; +use render::render_payload_for_model; + +pub use types::{CompressionFallbackOptions, CompressionReminder}; + +pub fn build_structured_compression_reminder( + turns: Vec>, + options: &CompressionFallbackOptions, +) -> CompressionReminder { + let entries = build_entries_from_turns(turns, options); + let trimmed_payload = trim_payload_to_budget(entries, options); + let model_text = render_payload_for_model(&trimmed_payload); + + CompressionReminder { + model_text, + payload: trimmed_payload, + used_model_summary: false, + } +} + +#[cfg(test)] +mod tests; diff --git a/src/crates/core/src/agentic/session/compression/fallback/payload.rs b/src/crates/core/src/agentic/session/compression/fallback/payload.rs new file mode 100644 index 00000000..03560abb --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/payload.rs @@ -0,0 +1,165 @@ +use super::render::render_payload_for_model; +use super::types::{CompressionFallbackOptions, CompressionUnit}; +use crate::agentic::core::{ + render_system_reminder, CompressedMessage, CompressedTodoSnapshot, CompressionEntry, + CompressionPayload, Message, +}; + +pub(super) fn trim_payload_to_budget( + entries: Vec, + options: &CompressionFallbackOptions, +) -> CompressionPayload { + if entries.is_empty() { + return CompressionPayload::default(); + } + + let units = flatten_entries_to_units(entries); + let mut selected_units = Vec::new(); + + for unit in units.into_iter().rev() { + let mut candidate_units = vec![unit.clone()]; + candidate_units.extend(selected_units.clone()); + + let candidate_payload = rebuild_payload_from_units(candidate_units); + if estimate_payload_tokens(&candidate_payload) <= options.max_tokens { + selected_units.insert(0, unit); + } + } + + rebuild_payload_from_units(selected_units) +} + +fn flatten_entries_to_units(entries: Vec) -> Vec { + let mut units = Vec::new(); + + for (entry_id, entry) in entries.into_iter().enumerate() { + match entry { + CompressionEntry::ModelSummary { text } => { + units.push(CompressionUnit::ModelSummary { text }); + } + CompressionEntry::Turn { + turn_id, + messages, + todo, + } => { + for message in messages { + units.push(CompressionUnit::TurnMessage { + entry_id, + turn_id: turn_id.clone(), + message, + }); + } + if let Some(todo) = todo { + units.push(CompressionUnit::TurnTodo { + entry_id, + turn_id, + todo, + }); + } + } + } + } + + units +} + +fn rebuild_payload_from_units(units: Vec) -> CompressionPayload { + let mut entries = Vec::new(); + let mut current_turn_entry_id: Option = None; + let mut current_turn_id: Option = None; + let mut current_messages = Vec::new(); + let mut current_todo = None; + + for unit in units { + match unit { + CompressionUnit::ModelSummary { text } => { + flush_rebuilt_turn( + &mut entries, + &mut current_turn_entry_id, + &mut current_turn_id, + &mut current_messages, + &mut current_todo, + ); + entries.push(CompressionEntry::ModelSummary { text }); + } + CompressionUnit::TurnMessage { + entry_id, + turn_id, + message, + } => { + if current_turn_entry_id != Some(entry_id) { + flush_rebuilt_turn( + &mut entries, + &mut current_turn_entry_id, + &mut current_turn_id, + &mut current_messages, + &mut current_todo, + ); + current_turn_entry_id = Some(entry_id); + current_turn_id = turn_id; + } + current_messages.push(message); + } + CompressionUnit::TurnTodo { + entry_id, + turn_id, + todo, + } => { + if current_turn_entry_id != Some(entry_id) { + flush_rebuilt_turn( + &mut entries, + &mut current_turn_entry_id, + &mut current_turn_id, + &mut current_messages, + &mut current_todo, + ); + current_turn_entry_id = Some(entry_id); + current_turn_id = turn_id; + } + current_todo = Some(todo); + } + } + } + + flush_rebuilt_turn( + &mut entries, + &mut current_turn_entry_id, + &mut current_turn_id, + &mut current_messages, + &mut current_todo, + ); + + CompressionPayload { entries } +} + +fn flush_rebuilt_turn( + entries: &mut Vec, + current_turn_entry_id: &mut Option, + current_turn_id: &mut Option, + current_messages: &mut Vec, + current_todo: &mut Option, +) { + if current_turn_entry_id.is_none() { + return; + } + + if current_messages.is_empty() && current_todo.is_none() { + *current_turn_entry_id = None; + *current_turn_id = None; + return; + } + + entries.push(CompressionEntry::Turn { + turn_id: current_turn_id.clone(), + messages: std::mem::take(current_messages), + todo: current_todo.take(), + }); + *current_turn_entry_id = None; + *current_turn_id = None; +} + +fn estimate_payload_tokens(payload: &CompressionPayload) -> usize { + let rendered = render_payload_for_model(payload); + let mut reminder = Message::user(render_system_reminder(&rendered)); + reminder.get_tokens() +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/render.rs b/src/crates/core/src/agentic/session/compression/fallback/render.rs new file mode 100644 index 00000000..f5ca3986 --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/render.rs @@ -0,0 +1,87 @@ +use crate::agentic::core::{ + CompressedMessage, CompressedMessageRole, CompressionEntry, CompressionPayload, +}; +use serde_json::{json, Value}; + +pub(super) fn render_payload_for_model(payload: &CompressionPayload) -> String { + if payload.entries.is_empty() { + return [ + "Earlier conversation has been condensed for context management.", + "The omitted history could not be kept within the available context budget.", + ] + .join("\n\n"); + } + + let mut sections = vec![ + "Earlier conversation has been condensed for context management.".to_string(), + "The history below is a partial record. Message text, tool arguments, and task lists may have been truncated or omitted during compression. All tool results have been cleared from this compressed history. Treat it as approximate historical context rather than a complete verbatim transcript.".to_string(), + ]; + + for (index, entry) in payload.entries.iter().enumerate() { + match entry { + CompressionEntry::ModelSummary { text } => { + sections.push(format!( + "Earlier summarized history {}:\n{}", + index + 1, + text + )); + } + CompressionEntry::Turn { messages, todo, .. } => { + let mut lines = vec![format!("Historical turn {}:", index + 1)]; + for message in messages { + render_compressed_message(&mut lines, message); + } + if let Some(todo) = todo { + lines.push("Latest task list for this turn:".to_string()); + if todo.todos.is_empty() { + if let Some(summary) = todo.summary.as_ref() { + lines.push(format!("- {}", summary)); + } + } else { + for todo_item in &todo.todos { + lines.push(format!("- [{}] {}", todo_item.status, todo_item.content)); + } + if let Some(summary) = todo.summary.as_ref() { + lines.push(format!("Task list note: {}", summary)); + } + } + } + sections.push(lines.join("\n")); + } + } + } + + sections.join("\n\n") +} + +fn render_compressed_message(lines: &mut Vec, message: &CompressedMessage) { + let role_label = match message.role { + CompressedMessageRole::User => "User", + CompressedMessageRole::Assistant => "Assistant", + }; + + if let Some(text) = message.text.as_ref() { + lines.push(format!("{role_label}: {text}")); + } else { + lines.push(format!("{role_label}:")); + } + + for tool_call in &message.tool_calls { + let mut rendered = tool_call.tool_name.clone(); + if let Some(arguments) = tool_call.arguments.as_ref() { + rendered.push(' '); + rendered.push_str(&render_tool_arguments(arguments)); + } + if tool_call.is_error { + rendered.push_str(" [error]"); + } + lines.push(format!("Tool call: {}", rendered)); + } +} + +fn render_tool_arguments(arguments: &Value) -> String { + if arguments.is_null() { + return "{}".to_string(); + } + serde_json::to_string(arguments).unwrap_or_else(|_| json!({}).to_string()) +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs b/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs new file mode 100644 index 00000000..c87f9e12 --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs @@ -0,0 +1,304 @@ +use super::types::CompressionFallbackOptions; +use crate::agentic::core::{CompressedTodoItem, CompressedTodoSnapshot}; +use serde_json::{Map, Value}; + +pub(super) fn sanitize_user_text( + text: &str, + options: &CompressionFallbackOptions, +) -> Option { + sanitize_text(text, options.user_chars) +} + +pub(super) fn sanitize_assistant_text( + text: &str, + options: &CompressionFallbackOptions, +) -> Option { + sanitize_text(text, options.assistant_chars) +} + +pub(super) fn sanitize_todo_snapshot(arguments: &Value) -> Option { + let todos = arguments.get("todos")?.as_array()?; + let mut compressed_todos = Vec::new(); + + for todo in todos { + let Some(todo_object) = todo.as_object() else { + continue; + }; + let Some(content) = todo_object + .get("content") + .and_then(Value::as_str) + .map(str::trim) + .filter(|content| !content.is_empty()) + else { + continue; + }; + let status = todo_object + .get("status") + .and_then(Value::as_str) + .unwrap_or("pending"); + let id = todo_object + .get("id") + .and_then(Value::as_str) + .map(str::to_string); + + compressed_todos.push(CompressedTodoItem { + id, + content: content.to_string(), + status: status.to_string(), + }); + } + + if compressed_todos.is_empty() { + return None; + } + + Some(CompressedTodoSnapshot { + todos: compressed_todos, + summary: None, + }) +} + +pub(super) fn sanitize_tool_arguments( + tool_name: &str, + arguments: &Value, + options: &CompressionFallbackOptions, +) -> Option { + let Some(object) = arguments.as_object() else { + return sanitize_generic_value(arguments, options); + }; + + let sanitized = match tool_name { + "Read" => { + let mut result = Map::new(); + copy_field(object, &mut result, "file_path"); + copy_field(object, &mut result, "start_line"); + copy_field(object, &mut result, "limit"); + result + } + "Write" => { + let mut result = Map::new(); + copy_field(object, &mut result, "file_path"); + insert_cleared_field(object, &mut result, "content"); + result + } + "Edit" => { + let mut result = Map::new(); + copy_field(object, &mut result, "file_path"); + copy_field(object, &mut result, "replace_all"); + insert_cleared_field(object, &mut result, "old_string"); + insert_cleared_field(object, &mut result, "new_string"); + result + } + "Grep" => { + let mut result = Map::new(); + for key in [ + "pattern", + "path", + "glob", + "type", + "head_limit", + "multiline", + "-A", + "-B", + "-C", + "-i", + "-n", + "output_mode", + ] { + copy_field(object, &mut result, key); + } + result + } + "Glob" => { + let mut result = Map::new(); + copy_field(object, &mut result, "pattern"); + copy_field(object, &mut result, "path"); + copy_field(object, &mut result, "limit"); + result + } + "LS" => { + let mut result = Map::new(); + copy_field(object, &mut result, "path"); + copy_field(object, &mut result, "ignore"); + copy_field(object, &mut result, "limit"); + result + } + "GetFileDiff" => { + let mut result = Map::new(); + copy_field(object, &mut result, "file_path"); + result + } + "DeleteFile" => { + let mut result = Map::new(); + copy_field(object, &mut result, "path"); + copy_field(object, &mut result, "recursive"); + result + } + "Git" => { + let mut result = Map::new(); + copy_field(object, &mut result, "operation"); + copy_field(object, &mut result, "working_directory"); + if let Some(args) = object.get("args") { + if let Some(value) = sanitize_generic_value(args, options) { + result.insert("args".to_string(), value); + } + } + result + } + "Bash" => { + let mut result = Map::new(); + insert_sanitize_text(object, &mut result, "command", options.tool_command_chars); + result + } + "TerminalControl" => { + let mut result = Map::new(); + copy_field(object, &mut result, "action"); + copy_field(object, &mut result, "terminal_session_id"); + result + } + "Skill" => { + let mut result = Map::new(); + copy_field(object, &mut result, "command"); + result + } + "CreatePlan" => { + let mut result = Map::new(); + copy_field(object, &mut result, "name"); + copy_field(object, &mut result, "overview"); + insert_cleared_field(object, &mut result, "plan"); + insert_cleared_field(object, &mut result, "todos"); + result + } + "WebSearch" => { + let mut result = Map::new(); + copy_field(object, &mut result, "query"); + result + } + "WebFetch" => { + let mut result = Map::new(); + copy_field(object, &mut result, "url"); + result + } + _ => sanitize_generic_object(object, options), + }; + + if sanitized.is_empty() { + None + } else { + Some(Value::Object(sanitized)) + } +} + +pub(super) fn sanitize_generic_object( + object: &Map, + options: &CompressionFallbackOptions, +) -> Map { + let mut sanitized = Map::new(); + + for (key, value) in object { + let heavy_string = matches!( + key.as_str(), + "content" + | "contents" + | "old_string" + | "new_string" + | "text" + | "output" + | "stdout" + | "stderr" + | "diff" + | "file_diff" + | "original_content" + | "new_content" + | "data_url" + | "data_base64" + ); + if heavy_string { + if let Some(text) = value.as_str() { + sanitized.insert( + format!("{key}_chars"), + Value::Number(serde_json::Number::from(text.chars().count() as u64)), + ); + } + continue; + } + + if let Some(value) = sanitize_generic_value(value, options) { + sanitized.insert(key.clone(), value); + } + } + + sanitized +} + +pub(super) fn sanitize_generic_value( + value: &Value, + options: &CompressionFallbackOptions, +) -> Option { + match value { + Value::Null => None, + Value::Bool(_) | Value::Number(_) => Some(value.clone()), + Value::String(text) => sanitize_text(text, options.tool_arg_chars).map(Value::String), + Value::Array(values) => { + let sanitized_values: Vec = values + .iter() + .take(5) + .filter_map(|value| sanitize_generic_value(value, options)) + .collect(); + if sanitized_values.is_empty() { + None + } else { + Some(Value::Array(sanitized_values)) + } + } + Value::Object(object) => { + let sanitized_object = sanitize_generic_object(object, options); + if sanitized_object.is_empty() { + None + } else { + Some(Value::Object(sanitized_object)) + } + } + } +} + +fn sanitize_text(text: &str, limit: usize) -> Option { + let trimmed = text.trim(); + if trimmed.is_empty() { + return None; + } + + let text_len = trimmed.chars().count(); + if text_len <= limit { + return Some(trimmed.to_string()); + } + + let mut truncated: String = trimmed.chars().take(limit).collect(); + truncated.push_str(" ... [truncated]"); + Some(truncated) +} + +fn copy_field(source: &Map, target: &mut Map, key: &str) { + if let Some(value) = source.get(key) { + target.insert(key.to_string(), value.clone()); + } +} + +fn insert_sanitize_text( + source: &Map, + target: &mut Map, + key: &str, + limit: usize, +) { + if let Some(value) = source.get(key).and_then(Value::as_str) { + if let Some(text) = sanitize_text(value, limit) { + target.insert(key.to_string(), Value::String(text)); + } + } +} + +fn insert_cleared_field(source: &Map, target: &mut Map, key: &str) { + if source.get(key).is_some() { + target.insert(key.to_string(), Value::String("[cleared]".to_string())); + } +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/tests.rs b/src/crates/core/src/agentic/session/compression/fallback/tests.rs new file mode 100644 index 00000000..c3720c04 --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/tests.rs @@ -0,0 +1,80 @@ +use super::{build_structured_compression_reminder, CompressionFallbackOptions}; +use crate::agentic::core::{ + render_system_reminder, CompressedMessageRole, CompressionEntry, CompressionPayload, Message, + MessageSemanticKind, ToolCall, ToolResult, +}; +use serde_json::json; + +fn default_options() -> CompressionFallbackOptions { + CompressionFallbackOptions { + max_tokens: 10_000, + user_chars: 120, + assistant_chars: 120, + tool_arg_chars: 80, + tool_command_chars: 80, + } +} + +#[test] +fn clears_tool_results_from_compressed_history() { + let assistant = Message::assistant_with_tools( + "Checking file".to_string(), + vec![ToolCall { + tool_id: "tool_1".to_string(), + tool_name: "Read".to_string(), + arguments: json!({ + "file_path": "/tmp/demo.rs", + "start_line": 1, + "limit": 20 + }), + is_error: false, + }], + ); + let tool_result = Message::tool_result(ToolResult { + tool_id: "tool_1".to_string(), + tool_name: "Read".to_string(), + result: json!({"content": "ignored"}), + result_for_assistant: Some("Read succeeded with file preview".to_string()), + is_error: false, + duration_ms: None, + image_attachments: None, + }); + + let reminder = build_structured_compression_reminder( + vec![vec![ + Message::user("inspect".to_string()), + assistant, + tool_result, + ]], + &default_options(), + ); + + let turn = match &reminder.payload.entries[0] { + CompressionEntry::Turn { messages, .. } => messages, + _ => panic!("expected turn entry"), + }; + let assistant_message = turn + .iter() + .find(|message| message.role == CompressedMessageRole::Assistant) + .expect("assistant message"); + + assert_eq!(assistant_message.tool_calls.len(), 1); + assert!(!reminder.model_text.contains("Tool result:")); + assert!(reminder.model_text.contains("All tool results have been cleared")); +} + +#[test] +fn reuses_existing_compression_payload_atomically() { + let prior_summary = "Previous conversation summary".to_string(); + let reminder_message = Message::user(render_system_reminder(&prior_summary)) + .with_semantic_kind(MessageSemanticKind::InternalReminder) + .with_compression_payload(CompressionPayload::from_summary(prior_summary.clone())); + + let reminder = + build_structured_compression_reminder(vec![vec![reminder_message]], &default_options()); + + assert!(matches!( + &reminder.payload.entries[0], + CompressionEntry::ModelSummary { text } if text == &prior_summary + )); +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/types.rs b/src/crates/core/src/agentic/session/compression/fallback/types.rs new file mode 100644 index 00000000..fc1be95c --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/fallback/types.rs @@ -0,0 +1,34 @@ +use crate::agentic::core::{CompressedMessage, CompressedTodoSnapshot, CompressionPayload}; + +#[derive(Debug, Clone)] +pub struct CompressionFallbackOptions { + pub max_tokens: usize, + pub user_chars: usize, + pub assistant_chars: usize, + pub tool_arg_chars: usize, + pub tool_command_chars: usize, +} + +#[derive(Debug, Clone)] +pub struct CompressionReminder { + pub model_text: String, + pub payload: CompressionPayload, + pub used_model_summary: bool, +} + +#[derive(Debug, Clone)] +pub(super) enum CompressionUnit { + ModelSummary { + text: String, + }, + TurnMessage { + entry_id: usize, + turn_id: Option, + message: CompressedMessage, + }, + TurnTodo { + entry_id: usize, + turn_id: Option, + todo: CompressedTodoSnapshot, + }, +} diff --git a/src/crates/core/src/agentic/session/compression/mod.rs b/src/crates/core/src/agentic/session/compression/mod.rs new file mode 100644 index 00000000..9437095a --- /dev/null +++ b/src/crates/core/src/agentic/session/compression/mod.rs @@ -0,0 +1,7 @@ +//! Session context compression modules. + +pub mod compressor; +pub mod fallback; + +pub use compressor::*; +pub use fallback::*; diff --git a/src/crates/core/src/agentic/session/context_store.rs b/src/crates/core/src/agentic/session/context_store.rs new file mode 100644 index 00000000..9023ccdf --- /dev/null +++ b/src/crates/core/src/agentic/session/context_store.rs @@ -0,0 +1,53 @@ +//! Runtime session context store. +//! +//! Holds the in-memory model context for each active session. + +use crate::agentic::core::Message; +use dashmap::DashMap; +use log::debug; +use std::sync::Arc; + +/// In-memory runtime context store for active sessions. +pub struct SessionContextStore { + session_contexts: Arc>>, +} + +impl SessionContextStore { + pub fn new() -> Self { + Self { + session_contexts: Arc::new(DashMap::new()), + } + } + + pub fn create_session(&self, session_id: &str) { + self.session_contexts.insert(session_id.to_string(), vec![]); + debug!("Created session context cache: session_id={}", session_id); + } + + pub fn add_message(&self, session_id: &str, message: Message) { + if let Some(mut cached_messages) = self.session_contexts.get_mut(session_id) { + cached_messages.push(message); + } else { + self.session_contexts + .insert(session_id.to_string(), vec![message]); + } + } + + pub fn replace_context(&self, session_id: &str, messages: Vec) { + self.session_contexts + .insert(session_id.to_string(), messages); + debug!("Replaced session context cache: session_id={}", session_id); + } + + pub fn get_context_messages(&self, session_id: &str) -> Vec { + self.session_contexts + .get(session_id) + .map(|messages| messages.clone()) + .unwrap_or_default() + } + + pub fn delete_session(&self, session_id: &str) { + self.session_contexts.remove(session_id); + debug!("Deleted session context cache: session_id={}", session_id); + } +} diff --git a/src/crates/core/src/agentic/session/history_manager.rs b/src/crates/core/src/agentic/session/history_manager.rs deleted file mode 100644 index f83ac119..00000000 --- a/src/crates/core/src/agentic/session/history_manager.rs +++ /dev/null @@ -1,194 +0,0 @@ -//! Message History Manager -//! -//! Manages session message history, supports memory caching and persistence - -use crate::agentic::core::Message; -use crate::agentic::persistence::PersistenceManager; -use crate::util::errors::BitFunResult; -use dashmap::DashMap; -use log::debug; -use std::sync::Arc; - -/// Message history configuration -#[derive(Debug, Clone)] -pub struct HistoryConfig { - pub enable_persistence: bool, -} - -impl Default for HistoryConfig { - fn default() -> Self { - Self { - enable_persistence: true, - } - } -} - -/// Message history manager -pub struct MessageHistoryManager { - /// Message history in memory (by session ID) - histories: Arc>>, - - /// Persistence manager - persistence: Arc, - - /// Configuration - config: HistoryConfig, -} - -impl MessageHistoryManager { - pub fn new(persistence: Arc, config: HistoryConfig) -> Self { - Self { - histories: Arc::new(DashMap::new()), - persistence, - config, - } - } - - /// Create session history - pub async fn create_session(&self, session_id: &str) -> BitFunResult<()> { - self.histories.insert(session_id.to_string(), vec![]); - debug!("Created session history: session_id={}", session_id); - Ok(()) - } - - /// Add message - pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - // 1. Add to memory - if let Some(mut messages) = self.histories.get_mut(session_id) { - messages.push(message.clone()); - } else { - // Session doesn't exist, create and add - self.histories - .insert(session_id.to_string(), vec![message.clone()]); - } - - // 2. Persist - if self.config.enable_persistence { - self.persistence - .append_message(session_id, &message) - .await?; - } - - Ok(()) - } - - /// Get message history - pub async fn get_messages(&self, session_id: &str) -> BitFunResult> { - // First try to get from memory - if let Some(messages) = self.histories.get(session_id) { - return Ok(messages.clone()); - } - - // Load from persistence - if self.config.enable_persistence { - let messages = self.persistence.load_messages(session_id).await?; - - // Cache to memory - if !messages.is_empty() { - self.histories - .insert(session_id.to_string(), messages.clone()); - } - - Ok(messages) - } else { - Ok(vec![]) - } - } - - /// Get paginated message history - pub async fn get_messages_paginated( - &self, - session_id: &str, - limit: usize, - before_message_id: Option<&str>, - ) -> BitFunResult<(Vec, bool)> { - let messages = self.get_messages(session_id).await?; - - if messages.is_empty() { - return Ok((vec![], false)); - } - - let end_idx = if let Some(before_id) = before_message_id { - messages.iter().position(|m| m.id == before_id).unwrap_or(0) - } else { - messages.len() - }; - - if end_idx == 0 { - return Ok((vec![], false)); - } - - let start_idx = end_idx.saturating_sub(limit); - let has_more = start_idx > 0; - - Ok((messages[start_idx..end_idx].to_vec(), has_more)) - } - - /// Get recent N messages - pub async fn get_recent_messages( - &self, - session_id: &str, - count: usize, - ) -> BitFunResult> { - let messages = self.get_messages(session_id).await?; - let start = messages.len().saturating_sub(count); - Ok(messages[start..].to_vec()) - } - - /// Get message count - pub async fn count_messages(&self, session_id: &str) -> usize { - if let Some(messages) = self.histories.get(session_id) { - messages.len() - } else if self.config.enable_persistence { - // Load from persistence - self.persistence - .load_messages(session_id) - .await - .map(|msgs| msgs.len()) - .unwrap_or(0) - } else { - 0 - } - } - - /// Clear message history - pub async fn clear_messages(&self, session_id: &str) -> BitFunResult<()> { - // Clear memory - if let Some(mut messages) = self.histories.get_mut(session_id) { - messages.clear(); - } - - // Clear persistence - if self.config.enable_persistence { - self.persistence.clear_messages(session_id).await?; - } - - debug!("Cleared session message history: session_id={}", session_id); - Ok(()) - } - - /// Delete session - pub async fn delete_session(&self, session_id: &str) -> BitFunResult<()> { - // Remove from memory - self.histories.remove(session_id); - - // Delete from persistence - if self.config.enable_persistence { - self.persistence.delete_messages(session_id).await?; - } - - debug!("Deleted session history: session_id={}", session_id); - Ok(()) - } - - /// Restore session (load from persistence) - pub async fn restore_session( - &self, - session_id: &str, - messages: Vec, - ) -> BitFunResult<()> { - self.histories.insert(session_id.to_string(), messages); - debug!("Restored session history: session_id={}", session_id); - Ok(()) - } -} diff --git a/src/crates/core/src/agentic/session/mod.rs b/src/crates/core/src/agentic/session/mod.rs index baac1fed..1b0b22a9 100644 --- a/src/crates/core/src/agentic/session/mod.rs +++ b/src/crates/core/src/agentic/session/mod.rs @@ -1,11 +1,11 @@ //! Session Management Layer //! -//! Provides session lifecycle management, message history, and context management +//! Provides session lifecycle management and context management. -pub mod compression_manager; -pub mod history_manager; +pub mod compression; +pub mod context_store; pub mod session_manager; -pub use compression_manager::*; -pub use history_manager::*; +pub use compression::*; +pub use context_store::*; pub use session_manager::*; diff --git a/src/crates/core/src/agentic/session/session_manager.rs b/src/crates/core/src/agentic/session/session_manager.rs index 8a9b8fc9..2253f918 100644 --- a/src/crates/core/src/agentic/session/session_manager.rs +++ b/src/crates/core/src/agentic/session/session_manager.rs @@ -8,7 +8,7 @@ use crate::agentic::core::{ }; use crate::agentic::image_analysis::ImageContextData; use crate::agentic::persistence::PersistenceManager; -use crate::agentic::session::{CompressionManager, MessageHistoryManager}; +use crate::agentic::session::SessionContextStore; use crate::infrastructure::ai::get_global_ai_client_factory; use crate::service::session::{ DialogTurnData, ModelRoundData, TextItemData, TurnStatus, UserMessageData, @@ -49,8 +49,7 @@ pub struct SessionManager { sessions: Arc>, /// Sub-components - history_manager: Arc, - compression_manager: Arc, + context_store: Arc, persistence_manager: Arc, /// Configuration @@ -58,6 +57,31 @@ pub struct SessionManager { } impl SessionManager { + fn paginate_messages( + messages: &[Message], + limit: usize, + before_message_id: Option<&str>, + ) -> (Vec, bool) { + if messages.is_empty() { + return (vec![], false); + } + + let end_idx = if let Some(before_id) = before_message_id { + messages.iter().position(|m| m.id == before_id).unwrap_or(0) + } else { + messages.len() + }; + + if end_idx == 0 { + return (vec![], false); + } + + let start_idx = end_idx.saturating_sub(limit); + let has_more = start_idx > 0; + + (messages[start_idx..end_idx].to_vec(), has_more) + } + fn session_workspace_from_config(config: &SessionConfig) -> Option { config.workspace_path.as_ref().map(PathBuf::from) } @@ -108,15 +132,7 @@ impl SessionManager { Self::effective_workspace_path_from_config(&config).await } - async fn rebuild_messages_from_turns( - &self, - workspace_path: &Path, - session_id: &str, - ) -> BitFunResult> { - let turns = self - .persistence_manager - .load_session_turns(workspace_path, session_id) - .await?; + fn build_messages_from_turns(turns: &[DialogTurnData]) -> Vec { let mut messages = Vec::new(); for turn in turns { @@ -181,12 +197,86 @@ impl SessionManager { } } - Ok(messages) + messages + } + + async fn rebuild_messages_from_turns( + &self, + workspace_path: &Path, + session_id: &str, + ) -> BitFunResult> { + let turns = self + .persistence_manager + .load_session_turns(workspace_path, session_id) + .await?; + Ok(Self::build_messages_from_turns(&turns)) + } + + /// Persist the current runtime context by overwriting `snapshots/context-{turn_index}.json`. + /// + /// Save timing is intentionally tied to semantic context changes rather than token chunks: + /// - after a turn starts and the user message enters runtime context + /// - after assistant/tool messages are appended to runtime context + /// - after compression replaces runtime context + /// - once more when a turn completes or fails + /// + /// This is still a best-effort multi-file persistence flow, not a transactional commit. + /// `session.json`, `turns/turn-*.json`, and `snapshots/context-*.json` may be briefly out of + /// sync if the process crashes between writes, so restore logic must tolerate partial updates. + async fn persist_context_snapshot_for_turn_best_effort( + &self, + session_id: &str, + turn_index: usize, + reason: &str, + ) { + if !self.config.enable_persistence { + return; + } + + let Some(workspace_path) = self.effective_session_workspace_path(session_id).await else { + debug!( + "Skipping context snapshot persistence because workspace path is unavailable: session_id={}, turn_index={}, reason={}", + session_id, turn_index, reason + ); + return; + }; + + let context_messages = self.context_store.get_context_messages(session_id); + if let Err(err) = self + .persistence_manager + .save_turn_context_snapshot(&workspace_path, session_id, turn_index, &context_messages) + .await + { + warn!( + "failed to persist context snapshot: session_id={}, turn_index={}, reason={}, err={}", + session_id, turn_index, reason, err + ); + } + } + + async fn persist_current_turn_context_snapshot_best_effort( + &self, + session_id: &str, + reason: &str, + ) { + let Some(turn_index) = self + .sessions + .get(session_id) + .and_then(|session| session.dialog_turn_ids.len().checked_sub(1)) + else { + debug!( + "Skipping current-turn context snapshot because no turn is active: session_id={}, reason={}", + session_id, reason + ); + return; + }; + + self.persist_context_snapshot_for_turn_best_effort(session_id, turn_index, reason) + .await; } pub fn new( - history_manager: Arc, - compression_manager: Arc, + context_store: Arc, persistence_manager: Arc, config: SessionManagerConfig, ) -> Self { @@ -194,8 +284,7 @@ impl SessionManager { let manager = Self { sessions: Arc::new(DashMap::new()), - history_manager, - compression_manager, + context_store, persistence_manager, config, }; @@ -272,15 +361,10 @@ impl SessionManager { // 1. Add to memory self.sessions.insert(session_id.clone(), session.clone()); - // 2. Initialize message history - self.history_manager - .create_session(&session_id) - .await?; - - // 3. Initialize compression manager - self.compression_manager.create_session(&session_id); + // 2. Initialize the in-memory context cache. + self.context_store.create_session(&session_id); - // 4. Persist to local path (handles remote workspaces correctly) + // 3. Persist to local path (handles remote workspaces correctly) if self.config.enable_persistence { if let Some(session) = self.sessions.get(&session_id) { self.persistence_manager @@ -382,10 +466,9 @@ impl SessionManager { if self.config.enable_persistence { let effective_path = self.effective_session_workspace_path(session_id).await; - if let (Some(workspace_path), Some(session)) = ( - effective_path, - self.sessions.get(session_id), - ) { + if let (Some(workspace_path), Some(session)) = + (effective_path, self.sessions.get(session_id)) + { self.persistence_manager .save_session(&workspace_path, &session) .await?; @@ -419,10 +502,9 @@ impl SessionManager { if self.config.enable_persistence { let effective_path = self.effective_session_workspace_path(session_id).await; - if let (Some(workspace_path), Some(session)) = ( - effective_path, - self.sessions.get(session_id), - ) { + if let (Some(workspace_path), Some(session)) = + (effective_path, self.sessions.get(session_id)) + { self.persistence_manager .save_session(&workspace_path, &session) .await?; @@ -464,10 +546,9 @@ impl SessionManager { } } - // 2. Delete message history - self.history_manager.delete_session(session_id).await?; + self.context_store.delete_session(session_id); - // 3. Delete persisted data + // 2. Delete persisted data if self.config.enable_persistence { self.persistence_manager .delete_session(workspace_path, session_id) @@ -492,7 +573,7 @@ impl SessionManager { } } - // 4. Clean up associated Terminal session + // 3. Clean up associated Terminal session use crate::service::terminal::TerminalApi; if let Ok(terminal_api) = TerminalApi::from_singleton() { let binding = terminal_api.session_manager().binding(); @@ -508,7 +589,7 @@ impl SessionManager { } } - // 5. Remove from memory + // 4. Remove from memory self.sessions.remove(session_id); info!("Session deletion completed: session_id={}", session_id); @@ -553,9 +634,22 @@ impl SessionManager { ); } - // 2. Load message history - full list by turn, may already be compressed + // 2. Restore runtime context with snapshot-first semantics. + // If the latest snapshot lags behind turn persistence, append the missing turn delta + // instead of truncating session history. + // + // This compensates for the fact that persistence is not transactional across + // `session.json`, `turns/*.json`, and `snapshots/context-*.json`. + let persisted_turns = self + .persistence_manager + .load_session_turns(&session_storage_path, session_id) + .await?; + let persisted_turn_ids: Vec = persisted_turns + .iter() + .map(|turn| turn.turn_id.clone()) + .collect(); let mut latest_turn_index: Option = None; - let messages = match self + let mut messages = match self .persistence_manager .load_latest_turn_context_snapshot(&session_storage_path, session_id) .await? @@ -564,9 +658,21 @@ impl SessionManager { latest_turn_index = Some(turn_index); msgs } - None => { - self.rebuild_messages_from_turns(&session_storage_path, session_id) - .await? + None => Self::build_messages_from_turns(&persisted_turns), + }; + + if let Some(snapshot_turn_index) = latest_turn_index { + let delta_start = snapshot_turn_index.saturating_add(1); + if delta_start < persisted_turns.len() { + warn!( + "Context snapshot is behind persisted turns, rebuilding delta: session_id={}, snapshot_turn_index={}, persisted_turn_count={}", + session_id, + snapshot_turn_index, + persisted_turns.len() + ); + messages.extend(Self::build_messages_from_turns( + &persisted_turns[delta_start..], + )); } }; @@ -577,33 +683,48 @@ impl SessionManager { ); } - self.history_manager - .restore_session(session_id, messages.clone()) - .await?; - - // 3. Restore compression manager - batch restore, don't trigger persistence + // 3. Restore the in-memory context cache from the recovered messages. // If session already exists, delete old one first then create (ensure clean state) if session_already_in_memory { - self.compression_manager.delete_session(session_id); + self.context_store.delete_session(session_id); } - // Use restore_session for batch restore, avoid triggering persistence for each add_message - self.compression_manager - .restore_session(session_id, messages.clone()); + self.context_store + .replace_context(session_id, messages.clone()); - // If session's recorded turn count doesn't match snapshot, truncate to snapshot's turn - if let Some(latest_turn_index) = latest_turn_index { - let expected_turn_count = latest_turn_index + 1; - if session.dialog_turn_ids.len() > expected_turn_count { - warn!( - "Session turn count exceeds snapshot, truncating: session_id={}, turns={} -> {}", - session_id, - session.dialog_turn_ids.len(), - expected_turn_count - ); - session.dialog_turn_ids.truncate(expected_turn_count); - } - } else if !session.dialog_turn_ids.is_empty() && messages.is_empty() { + let recoverable_turn_count = latest_turn_index + .map(|turn_index| turn_index + 1) + .unwrap_or(0) + .max(persisted_turns.len()); + + if session.dialog_turn_ids.len() < persisted_turns.len() { + warn!( + "Session metadata is behind persisted turns, rebuilding dialog_turn_ids: session_id={}, session_turn_count={}, persisted_turn_count={}", + session_id, + session.dialog_turn_ids.len(), + persisted_turns.len() + ); + session.dialog_turn_ids = persisted_turn_ids; + } else if session.dialog_turn_ids.len() > recoverable_turn_count { + warn!( + "Session metadata exceeds recoverable history, truncating: session_id={}, session_turn_count={}, recoverable_turn_count={}", + session_id, + session.dialog_turn_ids.len(), + recoverable_turn_count + ); + session.dialog_turn_ids.truncate(recoverable_turn_count); + } else if persisted_turns.len() == session.dialog_turn_ids.len() + && session.dialog_turn_ids != persisted_turn_ids + { + warn!( + "Session metadata turn ids diverge from persisted turns, normalizing order: session_id={}", + session_id + ); + session.dialog_turn_ids = persisted_turn_ids; + } + + if recoverable_turn_count == 0 && !session.dialog_turn_ids.is_empty() && messages.is_empty() + { warn!( "Session has no available context snapshot and messages are empty, clearing turns: session_id={}", session_id @@ -611,10 +732,7 @@ impl SessionManager { session.dialog_turn_ids.clear(); } - let context_msg_count = self - .compression_manager - .get_context_messages(session_id) - .len(); + let context_msg_count = self.context_store.get_context_messages(session_id).len(); info!( "Session restored: session_id={}, session_name={}, messages={}, context_messages={}", @@ -659,12 +777,8 @@ impl SessionManager { })? }; - // 2) Restore history/compression context in memory - self.history_manager - .restore_session(session_id, messages.clone()) - .await?; - self.compression_manager - .restore_session(session_id, messages); + // 2) Restore the in-memory context cache. + self.context_store.replace_context(session_id, messages); // 3) Truncate session turn list & persist if let Some(mut session) = self.sessions.get_mut(session_id) { @@ -735,8 +849,9 @@ impl SessionManager { let session = self .get_session(session_id) .ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", session_id)))?; - let workspace_path = - Self::effective_workspace_path_from_config(&session.config).await.ok_or_else(|| { + let workspace_path = Self::effective_workspace_path_from_config(&session.config) + .await + .ok_or_else(|| { BitFunError::Validation(format!( "Session workspace_path is missing: {}", session_id @@ -764,7 +879,7 @@ impl SessionManager { session.last_activity_at = SystemTime::now(); } - // 2. Add user message to history and compression managers + // 2. Add the user message to the in-memory context cache. let user_message = if let Some(images) = image_contexts.as_ref().filter(|v| !v.is_empty()).cloned() { Message::user_multimodal(user_input.clone(), images) @@ -775,12 +890,7 @@ impl SessionManager { .with_turn_id(turn_id.clone()) .with_semantic_kind(MessageSemanticKind::ActualUserInput) }; - self.history_manager - .add_message(session_id, user_message.clone()) - .await?; - self.compression_manager - .add_message(session_id, user_message) - .await?; + self.context_store.add_message(session_id, user_message); // 3. Persist if self.config.enable_persistence { @@ -809,6 +919,9 @@ impl SessionManager { .await?; } + self.persist_context_snapshot_for_turn_best_effort(session_id, turn_index, "turn_started") + .await; + debug!( "Starting dialog turn: turn_id={}, turn_index={}", turn_id, turn_index @@ -825,9 +938,15 @@ impl SessionManager { final_response: String, stats: TurnStats, ) -> BitFunResult<()> { - let workspace_path = self.effective_session_workspace_path(session_id).await.ok_or_else(|| { - BitFunError::Validation(format!("Session workspace_path is missing: {}", session_id)) - })?; + let workspace_path = self + .effective_session_workspace_path(session_id) + .await + .ok_or_else(|| { + BitFunError::Validation(format!( + "Session workspace_path is missing: {}", + session_id + )) + })?; let turn_index = self .sessions .get(session_id) @@ -880,37 +999,12 @@ impl SessionManager { turn.duration_ms = Some(stats.duration_ms); turn.end_time = Some(completion_timestamp); - if self.config.enable_persistence { - match self.get_context_messages(session_id).await { - Ok(context_messages) => { - if let Err(err) = self - .persistence_manager - .save_turn_context_snapshot( - &workspace_path, - session_id, - turn.turn_index, - &context_messages, - ) - .await - { - warn!( - "failed to save turn context snapshot: session_id={}, turn_index={}, err={}", - session_id, - turn.turn_index, - err - ); - } - } - Err(err) => { - warn!( - "failed to build context messages for snapshot: session_id={}, turn_index={}, err={}", - session_id, - turn.turn_index, - err - ); - } - } - } + self.persist_context_snapshot_for_turn_best_effort( + session_id, + turn.turn_index, + "turn_completed", + ) + .await; // Persist if self.config.enable_persistence { @@ -935,9 +1029,15 @@ impl SessionManager { turn_id: &str, error: String, ) -> BitFunResult<()> { - let workspace_path = self.effective_session_workspace_path(session_id).await.ok_or_else(|| { - BitFunError::Validation(format!("Session workspace_path is missing: {}", session_id)) - })?; + let workspace_path = self + .effective_session_workspace_path(session_id) + .await + .ok_or_else(|| { + BitFunError::Validation(format!( + "Session workspace_path is missing: {}", + session_id + )) + })?; let turn_index = self .sessions .get(session_id) @@ -957,32 +1057,13 @@ impl SessionManager { .as_millis() as u64, ); + self.persist_context_snapshot_for_turn_best_effort( + session_id, + turn.turn_index, + "turn_failed", + ) + .await; if self.config.enable_persistence { - match self.get_context_messages(session_id).await { - Ok(context_messages) => { - if let Err(err) = self - .persistence_manager - .save_turn_context_snapshot( - &workspace_path, - session_id, - turn.turn_index, - &context_messages, - ) - .await - { - warn!( - "failed to save turn context snapshot on failure: session_id={}, turn_index={}, err={}", - session_id, turn.turn_index, err - ); - } - } - Err(err) => { - warn!( - "failed to build context messages for snapshot on failure: session_id={}, turn_index={}, err={}", - session_id, turn.turn_index, err - ); - } - } self.persistence_manager .save_dialog_turn(&workspace_path, &turn) .await?; @@ -1011,8 +1092,13 @@ impl SessionManager { let session = self.sessions.get(child_session_id).ok_or_else(|| { BitFunError::NotFound(format!("Session not found: {}", child_session_id)) })?; - let turn_id = format!("btw-turn-{}", request_id); + let turn_index = session + .dialog_turn_ids + .iter() + .position(|existing| existing == &turn_id) + .unwrap_or(session.dialog_turn_ids.len()); + let user_message_id = format!("btw-user-{}", request_id); let round_id = format!("btw-round-{}", request_id); let text_id = format!("btw-text-{}", request_id); @@ -1023,7 +1109,7 @@ impl SessionManager { let mut turn = DialogTurnData::new( turn_id.clone(), - 0, + turn_index, child_session_id.to_string(), UserMessageData { id: user_message_id, @@ -1074,28 +1160,18 @@ impl SessionManager { .save_dialog_turn(workspace_path, &turn) .await?; - // Sync messages to in-memory caches so subsequent对话 can access context + // Sync messages to the in-memory caches so subsequent turns can access context. let user_message = Message::user(question.to_string()) .with_turn_id(turn_id.clone()) .with_semantic_kind(MessageSemanticKind::ActualUserInput); - let assistant_message = Message::assistant(full_text.to_string()) - .with_turn_id(turn_id.clone()); - - // Add to MessageHistoryManager - self.history_manager - .add_message(child_session_id, user_message.clone()) - .await?; - self.history_manager - .add_message(child_session_id, assistant_message.clone()) - .await?; + let assistant_message = + Message::assistant(full_text.to_string()).with_turn_id(turn_id.clone()); - // Add to CompressionManager - self.compression_manager - .add_message(child_session_id, user_message) - .await?; - self.compression_manager - .add_message(child_session_id, assistant_message) - .await?; + // Add to the in-memory runtime context cache. + self.context_store + .add_message(child_session_id, user_message); + self.context_store + .add_message(child_session_id, assistant_message); if let Some(mut session) = self.sessions.get_mut(child_session_id) { if !session @@ -1107,51 +1183,79 @@ impl SessionManager { } session.updated_at = SystemTime::now(); session.last_activity_at = SystemTime::now(); + + if self.config.enable_persistence { + self.persistence_manager + .save_session(workspace_path, &session) + .await?; + } } + self.persist_context_snapshot_for_turn_best_effort( + child_session_id, + turn_index, + "btw_turn_persisted", + ) + .await; + Ok(()) } // ============ Helper Methods ============ - /// Get session's message history (complete) + /// Get a best-effort message view for the session. + /// When persistence is enabled, rebuild from persisted turns so callers see the + /// canonical turn history instead of the runtime context cache. pub async fn get_messages(&self, session_id: &str) -> BitFunResult> { - self.history_manager.get_messages(session_id).await + if self.config.enable_persistence { + if let Some(workspace_path) = self.effective_session_workspace_path(session_id).await { + let messages = self + .rebuild_messages_from_turns(&workspace_path, session_id) + .await?; + if !messages.is_empty() { + return Ok(messages); + } + } + } + + Ok(self.context_store.get_context_messages(session_id)) } - /// Get session's message history (paginated) + /// Get a paginated best-effort message view for the session. pub async fn get_messages_paginated( &self, session_id: &str, limit: usize, before_message_id: Option<&str>, ) -> BitFunResult<(Vec, bool)> { - self.history_manager - .get_messages_paginated(session_id, limit, before_message_id) - .await + let messages = self.get_messages(session_id).await?; + Ok(Self::paginate_messages(&messages, limit, before_message_id)) } - /// Get session's context messages (may be compressed) + /// Get session's runtime context messages (may already include compressed reminders). pub async fn get_context_messages(&self, session_id: &str) -> BitFunResult> { - // Get context messages from compression manager (may be compressed) - let context_messages = self.compression_manager.get_context_messages(session_id); + let context_messages = self.context_store.get_context_messages(session_id); Ok(context_messages) } - /// Add message to session + /// Add a semantic message to the runtime context cache and immediately refresh the current + /// turn snapshot so crashes do not lose the latest in-memory context change. pub async fn add_message(&self, session_id: &str, message: Message) -> BitFunResult<()> { - // Add to history manager - self.history_manager - .add_message(session_id, message.clone()) - .await?; - // Also add to compression manager - self.compression_manager - .add_message(session_id, message) - .await?; + self.context_store.add_message(session_id, message); + self.persist_current_turn_context_snapshot_best_effort(session_id, "context_message_added") + .await; Ok(()) } + /// Replace the runtime context cache for a session and immediately refresh the current turn + /// snapshot. This is primarily used after compression rewrites the model-visible context. + pub async fn replace_context_messages(&self, session_id: &str, messages: Vec) { + self.context_store.replace_context(session_id, messages); + self.persist_current_turn_context_snapshot_best_effort(session_id, "context_replaced") + .await; + } + /// Get dialog turn count pub fn get_turn_count(&self, session_id: &str) -> usize { self.sessions @@ -1167,11 +1271,6 @@ impl SessionManager { .map(|s| s.compression_state.clone()) } - /// Get compression manager (for ExecutionEngine use) - pub fn get_compression_manager(&self) -> Arc { - self.compression_manager.clone() - } - /// Update session's compression state pub async fn update_compression_state( &self, diff --git a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts index 4628cca6..a56129e3 100644 --- a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts +++ b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts @@ -95,13 +95,6 @@ export interface UpdateSessionModelRequest { } -export interface Message { - id: string; - role: 'user' | 'assistant' | 'tool' | 'system'; - content: any; - timestamp: number; -} - export interface ModeInfo { id: string; name: string; @@ -286,21 +279,6 @@ export class AgentAPI { } } - - async getSessionMessages(sessionId: string, limit?: number): Promise { - try { - return await api.invoke('get_session_messages', { - request: { - sessionId, - limit - } - }); - } catch (error) { - throw createTauriCommandError('get_session_messages', error, { sessionId, limit }); - } - } - - async confirmToolExecution(sessionId: string, toolId: string): Promise { try { await api.invoke('confirm_tool_execution', {