diff --git a/src/apps/desktop/src/api/agentic_api.rs b/src/apps/desktop/src/api/agentic_api.rs index cfbe2fda..5c753959 100644 --- a/src/apps/desktop/src/api/agentic_api.rs +++ b/src/apps/desktop/src/api/agentic_api.rs @@ -80,6 +80,17 @@ pub struct StartDialogTurnResponse { pub message: String, } +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CompactSessionRequest { + pub session_id: String, + pub workspace_path: Option, + #[serde(default)] + pub remote_connection_id: Option, + #[serde(default)] + pub remote_ssh_host: Option, +} + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct EnsureCoordinatorSessionRequest { @@ -355,6 +366,54 @@ pub async fn start_dialog_turn( }) } +#[tauri::command] +pub async fn compact_session( + coordinator: State<'_, Arc>, + app_state: State<'_, AppState>, + request: CompactSessionRequest, +) -> Result { + let session_id = request.session_id.trim(); + if session_id.is_empty() { + return Err("session_id is required".to_string()); + } + + if coordinator + .get_session_manager() + .get_session(session_id) + .is_none() + { + let workspace_path = request + .workspace_path + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + .ok_or_else(|| { + "workspace_path is required when the session is not loaded".to_string() + })?; + let effective = desktop_effective_session_storage_path( + &app_state, + workspace_path, + request.remote_connection_id.as_deref(), + request.remote_ssh_host.as_deref(), + ) + .await; + coordinator + .restore_session(&effective, session_id) + .await + .map_err(|e| format!("Failed to restore session before compacting: {}", e))?; + } + + coordinator + .compact_session_manually(session_id.to_string()) + .await + .map_err(|e| format!("Failed to compact session: {}", e))?; + + Ok(StartDialogTurnResponse { + success: true, + message: "Session compaction started".to_string(), + }) +} + #[tauri::command] pub async fn ensure_assistant_bootstrap( coordinator: State<'_, Arc>, diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 617fe242..519c2a87 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -315,6 +315,7 @@ pub async fn run() { api::agentic_api::update_session_model, api::agentic_api::ensure_coordinator_session, api::agentic_api::start_dialog_turn, + api::agentic_api::compact_session, api::agentic_api::ensure_assistant_bootstrap, api::agentic_api::cancel_dialog_turn, api::agentic_api::delete_session, diff --git a/src/crates/core/src/agentic/coordination/coordinator.rs b/src/crates/core/src/agentic/coordination/coordinator.rs index 4c1729f8..b0dae0da 100644 --- a/src/crates/core/src/agentic/coordination/coordinator.rs +++ b/src/crates/core/src/agentic/coordination/coordinator.rs @@ -11,7 +11,7 @@ use crate::agentic::core::{ use crate::agentic::events::{ AgenticEvent, EventPriority, EventQueue, EventRouter, EventSubscriber, }; -use crate::agentic::execution::{ExecutionContext, ExecutionEngine}; +use crate::agentic::execution::{ContextCompactionOutcome, ExecutionContext, ExecutionEngine}; use crate::agentic::image_analysis::ImageContextData; use crate::agentic::round_preempt::DialogRoundPreemptSource; use crate::agentic::session::SessionManager; @@ -29,6 +29,9 @@ use tokio::sync::mpsc; use tokio::time::{sleep, Duration, Instant}; use tokio_util::sync::CancellationToken; +const MANUAL_COMPACTION_COMMAND: &str = "/compact"; +const CONTEXT_COMPRESSION_TOOL_NAME: &str = "ContextCompression"; + /// Subagent execution result /// /// Contains the text response after subagent execution @@ -259,6 +262,139 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet ) } + fn estimate_context_tokens(messages: &[Message]) -> usize { + let mut cloned = messages.to_vec(); + cloned.iter_mut().map(|message| message.get_tokens()).sum() + } + + fn manual_compaction_metadata() -> serde_json::Value { + serde_json::json!({ + "kind": "manual_compaction", + "command": MANUAL_COMPACTION_COMMAND, + }) + } + + fn build_manual_compaction_round_completed( + turn_id: &str, + outcome: &ContextCompactionOutcome, + context_window: usize, + threshold: f32, + ) -> crate::service::session::ModelRoundData { + use crate::service::session::{ModelRoundData, ToolCallData, ToolItemData, ToolResultData}; + + let completed_at = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + let started_at = completed_at.saturating_sub(outcome.duration_ms); + + ModelRoundData { + id: format!("{}-manual-compaction-round", turn_id), + turn_id: turn_id.to_string(), + round_index: 0, + timestamp: started_at, + text_items: Vec::new(), + tool_items: vec![ToolItemData { + id: outcome.compression_id.clone(), + tool_name: CONTEXT_COMPRESSION_TOOL_NAME.to_string(), + tool_call: ToolCallData { + input: serde_json::json!({ + "trigger": "manual", + "tokens_before": outcome.tokens_before, + "context_window": context_window, + "threshold": threshold, + }), + id: outcome.compression_id.clone(), + }, + tool_result: Some(ToolResultData { + result: serde_json::json!({ + "compression_count": outcome.compression_count, + "tokens_before": outcome.tokens_before, + "tokens_after": outcome.tokens_after, + "compression_ratio": outcome.compression_ratio, + "duration": outcome.duration_ms, + "applied": outcome.applied, + "has_summary": outcome.has_summary, + "summary_source": outcome.summary_source, + }), + success: true, + result_for_assistant: None, + error: None, + duration_ms: Some(outcome.duration_ms), + }), + ai_intent: None, + start_time: started_at, + end_time: Some(completed_at), + duration_ms: Some(outcome.duration_ms), + order_index: Some(0), + is_subagent_item: None, + parent_task_tool_id: None, + subagent_session_id: None, + status: Some("completed".to_string()), + }], + thinking_items: Vec::new(), + start_time: started_at, + end_time: Some(completed_at), + status: "completed".to_string(), + } + } + + fn build_manual_compaction_round_failed( + turn_id: &str, + compression_id: String, + error: &str, + context_window: usize, + threshold: f32, + ) -> crate::service::session::ModelRoundData { + use crate::service::session::{ModelRoundData, ToolCallData, ToolItemData, ToolResultData}; + + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + ModelRoundData { + id: format!("{}-manual-compaction-round", turn_id), + turn_id: turn_id.to_string(), + round_index: 0, + timestamp, + text_items: Vec::new(), + tool_items: vec![ToolItemData { + id: compression_id.clone(), + tool_name: CONTEXT_COMPRESSION_TOOL_NAME.to_string(), + tool_call: ToolCallData { + input: serde_json::json!({ + "trigger": "manual", + "context_window": context_window, + "threshold": threshold, + "summary_source": "none", + }), + id: compression_id, + }, + tool_result: Some(ToolResultData { + result: serde_json::Value::Null, + success: false, + result_for_assistant: None, + error: Some(error.to_string()), + duration_ms: None, + }), + ai_intent: None, + start_time: timestamp, + end_time: Some(timestamp), + duration_ms: Some(0), + order_index: Some(0), + is_subagent_item: None, + parent_task_tool_id: None, + subagent_session_id: None, + status: Some("error".to_string()), + }], + thinking_items: Vec::new(), + start_time: timestamp, + end_time: Some(timestamp), + status: "error".to_string(), + } + } + pub fn new( session_manager: Arc, execution_engine: Arc, @@ -764,6 +900,164 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet .await } + /// Compact the active session context as a persisted maintenance turn. + pub async fn compact_session_manually(&self, session_id: String) -> BitFunResult<()> { + let session = self + .session_manager + .get_session(&session_id) + .ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", session_id)))?; + + match &session.state { + SessionState::Idle => {} + SessionState::Processing { + current_turn_id, + phase, + } => { + return Err(BitFunError::Validation(format!( + "Session is still processing: current_turn_id={}, phase={:?}", + current_turn_id, phase + ))); + } + SessionState::Error { error, .. } => { + return Err(BitFunError::Validation(format!( + "Session must be idle before manual compaction: {}", + error + ))); + } + } + + let context_messages = self + .session_manager + .get_context_messages(&session_id) + .await?; + let needs_restore = if context_messages.is_empty() { + true + } else { + context_messages.len() == 1 && session.dialog_turn_ids.len() > 0 + }; + + if needs_restore { + let workspace_path = session.config.workspace_path.as_deref().ok_or_else(|| { + BitFunError::Validation(format!( + "workspace_path is required when restoring session: {}", + session_id + )) + })?; + self.session_manager + .restore_session(Path::new(workspace_path), &session_id) + .await?; + } + + let context_messages = self + .session_manager + .get_context_messages(&session_id) + .await?; + let turn_index = self.session_manager.get_turn_count(&session_id); + let user_message_metadata = Some(Self::manual_compaction_metadata()); + let turn_id = self + .session_manager + .start_maintenance_turn( + &session_id, + MANUAL_COMPACTION_COMMAND.to_string(), + None, + user_message_metadata.clone(), + ) + .await?; + + self.emit_event(AgenticEvent::DialogTurnStarted { + session_id: session_id.clone(), + turn_id: turn_id.clone(), + turn_index, + user_input: MANUAL_COMPACTION_COMMAND.to_string(), + original_user_input: None, + user_message_metadata: user_message_metadata.clone(), + subagent_parent_info: None, + }) + .await; + + let current_tokens = Self::estimate_context_tokens(&context_messages); + let context_window = session.config.max_context_tokens; + let compression_threshold = session.config.compression_threshold; + + match self + .execution_engine + .compact_session_context( + &session_id, + &turn_id, + context_messages, + current_tokens, + context_window, + "manual", + crate::agentic::session::CompressionTailPolicy::CollapseAll, + ) + .await + { + Ok(outcome) => { + let model_round = Self::build_manual_compaction_round_completed( + &turn_id, + &outcome, + context_window, + compression_threshold, + ); + self.session_manager + .complete_maintenance_turn( + &session_id, + &turn_id, + vec![model_round], + outcome.duration_ms, + ) + .await?; + self.session_manager + .update_session_state(&session_id, SessionState::Idle) + .await?; + + self.emit_event(AgenticEvent::DialogTurnCompleted { + session_id, + turn_id, + total_rounds: 1, + total_tools: 1, + duration_ms: outcome.duration_ms, + subagent_parent_info: None, + }) + .await; + + Ok(()) + } + Err(err) => { + let error_text = err.to_string(); + let compression_id = format!("compression_{}", uuid::Uuid::new_v4()); + let model_round = Self::build_manual_compaction_round_failed( + &turn_id, + compression_id, + &error_text, + context_window, + compression_threshold, + ); + let _ = self + .session_manager + .fail_maintenance_turn( + &session_id, + &turn_id, + error_text.clone(), + vec![model_round], + ) + .await; + let _ = self + .session_manager + .update_session_state(&session_id, SessionState::Idle) + .await; + self.emit_event(AgenticEvent::DialogTurnFailed { + session_id, + turn_id, + error: error_text.clone(), + subagent_parent_info: None, + }) + .await; + Err(err) + } + } + } + async fn start_dialog_turn_internal( &self, session_id: String, diff --git a/src/crates/core/src/agentic/core/message.rs b/src/crates/core/src/agentic/core/message.rs index 6612237c..acd8263b 100644 --- a/src/crates/core/src/agentic/core/message.rs +++ b/src/crates/core/src/agentic/core/message.rs @@ -71,6 +71,8 @@ pub struct MessageMetadata { pub enum MessageSemanticKind { ActualUserInput, InternalReminder, + CompressionBoundaryMarker, + CompressionSummary, /// Shown in chat after Computer use; omitted from model API requests (see `build_ai_messages_for_send`). ComputerUseVerificationScreenshot, /// Full-screen snapshot appended after mutating ComputerUse tool results within the same turn; diff --git a/src/crates/core/src/agentic/core/messages_helper.rs b/src/crates/core/src/agentic/core/messages_helper.rs index 519c40fb..8e5871d8 100644 --- a/src/crates/core/src/agentic/core/messages_helper.rs +++ b/src/crates/core/src/agentic/core/messages_helper.rs @@ -1,4 +1,4 @@ -use super::{Message, MessageContent, MessageRole}; +use super::{CompressedTodoItem, CompressedTodoSnapshot, Message, MessageContent, MessageRole}; use crate::util::types::Message as AIMessage; use log::warn; pub struct MessageHelper; @@ -136,22 +136,60 @@ impl MessageHelper { } } - pub fn get_last_todo(messages: &[Message]) -> Option { + pub fn get_last_todo_snapshot(messages: &[Message]) -> Option { for message in messages.iter().rev() { if message.role == MessageRole::Assistant { - match &message.content { - MessageContent::Mixed { tool_calls, .. } => { - if tool_calls.is_empty() { + let MessageContent::Mixed { tool_calls, .. } = &message.content else { + continue; + }; + if tool_calls.is_empty() { + continue; + } + for tool_call in tool_calls.iter().rev() { + if tool_call.tool_name != "TodoWrite" { + continue; + } + + let todos = tool_call.arguments.get("todos")?.as_array()?; + let mut compressed_todos = Vec::new(); + + for todo in todos { + let Some(todo_object) = todo.as_object() else { continue; - } - for tool_call in tool_calls.iter().rev() { - if tool_call.tool_name == "TodoWrite" { - let todos = tool_call.arguments.get("todos").unwrap_or_default(); - return Some(todos.to_string()); - } - } + }; + let Some(content) = todo_object + .get("content") + .and_then(serde_json::Value::as_str) + .map(str::trim) + .filter(|content| !content.is_empty()) + else { + continue; + }; + + let status = todo_object + .get("status") + .and_then(serde_json::Value::as_str) + .unwrap_or("pending"); + let id = todo_object + .get("id") + .and_then(serde_json::Value::as_str) + .map(str::to_string); + + compressed_todos.push(CompressedTodoItem { + id, + content: content.to_string(), + status: status.to_string(), + }); + } + + if compressed_todos.is_empty() { + continue; } - _ => {} + + return Some(CompressedTodoSnapshot { + todos: compressed_todos, + summary: None, + }); } } } diff --git a/src/crates/core/src/agentic/core/state.rs b/src/crates/core/src/agentic/core/state.rs index 4ed2f55c..851e3bce 100644 --- a/src/crates/core/src/agentic/core/state.rs +++ b/src/crates/core/src/agentic/core/state.rs @@ -26,6 +26,7 @@ pub enum SessionState { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum ProcessingPhase { Starting, // Starting + Compacting, // Context compaction Thinking, // AI thinking Streaming, // Streaming output ToolCalling, // Tool calling diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index 7c892d95..82e33c03 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -11,7 +11,7 @@ use crate::agentic::image_analysis::{ build_multimodal_message_with_images, process_image_contexts_for_provider, ImageContextData, ImageLimits, }; -use crate::agentic::session::{ContextCompressor, SessionManager}; +use crate::agentic::session::{CompressionTailPolicy, ContextCompressor, SessionManager}; use crate::agentic::tools::{get_all_registered_tools, SubagentParentInfo}; use crate::agentic::WorkspaceBinding; use crate::infrastructure::ai::get_global_ai_client_factory; @@ -39,6 +39,19 @@ impl Default for ExecutionEngineConfig { } } +#[derive(Debug, Clone)] +pub struct ContextCompactionOutcome { + pub compression_id: String, + pub compression_count: usize, + pub tokens_before: usize, + pub tokens_after: usize, + pub compression_ratio: f64, + pub duration_ms: u64, + pub has_summary: bool, + pub summary_source: String, + pub applied: bool, +} + /// Execution engine pub struct ExecutionEngine { round_executor: Arc, @@ -441,6 +454,7 @@ impl ExecutionEngine { context_window: usize, tool_definitions: &Option>, system_prompt_message: Message, + tail_policy: CompressionTailPolicy, ) -> BitFunResult)>> { let event_subagent_parent_info = subagent_parent_info.map(|info| info.clone().into()); let mut session = self @@ -483,7 +497,13 @@ impl ExecutionEngine { // Execute compression match self .context_compressor - .compress_turns(session_id, context_window, turn_index_to_keep, turns) + .compress_turns( + session_id, + context_window, + turn_index_to_keep, + turns, + tail_policy, + ) .await { Ok(compression_result) => { @@ -529,6 +549,11 @@ impl ExecutionEngine { compression_ratio: (compressed_tokens as f64) / (current_tokens as f64), duration_ms, has_summary: compression_result.has_model_summary, + summary_source: if compression_result.has_model_summary { + "model".to_string() + } else { + "local_fallback".to_string() + }, subagent_parent_info: event_subagent_parent_info.clone(), }, EventPriority::Normal, @@ -556,6 +581,175 @@ impl ExecutionEngine { } } + /// Compact the current session context outside the normal dialog execution loop. + /// Always emits compression started/completed/failed events for the provided turn. + pub async fn compact_session_context( + &self, + session_id: &str, + dialog_turn_id: &str, + messages: Vec, + current_tokens: usize, + context_window: usize, + trigger: &str, + tail_policy: CompressionTailPolicy, + ) -> BitFunResult { + let mut session = self + .session_manager + .get_session(session_id) + .ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", session_id)))?; + let start_time = std::time::Instant::now(); + let compression_id = format!("compression_{}", uuid::Uuid::new_v4()); + + self.emit_event( + AgenticEvent::ContextCompressionStarted { + session_id: session_id.to_string(), + turn_id: dialog_turn_id.to_string(), + compression_id: compression_id.clone(), + trigger: trigger.to_string(), + tokens_before: current_tokens, + context_window, + threshold: session.config.compression_threshold, + subagent_parent_info: None, + }, + EventPriority::Normal, + ) + .await; + + let turns = self + .context_compressor + .collect_all_turns_for_manual_compaction(session_id, messages)?; + + if turns.is_empty() { + let duration_ms = start_time.elapsed().as_millis() as u64; + let tokens_after = current_tokens; + let compression_ratio = if current_tokens == 0 { + 1.0 + } else { + (tokens_after as f64) / (current_tokens as f64) + }; + + self.emit_event( + AgenticEvent::ContextCompressionCompleted { + session_id: session_id.to_string(), + turn_id: dialog_turn_id.to_string(), + compression_id: compression_id.clone(), + compression_count: session.compression_state.compression_count, + tokens_before: current_tokens, + tokens_after, + compression_ratio, + duration_ms, + has_summary: false, + summary_source: "none".to_string(), + subagent_parent_info: None, + }, + EventPriority::Normal, + ) + .await; + + return Ok(ContextCompactionOutcome { + compression_id, + compression_count: session.compression_state.compression_count, + tokens_before: current_tokens, + tokens_after, + compression_ratio, + duration_ms, + has_summary: false, + summary_source: "none".to_string(), + applied: false, + }); + } + + match self + .context_compressor + .compress_turns( + session_id, + context_window, + turns.len(), + turns, + tail_policy, + ) + .await + { + Ok(compression_result) => { + let mut compressed_messages = compression_result.messages; + self.session_manager + .replace_context_messages(session_id, compressed_messages.clone()) + .await; + + session.compression_state.increment_compression_count(); + let compression_count = session.compression_state.compression_count; + let _ = self + .session_manager + .update_compression_state(session_id, session.compression_state.clone()) + .await; + + let duration_ms = start_time.elapsed().as_millis() as u64; + let tokens_after = compressed_messages + .iter_mut() + .map(|message| message.get_tokens()) + .sum::(); + let compression_ratio = if current_tokens == 0 { + 1.0 + } else { + (tokens_after as f64) / (current_tokens as f64) + }; + + self.emit_event( + AgenticEvent::ContextCompressionCompleted { + session_id: session_id.to_string(), + turn_id: dialog_turn_id.to_string(), + compression_id: compression_id.clone(), + compression_count, + tokens_before: current_tokens, + tokens_after, + compression_ratio, + duration_ms, + has_summary: compression_result.has_model_summary, + summary_source: if compression_result.has_model_summary { + "model".to_string() + } else { + "local_fallback".to_string() + }, + subagent_parent_info: None, + }, + EventPriority::Normal, + ) + .await; + + Ok(ContextCompactionOutcome { + compression_id, + compression_count, + tokens_before: current_tokens, + tokens_after, + compression_ratio, + duration_ms, + has_summary: compression_result.has_model_summary, + summary_source: if compression_result.has_model_summary { + "model".to_string() + } else { + "local_fallback".to_string() + }, + applied: true, + }) + } + Err(err) => { + self.emit_event( + AgenticEvent::ContextCompressionFailed { + session_id: session_id.to_string(), + turn_id: dialog_turn_id.to_string(), + compression_id: compression_id.clone(), + error: err.to_string(), + subagent_parent_info: None, + }, + EventPriority::High, + ) + .await; + + Err(BitFunError::Session(err.to_string())) + } + } + } + /// Execute a complete dialog turn (may contain multiple model rounds) /// Returns ExecutionResult containing the final response and all newly generated messages pub async fn execute_dialog_turn( @@ -909,6 +1103,7 @@ impl ExecutionEngine { context_window, &tool_definitions, system_prompt_message.clone(), + CompressionTailPolicy::PreserveLiveFrontier, ) .await { diff --git a/src/crates/core/src/agentic/insights/collector.rs b/src/crates/core/src/agentic/insights/collector.rs index d519f7c3..be698d2b 100644 --- a/src/crates/core/src/agentic/insights/collector.rs +++ b/src/crates/core/src/agentic/insights/collector.rs @@ -442,6 +442,10 @@ fn rebuild_messages_from_turns(turns: &[DialogTurnData]) -> Vec { let mut messages = Vec::new(); for turn in turns { + if !turn.kind.is_model_visible() { + continue; + } + let user_ts = UNIX_EPOCH + Duration::from_millis(turn.start_time); let mut user_msg = Message::user(turn.user_message.content.clone()); user_msg.timestamp = user_ts; diff --git a/src/crates/core/src/agentic/session/compression/compressor.rs b/src/crates/core/src/agentic/session/compression/compressor.rs index c74e8b2d..3338ed07 100644 --- a/src/crates/core/src/agentic/session/compression/compressor.rs +++ b/src/crates/core/src/agentic/session/compression/compressor.rs @@ -3,11 +3,11 @@ //! Responsible only for transforming a session context into a compressed one. use super::fallback::{ - build_structured_compression_reminder, CompressionFallbackOptions, CompressionReminder, + build_structured_compression_summary, CompressionFallbackOptions, CompressionSummaryArtifact, }; use crate::agentic::core::{ - render_system_reminder, CompressionPayload, Message, MessageHelper, MessageRole, - MessageSemanticKind, + render_system_reminder, CompressedTodoSnapshot, CompressionEntry, CompressionPayload, Message, + MessageHelper, MessageRole, MessageSemanticKind, }; use crate::infrastructure::ai::{get_global_ai_client_factory, AIClient}; use crate::util::errors::{BitFunError, BitFunResult}; @@ -62,6 +62,12 @@ pub struct CompressionResult { pub has_model_summary: bool, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CompressionTailPolicy { + CollapseAll, + PreserveLiveFrontier, +} + /// Stateless context compression service. pub struct ContextCompressor { config: CompressionConfig, @@ -86,16 +92,13 @@ impl ContextCompressor { result } - /// Returns `(turn_index_to_keep, turns)`. - /// If `turn_index_to_keep` is 0, no compression is needed. - pub async fn preprocess_turns( + fn collect_conversation_turns( &self, session_id: &str, - context_window: usize, mut messages: Vec, - ) -> BitFunResult<(usize, Vec)> { + ) -> BitFunResult> { debug!( - "Starting session context compression analysis: session_id={}", + "Collecting conversation turns for compression: session_id={}", session_id ); @@ -113,10 +116,10 @@ impl ContextCompressor { if all_messages.is_empty() { debug!( - "Session context is empty, no compression needed: session_id={}", + "Session context is empty, no compression candidates: session_id={}", session_id ); - return Ok((0, Vec::new())); + return Ok(Vec::new()); } let mut turns_messages = MessageHelper::group_messages_by_turns(all_messages); @@ -125,13 +128,38 @@ impl ContextCompressor { .iter_mut() .map(|turn| turn.iter_mut().map(|m| m.get_tokens()).sum::()) .collect(); - { - let turns_msg_num: Vec = turns_messages.iter().map(|t| t.len()).collect(); - debug!( - "Session has {} turn(s), messages per turn: {:?}, tokens per turn: {:?}", - turns_count, turns_msg_num, turns_tokens - ); + let turns_msg_num: Vec = turns_messages.iter().map(|turn| turn.len()).collect(); + debug!( + "Session has {} turn(s), messages per turn: {:?}, tokens per turn: {:?}", + turns_count, turns_msg_num, turns_tokens + ); + + Ok(turns_messages + .into_iter() + .zip(turns_tokens) + .map(|(msgs, tokens)| TurnWithTokens::new(msgs, tokens)) + .collect()) + } + + /// Returns `(turn_index_to_keep, turns)`. + /// If `turn_index_to_keep` is 0, no compression is needed. + pub async fn preprocess_turns( + &self, + session_id: &str, + context_window: usize, + messages: Vec, + ) -> BitFunResult<(usize, Vec)> { + debug!( + "Starting session context compression analysis: session_id={}", + session_id + ); + + let turns = self.collect_conversation_turns(session_id, messages)?; + if turns.is_empty() { + return Ok((0, Vec::new())); } + let turns_count = turns.len(); + let turns_tokens: Vec = turns.iter().map(|turn| turn.tokens).collect(); let token_limit_keep_turns = (context_window as f32 * self.config.keep_turns_ratio) as usize; @@ -151,20 +179,25 @@ impl ContextCompressor { session_id, turn_index_to_keep ); - let turns: Vec = turns_messages - .into_iter() - .zip(turns_tokens.into_iter()) - .map(|(msgs, tokens)| TurnWithTokens::new(msgs, tokens)) - .collect(); Ok((turn_index_to_keep, turns)) } + /// Collect all non-system conversation turns for a full manual compaction pass. + pub fn collect_all_turns_for_manual_compaction( + &self, + session_id: &str, + messages: Vec, + ) -> BitFunResult> { + self.collect_conversation_turns(session_id, messages) + } + pub async fn compress_turns( &self, session_id: &str, context_window: usize, turn_index_to_keep: usize, mut turns: Vec, + tail_policy: CompressionTailPolicy, ) -> BitFunResult { if turns.is_empty() { debug!("No turns need compression: session_id={}", session_id); @@ -184,19 +217,11 @@ impl ContextCompressor { has_model_summary: false, }); }; - let last_user_message = { - last_turn_messages - .first() - .cloned() - .and_then(|first_message| { - if first_message.role == MessageRole::User { - Some(first_message) - } else { - None - } - }) - }; - let last_todo = MessageHelper::get_last_todo(last_turn_messages); + let last_user_message = last_turn_messages + .iter() + .find(|message| message.is_actual_user_message()) + .cloned(); + let last_todo = MessageHelper::get_last_todo_snapshot(last_turn_messages); trace!("Last user message: {:?}", last_user_message); trace!("Last todo: {:?}", last_todo); let turns_to_keep = turns.split_off(turn_index_to_keep); @@ -204,31 +229,27 @@ impl ContextCompressor { let mut compressed_messages = Vec::new(); let mut has_model_summary = false; if !turns.is_empty() { - let reminder = self + let mut summary_artifact = self .execute_compression_with_fallback(turns, context_window) .await?; - trace!("Compression reminder generated"); - has_model_summary = reminder.used_model_summary; - compressed_messages.push(self.create_reminder_message(reminder)); + if turns_to_keep.is_empty() { + self.append_todo_snapshot(&mut summary_artifact, last_todo.clone()); + } + trace!("Compression summary artifact generated"); + has_model_summary = summary_artifact.used_model_summary; + let (boundary_message, summary_message) = self.create_summary_turn(summary_artifact); + compressed_messages.push(boundary_message); + compressed_messages.push(summary_message); } if !turns_to_keep.is_empty() { for turn in turns_to_keep { compressed_messages.extend(turn.messages); } - } else { + } else if matches!(tail_policy, CompressionTailPolicy::PreserveLiveFrontier) { if let Some(last_user_message) = last_user_message { compressed_messages.push(last_user_message); } - if let Some(last_todo) = last_todo { - compressed_messages.push( - Message::user(render_system_reminder(&format!( - "Below is the most recent to-do list. Continue working on these tasks:\n{}", - last_todo - ))) - .with_semantic_kind(MessageSemanticKind::InternalReminder), - ); - } } debug!( @@ -243,17 +264,83 @@ impl ContextCompressor { }) } - fn create_reminder_message(&self, reminder: CompressionReminder) -> Message { - Message::user(render_system_reminder(&reminder.model_text)) - .with_semantic_kind(MessageSemanticKind::InternalReminder) - .with_compression_payload(reminder.payload) + fn create_summary_turn( + &self, + summary_artifact: CompressionSummaryArtifact, + ) -> (Message, Message) { + let boundary = Message::user(render_system_reminder(&Self::render_boundary_marker_text( + summary_artifact.used_model_summary, + ))) + .with_semantic_kind(MessageSemanticKind::CompressionBoundaryMarker); + + let summary = Message::assistant(summary_artifact.summary_text) + .with_semantic_kind(MessageSemanticKind::CompressionSummary) + .with_compression_payload(summary_artifact.payload); + + (boundary, summary) + } + + fn append_todo_snapshot( + &self, + summary_artifact: &mut CompressionSummaryArtifact, + todo_snapshot: Option, + ) { + let Some(todo_snapshot) = todo_snapshot else { + return; + }; + + let todo_text = Self::render_todo_snapshot(&todo_snapshot); + if !todo_text.is_empty() { + summary_artifact.summary_text = format!( + "{}\n\nLatest task list snapshot at the compression boundary:\n{}", + summary_artifact.summary_text.trim_end(), + todo_text + ); + } + + summary_artifact + .payload + .entries + .push(CompressionEntry::Turn { + turn_id: None, + messages: Vec::new(), + todo: Some(todo_snapshot), + }); + } + + fn render_todo_snapshot(todo_snapshot: &CompressedTodoSnapshot) -> String { + if todo_snapshot.todos.is_empty() { + return todo_snapshot.summary.clone().unwrap_or_default(); + } + + let mut lines: Vec = todo_snapshot + .todos + .iter() + .map(|todo| format!("- [{}] {}", todo.status, todo.content)) + .collect(); + + if let Some(summary) = &todo_snapshot.summary { + if !summary.trim().is_empty() { + lines.push(format!("Task list note: {}", summary.trim())); + } + } + + lines.join("\n") + } + + fn render_boundary_marker_text(used_model_summary: bool) -> String { + let mut msg = "Earlier conversation was compressed for context management. Use the summary in the next assistant message as historical context.".to_string(); + if !used_model_summary { + msg.push_str(" This compressed context is a partial reconstructed record. Message text, tool arguments, task lists, and tool results may be truncated or omitted."); + } + msg } async fn execute_compression_with_fallback( &self, turns_to_compress: Vec, context_window: usize, - ) -> BitFunResult { + ) -> BitFunResult { let summary_result = match get_global_ai_client_factory().await { Ok(ai_client_factory) => match ai_client_factory .get_client_by_func_agent("compression") @@ -277,8 +364,11 @@ impl ContextCompressor { match summary_result { Ok(summary) => { trace!("Compression summary: {}", summary); - Ok(CompressionReminder { - model_text: format!("Previous conversation is summarized below:\n{}", summary), + Ok(CompressionSummaryArtifact { + summary_text: format!( + "Previous conversation is summarized below:\n{}", + summary + ), payload: CompressionPayload::from_summary(summary), used_model_summary: true, }) @@ -288,14 +378,14 @@ impl ContextCompressor { "Model-based compression failed, falling back to structured local compression: {}", err ); - let reminder = build_structured_compression_reminder( + let summary_artifact = build_structured_compression_summary( turns_to_compress .into_iter() .map(|turn| turn.messages) .collect(), &self.build_fallback_options(context_window), ); - Ok(reminder) + Ok(summary_artifact) } } } @@ -311,6 +401,26 @@ impl ContextCompressor { } } + fn normalize_model_summary_output(raw: &str) -> Option { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + + if let Some(summary) = extract_tag_content(trimmed, "summary") { + let summary = summary.trim(); + if !summary.is_empty() { + return Some(summary.to_string()); + } + } + + if trimmed.contains("") { + return None; + } + + Some(trimmed.to_string()) + } + async fn execute_compression( &self, ai_client: Arc, @@ -444,8 +554,15 @@ Be thorough and precise. Do not lose important technical details from either the system_message_for_summary: Message, messages: Vec, ) -> BitFunResult { - self.generate_summary_with_retry(ai_client, system_message_for_summary, messages, 2) - .await + let raw_summary = self + .generate_summary_with_retry(ai_client, system_message_for_summary, messages, 2) + .await?; + Self::normalize_model_summary_output(&raw_summary).ok_or_else(|| { + BitFunError::AIClient( + "Model-based compression returned without a usable " + .to_string(), + ) + }) } async fn generate_summary_with_retry( @@ -511,7 +628,9 @@ Be thorough and precise. Do not lose important technical details from either the r#"Your task is to create a detailed summary of the conversation so far, paying close attention to the user's explicit requests and your previous actions. This summary should be thorough in capturing technical details, code patterns, and architectural decisions that would be essential for continuing development work without losing context. -Before providing your final summary, wrap your analysis in tags to organize your thoughts and ensure you've covered all necessary points. In your analysis process: +Before providing your final summary, wrap your analysis in tags to organize your thoughts and ensure you've covered all necessary points. Then output the final retained summary in tags. +Important: only the content inside will be kept as compressed history. The section is transient and will be discarded, so do not put any required final information only in . +In your analysis process: 1. Chronologically analyze each message and section of the conversation. For each section thoroughly identify: - The user's explicit requests and intents @@ -596,3 +715,193 @@ Please provide your summary based on the conversation so far, following this str .to_string() } } + +fn extract_tag_content<'a>(text: &'a str, tag: &str) -> Option<&'a str> { + let open = format!("<{tag}>"); + let close = format!(""); + let start = text.find(&open)?; + let after_open = &text[start + open.len()..]; + let end = after_open.find(&close)?; + Some(&after_open[..end]) +} + +#[cfg(test)] +mod tests { + use super::{CompressionTailPolicy, ContextCompressor, TurnWithTokens}; + use crate::agentic::core::{ + render_system_reminder, CompressionEntry, CompressionPayload, Message, MessageSemanticKind, + }; + + fn make_turn(messages: Vec) -> TurnWithTokens { + let mut messages_with_tokens = messages; + let tokens = messages_with_tokens + .iter_mut() + .map(|message| message.get_tokens()) + .sum(); + TurnWithTokens::new(messages_with_tokens, tokens) + } + + fn todo_turn() -> TurnWithTokens { + make_turn(vec![ + Message::user("Continue the refactor".to_string()), + Message::assistant_with_tools( + "Planning next steps".to_string(), + vec![crate::agentic::core::ToolCall { + tool_id: "todo_1".to_string(), + tool_name: "TodoWrite".to_string(), + arguments: serde_json::json!({ + "todos": [ + {"content": "Update compressor", "status": "in_progress"}, + {"content": "Add regression tests", "status": "pending"} + ] + }), + is_error: false, + }], + ), + ]) + } + + #[tokio::test] + async fn collapse_all_creates_closed_compression_turn() { + let compressor = ContextCompressor::new(Default::default()); + let result = compressor + .compress_turns( + "session", + 8000, + 1, + vec![todo_turn()], + CompressionTailPolicy::CollapseAll, + ) + .await + .expect("compression succeeds"); + + assert_eq!(result.messages.len(), 2); + assert_eq!( + result.messages[0].metadata.semantic_kind, + Some(MessageSemanticKind::CompressionBoundaryMarker) + ); + assert_eq!( + result.messages[1].metadata.semantic_kind, + Some(MessageSemanticKind::CompressionSummary) + ); + + let boundary_text = match &result.messages[0].content { + crate::agentic::core::MessageContent::Text(text) => text, + _ => panic!("expected boundary marker text"), + }; + assert!(boundary_text.contains("partial reconstructed record")); + + let summary_text = match &result.messages[1].content { + crate::agentic::core::MessageContent::Text(text) => text, + _ => panic!("expected assistant text summary"), + }; + assert!(summary_text.contains("Latest task list snapshot at the compression boundary")); + assert!(summary_text.contains("Update compressor")); + } + + #[tokio::test] + async fn preserve_live_frontier_keeps_last_user_after_summary_turn() { + let compressor = ContextCompressor::new(Default::default()); + let result = compressor + .compress_turns( + "session", + 8000, + 1, + vec![todo_turn()], + CompressionTailPolicy::PreserveLiveFrontier, + ) + .await + .expect("compression succeeds"); + + assert_eq!(result.messages.len(), 3); + assert_eq!( + result.messages[2].role, + crate::agentic::core::MessageRole::User + ); + assert!(result.messages[2].is_actual_user_message()); + } + + #[test] + fn synthetic_summary_turn_payload_remains_atomic_on_recompression() { + let marker = Message::user(render_system_reminder( + "Earlier conversation was compressed.", + )) + .with_semantic_kind(MessageSemanticKind::CompressionBoundaryMarker); + let summary = Message::assistant("Summary text".to_string()) + .with_semantic_kind(MessageSemanticKind::CompressionSummary) + .with_compression_payload(CompressionPayload::from_summary("Summary text".to_string())); + + let summary_artifact = + crate::agentic::session::compression::fallback::build_structured_compression_summary( + vec![vec![marker, summary]], + &crate::agentic::session::compression::fallback::CompressionFallbackOptions { + max_tokens: 10_000, + user_chars: 120, + assistant_chars: 120, + tool_arg_chars: 80, + tool_command_chars: 80, + }, + ); + + assert!(matches!( + &summary_artifact.payload.entries[0], + CompressionEntry::ModelSummary { text } if text == "Summary text" + )); + } + + #[test] + fn model_summary_boundary_marker_omits_partial_record_notice() { + let marker = ContextCompressor::render_boundary_marker_text(true); + assert!(!marker.contains("partial reconstructed record")); + assert!(marker.contains("historical context")); + } + + #[test] + fn model_summary_output_uses_summary_tag_body_only() { + let normalized = ContextCompressor::normalize_model_summary_output( + "\ninternal reasoning\n\n\nFinal summary\n", + ); + + assert_eq!(normalized.as_deref(), Some("Final summary")); + } + + #[test] + fn model_summary_output_without_tags_keeps_plain_text() { + let normalized = + ContextCompressor::normalize_model_summary_output("Plain summary without tags"); + + assert_eq!(normalized.as_deref(), Some("Plain summary without tags")); + } + + #[test] + fn model_summary_output_with_analysis_but_no_summary_is_rejected() { + let normalized = ContextCompressor::normalize_model_summary_output( + "\ninternal reasoning\n", + ); + + assert_eq!(normalized, None); + } + + #[tokio::test] + async fn manual_compaction_turn_collection_includes_all_non_system_turns() { + let compressor = ContextCompressor::new(Default::default()); + let messages = vec![ + Message::system("system".to_string()), + Message::user("First request".to_string()), + Message::assistant("First reply".to_string()), + Message::user("Second request".to_string()), + Message::assistant("Second reply".to_string()), + ]; + + let manual_turns = compressor + .collect_all_turns_for_manual_compaction("session", messages.clone()) + .expect("manual collection succeeds"); + let (_, passive_turns) = compressor + .preprocess_turns("session", 8_000, messages) + .await + .expect("passive preprocessing succeeds"); + + assert_eq!(manual_turns.len(), 2); + assert_eq!(manual_turns.len(), passive_turns.len()); + } +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/builder.rs b/src/crates/core/src/agentic/session/compression/fallback/builder.rs index 0376f4ea..232569a1 100644 --- a/src/crates/core/src/agentic/session/compression/fallback/builder.rs +++ b/src/crates/core/src/agentic/session/compression/fallback/builder.rs @@ -145,8 +145,11 @@ fn flush_turn_entry( } fn extract_nested_compression_entries(message: &Message) -> Option> { - if message.metadata.semantic_kind != Some(MessageSemanticKind::InternalReminder) { - return None; + match message.metadata.semantic_kind { + Some(MessageSemanticKind::CompressionBoundaryMarker) => return Some(Vec::new()), + Some(MessageSemanticKind::CompressionSummary) + | Some(MessageSemanticKind::InternalReminder) => {} + _ => return None, } if let Some(payload) = message.metadata.compression_payload.clone() { @@ -155,6 +158,10 @@ fn extract_nested_compression_entries(message: &Message) -> Option text.clone(), MessageContent::Multimodal { text, .. } => text.clone(), diff --git a/src/crates/core/src/agentic/session/compression/fallback/mod.rs b/src/crates/core/src/agentic/session/compression/fallback/mod.rs index 3c074f69..2e46b959 100644 --- a/src/crates/core/src/agentic/session/compression/fallback/mod.rs +++ b/src/crates/core/src/agentic/session/compression/fallback/mod.rs @@ -8,18 +8,18 @@ use builder::build_entries_from_turns; use payload::trim_payload_to_budget; use render::render_payload_for_model; -pub use types::{CompressionFallbackOptions, CompressionReminder}; +pub use types::{CompressionFallbackOptions, CompressionSummaryArtifact}; -pub fn build_structured_compression_reminder( +pub fn build_structured_compression_summary( turns: Vec>, options: &CompressionFallbackOptions, -) -> CompressionReminder { +) -> CompressionSummaryArtifact { let entries = build_entries_from_turns(turns, options); let trimmed_payload = trim_payload_to_budget(entries, options); - let model_text = render_payload_for_model(&trimmed_payload); + let summary_text = render_payload_for_model(&trimmed_payload); - CompressionReminder { - model_text, + CompressionSummaryArtifact { + summary_text, payload: trimmed_payload, used_model_summary: false, } diff --git a/src/crates/core/src/agentic/session/compression/fallback/payload.rs b/src/crates/core/src/agentic/session/compression/fallback/payload.rs index 03560abb..29497e80 100644 --- a/src/crates/core/src/agentic/session/compression/fallback/payload.rs +++ b/src/crates/core/src/agentic/session/compression/fallback/payload.rs @@ -160,6 +160,6 @@ fn flush_rebuilt_turn( fn estimate_payload_tokens(payload: &CompressionPayload) -> usize { let rendered = render_payload_for_model(payload); - let mut reminder = Message::user(render_system_reminder(&rendered)); - reminder.get_tokens() + let mut synthetic_message = Message::user(render_system_reminder(&rendered)); + synthetic_message.get_tokens() } diff --git a/src/crates/core/src/agentic/session/compression/fallback/render.rs b/src/crates/core/src/agentic/session/compression/fallback/render.rs index f5ca3986..4e635d12 100644 --- a/src/crates/core/src/agentic/session/compression/fallback/render.rs +++ b/src/crates/core/src/agentic/session/compression/fallback/render.rs @@ -5,17 +5,11 @@ use serde_json::{json, Value}; pub(super) fn render_payload_for_model(payload: &CompressionPayload) -> String { if payload.entries.is_empty() { - return [ - "Earlier conversation has been condensed for context management.", - "The omitted history could not be kept within the available context budget.", - ] - .join("\n\n"); + return "No detailed historical entries fit within the remaining context budget." + .to_string(); } - let mut sections = vec![ - "Earlier conversation has been condensed for context management.".to_string(), - "The history below is a partial record. Message text, tool arguments, and task lists may have been truncated or omitted during compression. All tool results have been cleared from this compressed history. Treat it as approximate historical context rather than a complete verbatim transcript.".to_string(), - ]; + let mut sections = Vec::new(); for (index, entry) in payload.entries.iter().enumerate() { match entry { @@ -28,8 +22,9 @@ pub(super) fn render_payload_for_model(payload: &CompressionPayload) -> String { } CompressionEntry::Turn { messages, todo, .. } => { let mut lines = vec![format!("Historical turn {}:", index + 1)]; + let mut previous_role = None; for message in messages { - render_compressed_message(&mut lines, message); + render_compressed_message(&mut lines, message, &mut previous_role); } if let Some(todo) = todo { lines.push("Latest task list for this turn:".to_string()); @@ -54,15 +49,24 @@ pub(super) fn render_payload_for_model(payload: &CompressionPayload) -> String { sections.join("\n\n") } -fn render_compressed_message(lines: &mut Vec, message: &CompressedMessage) { +fn render_compressed_message( + lines: &mut Vec, + message: &CompressedMessage, + previous_role: &mut Option, +) { let role_label = match message.role { CompressedMessageRole::User => "User", CompressedMessageRole::Assistant => "Assistant", }; + let is_new_role_segment = *previous_role != Some(message.role); if let Some(text) = message.text.as_ref() { - lines.push(format!("{role_label}: {text}")); - } else { + if is_new_role_segment { + lines.push(format!("{role_label}: {text}")); + } else { + lines.push(text.clone()); + } + } else if is_new_role_segment { lines.push(format!("{role_label}:")); } @@ -77,6 +81,8 @@ fn render_compressed_message(lines: &mut Vec, message: &CompressedMessag } lines.push(format!("Tool call: {}", rendered)); } + + *previous_role = Some(message.role); } fn render_tool_arguments(arguments: &Value) -> String { diff --git a/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs b/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs index c87f9e12..b5ee3be2 100644 --- a/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs +++ b/src/crates/core/src/agentic/session/compression/fallback/sanitize.rs @@ -1,12 +1,13 @@ use super::types::CompressionFallbackOptions; -use crate::agentic::core::{CompressedTodoItem, CompressedTodoSnapshot}; +use crate::agentic::core::{strip_prompt_markup, CompressedTodoItem, CompressedTodoSnapshot}; use serde_json::{Map, Value}; pub(super) fn sanitize_user_text( text: &str, options: &CompressionFallbackOptions, ) -> Option { - sanitize_text(text, options.user_chars) + let normalized = strip_prompt_markup(text); + sanitize_text(&normalized, options.user_chars) } pub(super) fn sanitize_assistant_text( diff --git a/src/crates/core/src/agentic/session/compression/fallback/tests.rs b/src/crates/core/src/agentic/session/compression/fallback/tests.rs index c3720c04..a55f0fcc 100644 --- a/src/crates/core/src/agentic/session/compression/fallback/tests.rs +++ b/src/crates/core/src/agentic/session/compression/fallback/tests.rs @@ -1,7 +1,7 @@ -use super::{build_structured_compression_reminder, CompressionFallbackOptions}; +use super::{build_structured_compression_summary, CompressionFallbackOptions}; use crate::agentic::core::{ - render_system_reminder, CompressedMessageRole, CompressionEntry, CompressionPayload, Message, - MessageSemanticKind, ToolCall, ToolResult, + render_system_reminder, render_user_query, CompressedMessageRole, CompressionEntry, + CompressionPayload, Message, MessageSemanticKind, ToolCall, ToolResult, }; use serde_json::json; @@ -40,7 +40,7 @@ fn clears_tool_results_from_compressed_history() { image_attachments: None, }); - let reminder = build_structured_compression_reminder( + let summary_artifact = build_structured_compression_summary( vec![vec![ Message::user("inspect".to_string()), assistant, @@ -49,7 +49,7 @@ fn clears_tool_results_from_compressed_history() { &default_options(), ); - let turn = match &reminder.payload.entries[0] { + let turn = match &summary_artifact.payload.entries[0] { CompressionEntry::Turn { messages, .. } => messages, _ => panic!("expected turn entry"), }; @@ -59,8 +59,11 @@ fn clears_tool_results_from_compressed_history() { .expect("assistant message"); assert_eq!(assistant_message.tool_calls.len(), 1); - assert!(!reminder.model_text.contains("Tool result:")); - assert!(reminder.model_text.contains("All tool results have been cleared")); + assert!(!summary_artifact.summary_text.contains("Tool result:")); + assert!(!summary_artifact + .summary_text + .contains("All tool results have been cleared")); + assert!(summary_artifact.summary_text.contains("Historical turn 1:")); } #[test] @@ -70,11 +73,99 @@ fn reuses_existing_compression_payload_atomically() { .with_semantic_kind(MessageSemanticKind::InternalReminder) .with_compression_payload(CompressionPayload::from_summary(prior_summary.clone())); - let reminder = - build_structured_compression_reminder(vec![vec![reminder_message]], &default_options()); + let summary_artifact = + build_structured_compression_summary(vec![vec![reminder_message]], &default_options()); assert!(matches!( - &reminder.payload.entries[0], + &summary_artifact.payload.entries[0], CompressionEntry::ModelSummary { text } if text == &prior_summary )); } + +#[test] +fn strips_user_query_markup_from_fallback_user_messages() { + let raw = format!( + "{}\n{}", + render_user_query("Implement manual /compact"), + render_system_reminder("Keep responses concise") + ); + + let summary_artifact = + build_structured_compression_summary(vec![vec![Message::user(raw)]], &default_options()); + + let turn = match &summary_artifact.payload.entries[0] { + CompressionEntry::Turn { messages, .. } => messages, + _ => panic!("expected turn entry"), + }; + let user_message = turn + .iter() + .find(|message| message.role == CompressedMessageRole::User) + .expect("user message"); + + assert_eq!( + user_message.text.as_deref(), + Some("Implement manual /compact") + ); + assert!(!summary_artifact.summary_text.contains("")); + assert!(!summary_artifact.summary_text.contains("")); +} + +#[test] +fn drops_system_reminder_only_user_messages_from_fallback_summary() { + let summary_artifact = build_structured_compression_summary( + vec![vec![Message::user(render_system_reminder( + "Summarized context boundary marker", + ))]], + &default_options(), + ); + + assert!(summary_artifact.payload.entries.is_empty()); + assert_eq!( + summary_artifact.summary_text, + "No detailed historical entries fit within the remaining context budget." + ); +} + +#[test] +fn groups_consecutive_assistant_messages_under_single_role_header() { + let summary_artifact = build_structured_compression_summary( + vec![vec![ + Message::user("Update the component styling.".to_string()), + Message::assistant_with_tools( + "".to_string(), + vec![ToolCall { + tool_id: "tool_1".to_string(), + tool_name: "Read".to_string(), + arguments: json!({ + "file_path": "/workspace/example.txt" + }), + is_error: false, + }], + ), + Message::assistant_with_tools( + "".to_string(), + vec![ToolCall { + tool_id: "tool_2".to_string(), + tool_name: "Edit".to_string(), + arguments: json!({ + "file_path": "/workspace/example.txt", + "old_string": "before", + "new_string": "after" + }), + is_error: false, + }], + ), + Message::assistant("Updated the styling changes.".to_string()), + ]], + &default_options(), + ); + + let assistant_headers = summary_artifact.summary_text.matches("Assistant:").count(); + assert_eq!(assistant_headers, 1); + assert!(summary_artifact.summary_text.contains( + "Assistant:\nTool call: Read {\"file_path\":\"/workspace/example.txt\"}" + )); + assert!(summary_artifact + .summary_text + .contains("Updated the styling changes.")); +} diff --git a/src/crates/core/src/agentic/session/compression/fallback/types.rs b/src/crates/core/src/agentic/session/compression/fallback/types.rs index fc1be95c..dd27488d 100644 --- a/src/crates/core/src/agentic/session/compression/fallback/types.rs +++ b/src/crates/core/src/agentic/session/compression/fallback/types.rs @@ -10,8 +10,8 @@ pub struct CompressionFallbackOptions { } #[derive(Debug, Clone)] -pub struct CompressionReminder { - pub model_text: String, +pub struct CompressionSummaryArtifact { + pub summary_text: String, pub payload: CompressionPayload, pub used_model_summary: bool, } diff --git a/src/crates/core/src/agentic/session/session_manager.rs b/src/crates/core/src/agentic/session/session_manager.rs index e8b82b04..090a6640 100644 --- a/src/crates/core/src/agentic/session/session_manager.rs +++ b/src/crates/core/src/agentic/session/session_manager.rs @@ -11,7 +11,7 @@ use crate::agentic::persistence::PersistenceManager; use crate::agentic::session::SessionContextStore; use crate::infrastructure::ai::get_global_ai_client_factory; use crate::service::session::{ - DialogTurnData, ModelRoundData, TextItemData, TurnStatus, UserMessageData, + DialogTurnData, DialogTurnKind, ModelRoundData, TextItemData, TurnStatus, UserMessageData, }; use crate::service::snapshot::ensure_snapshot_manager_for_workspace; use crate::util::errors::{BitFunError, BitFunResult}; @@ -128,6 +128,10 @@ impl SessionManager { let mut messages = Vec::new(); for turn in turns { + if !turn.kind.is_model_visible() { + continue; + } + let user_message = if let Some(metadata) = &turn.user_message.metadata { let images = metadata .get("images") @@ -826,18 +830,16 @@ impl SessionManager { // ============ Dialog Turn Management ============ - /// Start a new dialog turn - /// turn_id: Optional frontend-specified ID, if None then backend generates - /// Returns: turn_id - pub async fn start_dialog_turn( + async fn start_persisted_turn( &self, session_id: &str, + kind: DialogTurnKind, user_input: String, turn_id: Option, - image_contexts: Option>, + context_message: Option, + processing_phase: ProcessingPhase, user_message_metadata: Option, ) -> BitFunResult { - // Check if session exists let session = self .get_session(session_id) .ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", session_id)))?; @@ -851,7 +853,6 @@ impl SessionManager { })?; let turn_index = session.dialog_turn_ids.len(); - // Pass frontend's turnId let turn = DialogTurn::new( session_id.to_string(), turn_index, @@ -860,33 +861,24 @@ impl SessionManager { ); let turn_id = turn.turn_id.clone(); - // 1. Add to session and update state to Processing (includes current_turn_id) if let Some(mut session) = self.sessions.get_mut(session_id) { session.dialog_turn_ids.push(turn_id.clone()); session.state = SessionState::Processing { current_turn_id: turn_id.clone(), - phase: ProcessingPhase::Starting, + phase: processing_phase, }; session.updated_at = SystemTime::now(); session.last_activity_at = SystemTime::now(); } - // 2. Add the user message to the in-memory context cache. - let user_message = - if let Some(images) = image_contexts.as_ref().filter(|v| !v.is_empty()).cloned() { - Message::user_multimodal(user_input.clone(), images) - .with_turn_id(turn_id.clone()) - .with_semantic_kind(MessageSemanticKind::ActualUserInput) - } else { - Message::user(user_input.clone()) - .with_turn_id(turn_id.clone()) - .with_semantic_kind(MessageSemanticKind::ActualUserInput) - }; - self.context_store.add_message(session_id, user_message); + if let Some(message) = context_message { + self.context_store + .add_message(session_id, message.with_turn_id(turn_id.clone())); + } - // 3. Persist if self.config.enable_persistence { - let turn_data = DialogTurnData::new( + let turn_data = DialogTurnData::new_with_kind( + kind, turn_id.clone(), turn_index, session_id.to_string(), @@ -914,10 +906,67 @@ impl SessionManager { self.persist_context_snapshot_for_turn_best_effort(session_id, turn_index, "turn_started") .await; - debug!( - "Starting dialog turn: turn_id={}, turn_index={}", - turn_id, turn_index - ); + Ok(turn_id) + } + + /// Start a new dialog turn + /// turn_id: Optional frontend-specified ID, if None then backend generates + /// Returns: turn_id + pub async fn start_dialog_turn( + &self, + session_id: &str, + user_input: String, + turn_id: Option, + image_contexts: Option>, + user_message_metadata: Option, + ) -> BitFunResult { + let user_message = + if let Some(images) = image_contexts.as_ref().filter(|v| !v.is_empty()).cloned() { + Message::user_multimodal(user_input.clone(), images) + .with_semantic_kind(MessageSemanticKind::ActualUserInput) + } else { + Message::user(user_input.clone()) + .with_semantic_kind(MessageSemanticKind::ActualUserInput) + }; + + let turn_id = self + .start_persisted_turn( + session_id, + DialogTurnKind::UserDialog, + user_input, + turn_id, + Some(user_message), + ProcessingPhase::Starting, + user_message_metadata, + ) + .await?; + + debug!("Starting dialog turn: turn_id={}", turn_id); + + Ok(turn_id) + } + + /// Start a persisted maintenance turn that should not enter model-visible context. + pub async fn start_maintenance_turn( + &self, + session_id: &str, + display_message: String, + turn_id: Option, + user_message_metadata: Option, + ) -> BitFunResult { + let turn_id = self + .start_persisted_turn( + session_id, + DialogTurnKind::ManualCompaction, + display_message, + turn_id, + None, + ProcessingPhase::Compacting, + user_message_metadata, + ) + .await?; + + debug!("Starting maintenance turn: turn_id={}", turn_id); Ok(turn_id) } @@ -1069,6 +1118,117 @@ impl SessionManager { Ok(()) } + /// Complete a maintenance turn and persist its synthetic model round payload. + pub async fn complete_maintenance_turn( + &self, + session_id: &str, + turn_id: &str, + model_rounds: Vec, + duration_ms: u64, + ) -> BitFunResult<()> { + let workspace_path = self + .effective_session_workspace_path(session_id) + .await + .ok_or_else(|| { + BitFunError::Validation(format!( + "Session workspace_path is missing: {}", + session_id + )) + })?; + let turn_index = self + .sessions + .get(session_id) + .and_then(|session| session.dialog_turn_ids.iter().position(|id| id == turn_id)) + .ok_or_else(|| BitFunError::NotFound(format!("Dialog turn not found: {}", turn_id)))?; + let mut turn = self + .persistence_manager + .load_dialog_turn(&workspace_path, session_id, turn_index) + .await? + .ok_or_else(|| BitFunError::NotFound(format!("Dialog turn not found: {}", turn_id)))?; + + let completion_timestamp = SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + turn.model_rounds = model_rounds; + turn.status = TurnStatus::Completed; + turn.duration_ms = Some(duration_ms); + turn.end_time = Some(completion_timestamp); + + self.persist_context_snapshot_for_turn_best_effort( + session_id, + turn.turn_index, + "maintenance_turn_completed", + ) + .await; + + if self.config.enable_persistence { + self.persistence_manager + .save_dialog_turn(&workspace_path, &turn) + .await?; + } + + Ok(()) + } + + /// Mark a maintenance turn as failed while preserving its synthetic tool state. + pub async fn fail_maintenance_turn( + &self, + session_id: &str, + turn_id: &str, + error: String, + model_rounds: Vec, + ) -> BitFunResult<()> { + let workspace_path = self + .effective_session_workspace_path(session_id) + .await + .ok_or_else(|| { + BitFunError::Validation(format!( + "Session workspace_path is missing: {}", + session_id + )) + })?; + let turn_index = self + .sessions + .get(session_id) + .and_then(|session| session.dialog_turn_ids.iter().position(|id| id == turn_id)) + .ok_or_else(|| BitFunError::NotFound(format!("Dialog turn not found: {}", turn_id)))?; + let mut turn = self + .persistence_manager + .load_dialog_turn(&workspace_path, session_id, turn_index) + .await? + .ok_or_else(|| BitFunError::NotFound(format!("Dialog turn not found: {}", turn_id)))?; + + let completion_timestamp = SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + turn.model_rounds = model_rounds; + turn.status = TurnStatus::Error; + turn.duration_ms = Some(completion_timestamp.saturating_sub(turn.start_time)); + turn.end_time = Some(completion_timestamp); + + self.persist_context_snapshot_for_turn_best_effort( + session_id, + turn.turn_index, + "maintenance_turn_failed", + ) + .await; + + if self.config.enable_persistence { + self.persistence_manager + .save_dialog_turn(&workspace_path, &turn) + .await?; + } + + debug!( + "Maintenance turn marked as failed: turn_id={}, turn_index={}, error={}", + turn_id, turn.turn_index, error + ); + + Ok(()) + } + /// Persist a completed `/btw` side-question turn into an existing child session. pub async fn persist_btw_turn( &self, @@ -1472,3 +1632,43 @@ impl SessionManager { debug!("Cleanup task started"); } } + +#[cfg(test)] +mod tests { + use super::SessionManager; + use crate::service::session::{DialogTurnData, DialogTurnKind, UserMessageData}; + + #[test] + fn build_messages_from_turns_skips_manual_compaction_turns() { + let turns = vec![ + DialogTurnData::new( + "turn-1".to_string(), + 0, + "session-1".to_string(), + UserMessageData { + id: "user-1".to_string(), + content: "hello".to_string(), + timestamp: 1, + metadata: None, + }, + ), + DialogTurnData::new_with_kind( + DialogTurnKind::ManualCompaction, + "turn-2".to_string(), + 1, + "session-1".to_string(), + UserMessageData { + id: "user-2".to_string(), + content: "/compact".to_string(), + timestamp: 2, + metadata: None, + }, + ), + ]; + + let messages = SessionManager::build_messages_from_turns(&turns); + + assert_eq!(messages.len(), 1); + assert!(messages[0].is_actual_user_message()); + } +} diff --git a/src/crates/core/src/service/remote_connect/remote_server.rs b/src/crates/core/src/service/remote_connect/remote_server.rs index bb9e0958..a60fe585 100644 --- a/src/crates/core/src/service/remote_connect/remote_server.rs +++ b/src/crates/core/src/service/remote_connect/remote_server.rs @@ -651,6 +651,10 @@ fn turns_to_chat_messages(turns: &[crate::service::session::DialogTurnData]) -> let mut result = Vec::new(); for turn in turns { + if !turn.kind.is_model_visible() { + continue; + } + let images = turn .user_message .metadata diff --git a/src/crates/core/src/service/session/types.rs b/src/crates/core/src/service/session/types.rs index 975bc39a..bc905d56 100644 --- a/src/crates/core/src/service/session/types.rs +++ b/src/crates/core/src/service/session/types.rs @@ -134,6 +134,10 @@ pub struct DialogTurnData { /// Timestamp pub timestamp: u64, + /// Turn kind + #[serde(default, alias = "turn_kind")] + pub kind: DialogTurnKind, + /// User message #[serde(alias = "user_message")] pub user_message: UserMessageData, @@ -158,6 +162,26 @@ pub struct DialogTurnData { pub status: TurnStatus, } +/// Persisted dialog turn kind. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum DialogTurnKind { + UserDialog, + ManualCompaction, +} + +impl Default for DialogTurnKind { + fn default() -> Self { + Self::UserDialog + } +} + +impl DialogTurnKind { + pub fn is_model_visible(self) -> bool { + matches!(self, Self::UserDialog) + } +} + /// User message data #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -482,6 +506,23 @@ impl DialogTurnData { turn_index: usize, session_id: String, user_message: UserMessageData, + ) -> Self { + Self::new_with_kind( + DialogTurnKind::UserDialog, + turn_id, + turn_index, + session_id, + user_message, + ) + } + + /// Creates a new dialog turn with an explicit persisted kind. + pub fn new_with_kind( + kind: DialogTurnKind, + turn_id: String, + turn_index: usize, + session_id: String, + user_message: UserMessageData, ) -> Self { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -493,6 +534,7 @@ impl DialogTurnData { turn_index, session_id, timestamp: now, + kind, user_message, model_rounds: Vec::new(), start_time: now, @@ -522,3 +564,48 @@ impl DialogTurnData { .sum() } } + +#[cfg(test)] +mod tests { + use super::{DialogTurnData, DialogTurnKind, UserMessageData}; + + #[test] + fn dialog_turn_kind_defaults_to_user_dialog_for_legacy_payloads() { + let payload = serde_json::json!({ + "turnId": "turn-1", + "turnIndex": 0, + "sessionId": "session-1", + "timestamp": 1, + "userMessage": { + "id": "user-1", + "content": "hello", + "timestamp": 1 + }, + "modelRounds": [], + "startTime": 1, + "status": "completed" + }); + + let turn: DialogTurnData = + serde_json::from_value(payload).expect("legacy payload should deserialize"); + + assert_eq!(turn.kind, DialogTurnKind::UserDialog); + } + + #[test] + fn dialog_turn_data_new_defaults_to_user_dialog() { + let turn = DialogTurnData::new( + "turn-1".to_string(), + 0, + "session-1".to_string(), + UserMessageData { + id: "user-1".to_string(), + content: "hello".to_string(), + timestamp: 1, + metadata: None, + }, + ); + + assert_eq!(turn.kind, DialogTurnKind::UserDialog); + } +} diff --git a/src/crates/events/src/agentic.rs b/src/crates/events/src/agentic.rs index 28a7d6f7..fcf70855 100644 --- a/src/crates/events/src/agentic.rs +++ b/src/crates/events/src/agentic.rs @@ -126,6 +126,7 @@ pub enum AgenticEvent { compression_ratio: f64, duration_ms: u64, has_summary: bool, + summary_source: String, subagent_parent_info: Option, }, diff --git a/src/crates/transport/src/adapters/tauri.rs b/src/crates/transport/src/adapters/tauri.rs index 5a933e10..2fd16f4d 100644 --- a/src/crates/transport/src/adapters/tauri.rs +++ b/src/crates/transport/src/adapters/tauri.rs @@ -306,6 +306,7 @@ impl TransportAdapter for TauriTransportAdapter { compression_ratio, duration_ms, has_summary, + summary_source, } => { self.app_handle.emit( "agentic://context-compression-completed", @@ -319,6 +320,7 @@ impl TransportAdapter for TauriTransportAdapter { "compressionRatio": compression_ratio, "durationMs": duration_ms, "hasSummary": has_summary, + "summarySource": summary_source, "subagentParentInfo": subagent_parent_info, }), )?; diff --git a/src/web-ui/src/component-library/components/registry.tsx b/src/web-ui/src/component-library/components/registry.tsx index 468851b3..d38785c2 100644 --- a/src/web-ui/src/component-library/components/registry.tsx +++ b/src/web-ui/src/component-library/components/registry.tsx @@ -1562,6 +1562,7 @@ console.log(user.greet());`); { compression_count: 3, has_summary: true, + summary_source: 'model', tokens_before: 50000, tokens_after: 15000, compression_ratio: 0.7, @@ -1571,6 +1572,26 @@ console.log(user.greet());`); )} /> +

上下文压缩 - 本地 fallback

+ +

上下文压缩 - 执行中

= ({ dispatchInput({ type: 'SET_VALUE', payload: text }); inputValueRef.current = text; - const isBtwCommand = text.trim().toLowerCase().startsWith('/btw'); + const trimmedLower = text.trim().toLowerCase(); + const isBtwCommand = trimmedLower.startsWith('/btw'); + const isCompactCommand = trimmedLower.startsWith('/compact'); const isProcessing = !!derivedState?.isProcessing; // Don't queue /btw while the main session is processing; /btw runs independently. - if (derivedState?.isProcessing && !isBtwCommand) { + if (derivedState?.isProcessing && !isBtwCommand && !isCompactCommand) { setQueuedInput(text); } @@ -728,8 +730,8 @@ export const ChatInput: React.FC = ({ return; } - // When idle, keep the picker for mode switching, but don't interfere with /btw being a real command. - if (!isBtwCommand) { + // When idle, keep the picker for mode switching, but don't interfere with executable slash commands. + if (!isBtwCommand && !isCompactCommand) { setSlashCommandState({ isActive: true, kind: 'all', @@ -796,6 +798,67 @@ export const ChatInput: React.FC = ({ dispatchInput({ type: 'SET_VALUE', payload: message }); } }, [currentSessionId, derivedState, inputState.value, isBtwSession, setQueuedInput, t, workspacePath]); + + const submitCompactFromInput = useCallback(async () => { + if (!effectiveTargetSessionId || !effectiveTargetSession) { + notificationService.error( + t('chatInput.compactNoSession', { defaultValue: 'No active session for /compact' }) + ); + return; + } + + if (derivedState?.isProcessing) { + notificationService.warning( + t('chatInput.compactBusy', { + defaultValue: 'Wait until the session is idle before using /compact.', + }) + ); + return; + } + + const message = inputState.value.trim(); + if (!/^\/compact\s*$/i.test(message)) { + notificationService.warning( + t('chatInput.compactUsage', { defaultValue: 'Use /compact without extra arguments.' }) + ); + return; + } + + dispatchInput({ type: 'CLEAR_VALUE' }); + setQueuedInput(null); + setSlashCommandState({ isActive: false, kind: 'modes', query: '', selectedIndex: 0 }); + + try { + const { agentAPI } = await import('@/infrastructure/api'); + await agentAPI.compactSession({ + sessionId: effectiveTargetSessionId, + workspacePath: effectiveTargetSession.workspacePath, + remoteConnectionId: effectiveTargetSession.remoteConnectionId, + remoteSshHost: effectiveTargetSession.remoteSshHost, + }); + } catch (error) { + log.error('Failed to trigger /compact', { + error, + sessionId: effectiveTargetSessionId, + }); + dispatchInput({ type: 'ACTIVATE' }); + dispatchInput({ type: 'SET_VALUE', payload: message }); + notificationService.error( + error instanceof Error ? error.message : t('error.unknown'), + { + title: t('chatInput.compactFailed', { defaultValue: 'Session compaction failed' }), + duration: 5000, + } + ); + } + }, [ + derivedState?.isProcessing, + effectiveTargetSession, + effectiveTargetSessionId, + inputState.value, + setQueuedInput, + t, + ]); const handleSendOrCancel = useCallback(async () => { if (!derivedState) return; @@ -823,6 +886,18 @@ export const ChatInput: React.FC = ({ await submitBtwFromInput(); return; } + + if (/^\/compact\s*$/i.test(message)) { + await submitCompactFromInput(); + return; + } + + if (message.toLowerCase().startsWith('/compact')) { + notificationService.warning( + t('chatInput.compactUsage', { defaultValue: 'Use /compact without extra arguments.' }) + ); + return; + } // Add to history before clearing (session-scoped) if (effectiveTargetSessionId) { @@ -856,6 +931,8 @@ export const ChatInput: React.FC = ({ effectiveTargetSessionId, setQueuedInput, submitBtwFromInput, + submitCompactFromInput, + t, ]); const getFilteredIncrementalModes = useCallback(() => { @@ -918,16 +995,20 @@ export const ChatInput: React.FC = ({ }, [requestModeChange]); const getFilteredActions = useCallback(() => { - if (isBtwSession) { - return []; - } - // For now we only support one action: /btw. const items: SlashActionItem[] = [ + ...(isBtwSession + ? [] + : [{ + kind: 'action' as const, + id: 'btw', + command: '/btw', + label: t('btw.title', { defaultValue: 'Side question' }), + }]), { kind: 'action', - id: 'btw', - command: '/btw', - label: t('btw.title', { defaultValue: 'Side question' }), + id: 'compact', + command: '/compact', + label: t('chatInput.compactAction', { defaultValue: 'Compact session' }), }, ]; @@ -960,25 +1041,32 @@ export const ChatInput: React.FC = ({ }, [canSwitchModes, getFilteredActions, incrementalCodeModes, slashCommandState.query]); const selectSlashCommandAction = useCallback((actionId: string) => { - if (isBtwSession) return; - if (actionId !== 'btw') return; - const raw = inputState.value || ''; const lower = raw.trimStart().toLowerCase(); let next = raw; - if (!lower.startsWith('/btw')) { - next = '/btw '; - } else { - // Normalize to "/btw " + rest, preserving any already typed question. - const m = raw.match(/^(\s*)\/btw\b/i); - if (m) { - const leadingWs = m[1] || ''; - const rest = raw.slice(m[0].length); - next = `${leadingWs}/btw ${rest.trimStart()}`; - } else { + + if (actionId === 'btw') { + if (isBtwSession) { + return; + } + if (!lower.startsWith('/btw')) { next = '/btw '; + } else { + // Normalize to "/btw " + rest, preserving any already typed question. + const m = raw.match(/^(\s*)\/btw\b/i); + if (m) { + const leadingWs = m[1] || ''; + const rest = raw.slice(m[0].length); + next = `${leadingWs}/btw ${rest.trimStart()}`; + } else { + next = '/btw '; + } } + } else if (actionId === 'compact') { + next = '/compact'; + } else { + return; } dispatchInput({ type: 'SET_VALUE', payload: next }); diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts index 8f6bc688..84fb7394 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/EventHandlerModule.ts @@ -730,6 +730,8 @@ function handleDialogTurnStarted(context: FlowChatContext, event: any): void { const displayContent = originalUserInput ? cleanRemoteUserInput(originalUserInput) : cleanRemoteUserInput(userInput || ''); + const turnKind = + userMessageMetadata?.kind === 'manual_compaction' ? 'manual_compaction' : 'user_dialog'; const freshSession = store.getState().sessions.get(sessionId); const dialogTurn = freshSession?.dialogTurns.find((turn: DialogTurn) => turn.id === turnId); @@ -737,11 +739,13 @@ function handleDialogTurnStarted(context: FlowChatContext, event: any): void { const newTurn: DialogTurn = { id: turnId, sessionId, + kind: turnKind, userMessage: { id: `user_remote_${Date.now()}`, content: displayContent, timestamp: Date.now(), hasImages, + metadata: userMessageMetadata, images, }, modelRounds: [], @@ -768,6 +772,11 @@ function handleDialogTurnStarted(context: FlowChatContext, event: any): void { if (typeof turnIndex === 'number' && dialogTurn.backendTurnIndex === undefined) { store.updateDialogTurn(sessionId, turnId, turn => ({ ...turn, + kind: turn.kind || turnKind, + userMessage: { + ...turn.userMessage, + metadata: turn.userMessage.metadata || userMessageMetadata, + }, backendTurnIndex: turnIndex, })); } @@ -1115,6 +1124,15 @@ function handleCompressionStarted(_context: FlowChatContext, event: any): void { log.debug('Dialog turn not found (compression started)', { turnId }); return; } + + const currentState = stateMachineManager.getCurrentState(sessionId); + if (isStreamingExecutionState(currentState)) { + void stateMachineManager + .transition(sessionId, SessionExecutionEvent.COMPACTION_STARTED) + .catch(error => { + log.error('State machine transition failed on compression start', { sessionId, error }); + }); + } const compressionItem: FlowToolItem = { id: compressionId, @@ -1159,7 +1177,7 @@ function handleCompressionStarted(_context: FlowChatContext, event: any): void { function handleCompressionCompleted(context: FlowChatContext, event: any): void { const { sessionId, turnId, compressionId, compressionCount, - tokensBefore, tokensAfter, compressionRatio, durationMs + tokensBefore, tokensAfter, compressionRatio, durationMs, hasSummary, summarySource } = event; log.info('Context compression completed', { @@ -1177,6 +1195,8 @@ function handleCompressionCompleted(context: FlowChatContext, event: any): void tokens_after: tokensAfter, compression_ratio: compressionRatio, duration: durationMs, + has_summary: hasSummary, + summary_source: summarySource, }, success: true, duration_ms: durationMs || 0 diff --git a/src/web-ui/src/flow_chat/services/flow-chat-manager/PersistenceModule.ts b/src/web-ui/src/flow_chat/services/flow-chat-manager/PersistenceModule.ts index eef64c2d..6fbdc952 100644 --- a/src/web-ui/src/flow_chat/services/flow-chat-manager/PersistenceModule.ts +++ b/src/web-ui/src/flow_chat/services/flow-chat-manager/PersistenceModule.ts @@ -294,27 +294,35 @@ export async function saveAllInProgressTurns(context: FlowChatContext): Promise< * Convert FlowChat DialogTurn to backend format */ export function convertDialogTurnToBackendFormat(dialogTurn: DialogTurn, turnIndex: number): any { + const userMetadata = dialogTurn.userMessage.metadata + ? { ...dialogTurn.userMessage.metadata } + : undefined; + const mergedUserMetadata = + dialogTurn.userMessage.images?.length + ? { + ...(userMetadata || {}), + images: dialogTurn.userMessage.images.map(img => ({ + id: img.id, + name: img.name, + data_url: img.dataUrl, + image_path: img.imagePath, + mime_type: img.mimeType, + })), + original_text: dialogTurn.userMessage.content, + } + : userMetadata; + return { turnId: dialogTurn.id, turnIndex, sessionId: dialogTurn.sessionId, timestamp: dialogTurn.startTime, + kind: dialogTurn.kind || 'user_dialog', userMessage: { id: dialogTurn.userMessage.id, content: dialogTurn.userMessage.content, timestamp: dialogTurn.userMessage.timestamp, - metadata: dialogTurn.userMessage.images?.length - ? { - images: dialogTurn.userMessage.images.map(img => ({ - id: img.id, - name: img.name, - data_url: img.dataUrl, - image_path: img.imagePath, - mime_type: img.mimeType, - })), - original_text: dialogTurn.userMessage.content, - } - : undefined, + metadata: mergedUserMetadata, }, modelRounds: dialogTurn.modelRounds.map((round, roundIndex) => { return { diff --git a/src/web-ui/src/flow_chat/state-machine/derivedState.ts b/src/web-ui/src/flow_chat/state-machine/derivedState.ts index 48b7ed1a..b65e125c 100644 --- a/src/web-ui/src/flow_chat/state-machine/derivedState.ts +++ b/src/web-ui/src/flow_chat/state-machine/derivedState.ts @@ -132,6 +132,9 @@ function getSendButtonMode( function getProgressBarMode(phase: ProcessingPhase | null): SessionDerivedState['progressBarMode'] { switch (phase) { + case ProcessingPhase.COMPACTING: + return 'indeterminate'; + case ProcessingPhase.TOOL_CALLING: return 'segmented'; @@ -177,6 +180,9 @@ function getProgressBarLabel( context: SessionStateMachine['context'] ): string { switch (phase) { + case ProcessingPhase.COMPACTING: + return 'Compressing session context...'; + case ProcessingPhase.STARTING: return 'Connecting to AI...'; @@ -207,6 +213,9 @@ function getProgressBarLabel( function getProgressBarColor(phase: ProcessingPhase | null): string { switch (phase) { + case ProcessingPhase.COMPACTING: + return '#0f766e'; + case ProcessingPhase.STARTING: return '#3b82f6'; diff --git a/src/web-ui/src/flow_chat/state-machine/transitions.ts b/src/web-ui/src/flow_chat/state-machine/transitions.ts index ee9fd097..ff957f76 100644 --- a/src/web-ui/src/flow_chat/state-machine/transitions.ts +++ b/src/web-ui/src/flow_chat/state-machine/transitions.ts @@ -28,6 +28,7 @@ export const STATE_TRANSITIONS: StateTransitionTable = { [SessionExecutionEvent.BACKEND_STREAM_COMPLETED]: SessionExecutionState.FINISHING, + [SessionExecutionEvent.COMPACTION_STARTED]: SessionExecutionState.PROCESSING, [SessionExecutionEvent.MODEL_ROUND_START]: SessionExecutionState.PROCESSING, [SessionExecutionEvent.TEXT_CHUNK_RECEIVED]: SessionExecutionState.PROCESSING, [SessionExecutionEvent.TOOL_DETECTED]: SessionExecutionState.PROCESSING, @@ -42,6 +43,7 @@ export const STATE_TRANSITIONS: StateTransitionTable = { [SessionExecutionEvent.USER_CANCEL]: SessionExecutionState.IDLE, [SessionExecutionEvent.ERROR_OCCURRED]: SessionExecutionState.ERROR, [SessionExecutionEvent.FINISHING_SETTLED]: SessionExecutionState.IDLE, + [SessionExecutionEvent.COMPACTION_STARTED]: SessionExecutionState.FINISHING, [SessionExecutionEvent.MODEL_ROUND_START]: SessionExecutionState.FINISHING, [SessionExecutionEvent.TEXT_CHUNK_RECEIVED]: SessionExecutionState.FINISHING, [SessionExecutionEvent.TOOL_DETECTED]: SessionExecutionState.FINISHING, @@ -64,6 +66,7 @@ export const STATE_TRANSITIONS: StateTransitionTable = { */ export const PHASE_TRANSITIONS: Record = { [SessionExecutionEvent.START]: ProcessingPhase.STARTING, + [SessionExecutionEvent.COMPACTION_STARTED]: ProcessingPhase.COMPACTING, [SessionExecutionEvent.MODEL_ROUND_START]: ProcessingPhase.THINKING, [SessionExecutionEvent.TEXT_CHUNK_RECEIVED]: ProcessingPhase.STREAMING, [SessionExecutionEvent.TOOL_DETECTED]: ProcessingPhase.TOOL_CALLING, diff --git a/src/web-ui/src/flow_chat/state-machine/types.ts b/src/web-ui/src/flow_chat/state-machine/types.ts index dc6645bd..1f60b307 100644 --- a/src/web-ui/src/flow_chat/state-machine/types.ts +++ b/src/web-ui/src/flow_chat/state-machine/types.ts @@ -33,6 +33,7 @@ export enum SessionExecutionState { */ export enum ProcessingPhase { STARTING = 'starting', + COMPACTING = 'compacting', THINKING = 'thinking', STREAMING = 'streaming', FINALIZING = 'finalizing', @@ -45,6 +46,7 @@ export enum ProcessingPhase { */ export enum SessionExecutionEvent { START = 'start', + COMPACTION_STARTED = 'compaction_started', MODEL_ROUND_START = 'model_round_start', TEXT_CHUNK_RECEIVED = 'text_chunk_received', TOOL_DETECTED = 'tool_detected', diff --git a/src/web-ui/src/flow_chat/store/FlowChatStore.ts b/src/web-ui/src/flow_chat/store/FlowChatStore.ts index f7bedf73..3da1a318 100644 --- a/src/web-ui/src/flow_chat/store/FlowChatStore.ts +++ b/src/web-ui/src/flow_chat/store/FlowChatStore.ts @@ -1425,10 +1425,12 @@ export class FlowChatStore { turnIndex, sessionId, timestamp: dialogTurn.startTime, + kind: dialogTurn.kind || 'user_dialog', userMessage: { id: dialogTurn.userMessage.id, content: dialogTurn.userMessage.content, - timestamp: dialogTurn.userMessage.timestamp + timestamp: dialogTurn.userMessage.timestamp, + metadata: dialogTurn.userMessage.metadata, }, modelRounds: dialogTurn.modelRounds.map((round, roundIndex) => { const textItems = round.items @@ -1704,12 +1706,14 @@ export class FlowChatStore { return { id: turn.turnId, sessionId: turn.sessionId, + kind: turn.kind || 'user_dialog', userMessage: { id: turn.userMessage.id, type: 'user' as const, content: displayContent, timestamp: turn.userMessage.timestamp, hasImages, + metadata, images, }, modelRounds: turn.modelRounds.map((round: any) => ({ diff --git a/src/web-ui/src/flow_chat/tool-cards/ContextCompressionDisplay.scss b/src/web-ui/src/flow_chat/tool-cards/ContextCompressionDisplay.scss index 467f4791..c5ea02ff 100644 --- a/src/web-ui/src/flow_chat/tool-cards/ContextCompressionDisplay.scss +++ b/src/web-ui/src/flow_chat/tool-cards/ContextCompressionDisplay.scss @@ -59,5 +59,20 @@ .icon-completed { color: rgba(34, 197, 94, 0.9); } + + .compression-detail-note { + padding: 12px 16px; + font-size: 12px; + line-height: 1.5; + color: var(--tool-card-text-secondary); + background: rgba(148, 163, 184, 0.08); + border-top: 1px solid rgba(148, 163, 184, 0.18); + + &.compression-detail-note--fallback { + color: rgba(245, 158, 11, 0.95); + background: rgba(245, 158, 11, 0.08); + border-top-color: rgba(245, 158, 11, 0.2); + } + } } diff --git a/src/web-ui/src/flow_chat/tool-cards/ContextCompressionDisplay.tsx b/src/web-ui/src/flow_chat/tool-cards/ContextCompressionDisplay.tsx index 0d743e6f..da754d30 100644 --- a/src/web-ui/src/flow_chat/tool-cards/ContextCompressionDisplay.tsx +++ b/src/web-ui/src/flow_chat/tool-cards/ContextCompressionDisplay.tsx @@ -16,6 +16,7 @@ interface ContextCompressionDisplayProps { session_id: string; compression_count: number; has_summary: boolean; + summary_source?: 'model' | 'local_fallback' | 'none'; tokens_before?: number; tokens_after?: number; compression_ratio?: number; @@ -41,6 +42,8 @@ export const ContextCompressionDisplay: React.FC tokensAfter: toolItem.toolResult?.result?.tokens_after || compressionData?.tokens_after, compressionRatio: toolItem.toolResult?.result?.compression_ratio || compressionData?.compression_ratio, duration: toolItem.toolResult?.duration_ms || compressionData?.duration, + hasSummary: toolItem.toolResult?.result?.has_summary ?? compressionData?.has_summary, + summarySource: toolItem.toolResult?.result?.summary_source || compressionData?.summary_source, trigger: toolItem.toolCall?.input?.trigger || compressionData?.trigger, status: (toolItem.status === 'cancelled' || toolItem.status === 'analyzing') ? 'completed' : toolItem.status, error: toolItem.toolResult?.error @@ -50,6 +53,8 @@ export const ContextCompressionDisplay: React.FC tokensAfter: compressionData?.tokens_after, compressionRatio: compressionData?.compression_ratio, duration: compressionData?.duration, + hasSummary: compressionData?.has_summary, + summarySource: compressionData?.summary_source, trigger: compressionData?.trigger, status: 'completed' as const }; @@ -75,6 +80,8 @@ export const ContextCompressionDisplay: React.FC const isLoading = data.status === 'preparing' || data.status === 'streaming' || data.status === 'running'; const isFailed = data.status === 'error'; + const usedLocalFallback = data.summarySource === 'local_fallback'; + const usedNoSummary = data.summarySource === 'none'; const renderToolIcon = () => { return ; @@ -129,12 +136,31 @@ export const ContextCompressionDisplay: React.FC ); + const renderExpandedContent = () => { + if (!usedLocalFallback && !usedNoSummary) { + return null; + } + + return ( +
+ {usedLocalFallback + ? t('toolCards.contextCompression.localFallbackNotice', { + defaultValue: 'Model-based summary was unavailable, so this compression used local structured truncation.', + }) + : t('toolCards.contextCompression.noSummaryNotice', { + defaultValue: 'No additional summary was generated for this compaction pass.', + })} +
+ ); + }; + return ( diff --git a/src/web-ui/src/flow_chat/types/flow-chat.ts b/src/web-ui/src/flow_chat/types/flow-chat.ts index 5ddf65bf..39f7cb8a 100644 --- a/src/web-ui/src/flow_chat/types/flow-chat.ts +++ b/src/web-ui/src/flow_chat/types/flow-chat.ts @@ -3,7 +3,7 @@ * Supports mixed streaming output. */ -import type { SessionKind } from '@/shared/types/session-history'; +import type { DialogTurnKind, SessionKind } from '@/shared/types/session-history'; // Base type for streaming items. export interface FlowItem { @@ -106,11 +106,13 @@ export interface TokenUsage { export interface DialogTurn { id: string; sessionId: string; // Used for event filtering. + kind?: DialogTurnKind; userMessage: { id: string; content: string; timestamp: number; hasImages?: boolean; + metadata?: Record; images?: Array<{ id: string; name: string; diff --git a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts index a56129e3..2629356f 100644 --- a/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts +++ b/src/web-ui/src/infrastructure/api/service-api/AgentAPI.ts @@ -57,6 +57,13 @@ export interface StartDialogTurnRequest { imageContexts?: ImageInputContextData[]; } +export interface CompactSessionRequest { + sessionId: string; + workspacePath?: string; + remoteConnectionId?: string; + remoteSshHost?: string; +} + export interface SessionInfo { sessionId: string; @@ -154,6 +161,7 @@ export interface CompressionEvent extends AgenticEvent { compressionRatio?: number; durationMs?: number; hasSummary?: boolean; + summarySource?: 'model' | 'local_fallback' | 'none'; error?: string; subagentParentInfo?: SubagentParentInfo; @@ -185,6 +193,14 @@ export class AgentAPI { } } + async compactSession(request: CompactSessionRequest): Promise<{ success: boolean; message: string }> { + try { + return await api.invoke<{ success: boolean; message: string }>('compact_session', { request }); + } catch (error) { + throw createTauriCommandError('compact_session', error, request); + } + } + async ensureAssistantBootstrap( request: EnsureAssistantBootstrapRequest ): Promise { diff --git a/src/web-ui/src/locales/en-US/flow-chat.json b/src/web-ui/src/locales/en-US/flow-chat.json index acd4349f..68fd396a 100644 --- a/src/web-ui/src/locales/en-US/flow-chat.json +++ b/src/web-ui/src/locales/en-US/flow-chat.json @@ -204,6 +204,11 @@ "noIncrementalModes": "No add-on modes available", "switchMode": "Switch Mode", "quickAction": "Quick action", + "compactAction": "Compact session", + "compactNoSession": "No active session for /compact", + "compactBusy": "Wait until the session is idle before using /compact.", + "compactUsage": "Use /compact without extra arguments.", + "compactFailed": "Session compaction failed", "currentMode": "Current mode: {{mode}}", "noMatchingMode": "No matching mode", "noMatchingCommand": "No matching command", @@ -701,7 +706,9 @@ "manualTrigger": "Manual trigger", "autoTrigger": "Auto trigger", "contextCompressionFailed": "Context compression failed", - "contextCompression": "Context compression:" + "contextCompression": "Context compression:", + "localFallbackNotice": "Model-based summary was unavailable, so this compression used local structured truncation.", + "noSummaryNotice": "No additional summary was generated for this compaction pass." }, "globSearch": { "parsingPattern": "Parsing search pattern...", diff --git a/src/web-ui/src/locales/zh-CN/flow-chat.json b/src/web-ui/src/locales/zh-CN/flow-chat.json index 454ee1f3..1c33de8e 100644 --- a/src/web-ui/src/locales/zh-CN/flow-chat.json +++ b/src/web-ui/src/locales/zh-CN/flow-chat.json @@ -204,6 +204,11 @@ "noIncrementalModes": "暂无可附加模式", "switchMode": "切换模式", "quickAction": "快捷操作", + "compactAction": "压缩会话", + "compactNoSession": "当前没有可用于 /compact 的会话", + "compactBusy": "请等待当前会话空闲后再使用 /compact。", + "compactUsage": "请直接使用 /compact,不要附加额外参数。", + "compactFailed": "会话压缩失败", "currentMode": "当前模式: {{mode}}", "noMatchingMode": "没有匹配的模式", "noMatchingCommand": "没有匹配的命令", @@ -693,7 +698,9 @@ "manualTrigger": "手动触发", "autoTrigger": "自动触发", "contextCompressionFailed": "上下文压缩失败", - "contextCompression": "上下文压缩:" + "contextCompression": "上下文压缩:", + "localFallbackNotice": "本次压缩无法使用模型生成总结,因此改用了本地结构化裁剪。", + "noSummaryNotice": "本次压缩没有生成额外的总结内容。" }, "globSearch": { "parsingPattern": "解析搜索模式中...", diff --git a/src/web-ui/src/shared/types/session-history.ts b/src/web-ui/src/shared/types/session-history.ts index e373d9e2..15df8ccb 100644 --- a/src/web-ui/src/shared/types/session-history.ts +++ b/src/web-ui/src/shared/types/session-history.ts @@ -38,6 +38,7 @@ export interface SessionMetadata { } export type SessionStatus = 'active' | 'archived' | 'completed'; +export type DialogTurnKind = 'user_dialog' | 'manual_compaction'; export interface SessionList { sessions: SessionMetadata[]; @@ -50,6 +51,7 @@ export interface DialogTurnData { turnIndex: number; sessionId: string; timestamp: number; + kind?: DialogTurnKind; userMessage: UserMessageData; modelRounds: ModelRoundData[]; startTime: number;