mirror of
https://github.com/zadam/trilium.git
synced 2025-11-03 03:46:37 +01:00
add some more useful tools
CLOSER.... works?
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import type { ChatPipelineInput, ChatPipelineConfig, PipelineMetrics, StreamCallback } from './interfaces.js';
|
||||
import type { ChatResponse, StreamChunk } from '../ai_interface.js';
|
||||
import type { ChatResponse, StreamChunk, Message } from '../ai_interface.js';
|
||||
import { ContextExtractionStage } from './stages/context_extraction_stage.js';
|
||||
import { SemanticContextExtractionStage } from './stages/semantic_context_extraction_stage.js';
|
||||
import { AgentToolsContextStage } from './stages/agent_tools_context_stage.js';
|
||||
@@ -12,6 +12,7 @@ import { VectorSearchStage } from './stages/vector_search_stage.js';
|
||||
import toolRegistry from '../tools/tool_registry.js';
|
||||
import toolInitializer from '../tools/tool_initializer.js';
|
||||
import log from '../../log.js';
|
||||
import type { LLMServiceInterface } from '../interfaces/agent_tool_interfaces.js';
|
||||
|
||||
/**
|
||||
* Pipeline for managing the entire chat flow
|
||||
@@ -80,6 +81,7 @@ export class ChatPipeline {
|
||||
* This is the main entry point that orchestrates all pipeline stages
|
||||
*/
|
||||
async execute(input: ChatPipelineInput): Promise<ChatResponse> {
|
||||
log.info(`========== STARTING CHAT PIPELINE ==========`);
|
||||
log.info(`Executing chat pipeline with ${input.messages.length} messages`);
|
||||
const startTime = Date.now();
|
||||
this.metrics.totalExecutions++;
|
||||
@@ -113,89 +115,107 @@ export class ChatPipeline {
|
||||
|
||||
// First, select the appropriate model based on query complexity and content length
|
||||
const modelSelectionStartTime = Date.now();
|
||||
log.info(`========== MODEL SELECTION ==========`);
|
||||
const modelSelection = await this.stages.modelSelection.execute({
|
||||
options: input.options,
|
||||
query: input.query,
|
||||
contentLength
|
||||
});
|
||||
this.updateStageMetrics('modelSelection', modelSelectionStartTime);
|
||||
log.info(`Selected model: ${modelSelection.options.model || 'default'}, enableTools: ${modelSelection.options.enableTools}`);
|
||||
|
||||
// Determine if we should use tools or semantic context
|
||||
const useTools = modelSelection.options.enableTools === true;
|
||||
const useEnhancedContext = input.options?.useAdvancedContext === true;
|
||||
|
||||
// Determine which pipeline flow to use
|
||||
let context: string | undefined;
|
||||
// Early return if we don't have a query or enhanced context is disabled
|
||||
if (!input.query || !useEnhancedContext) {
|
||||
log.info(`========== SIMPLE QUERY MODE ==========`);
|
||||
log.info('Enhanced context disabled or no query provided, skipping context enrichment');
|
||||
|
||||
// For context-aware chats, get the appropriate context
|
||||
if (input.noteId && input.query) {
|
||||
const contextStartTime = Date.now();
|
||||
if (input.showThinking) {
|
||||
// Get enhanced context with agent tools if thinking is enabled
|
||||
const agentContext = await this.stages.agentToolsContext.execute({
|
||||
noteId: input.noteId,
|
||||
query: input.query,
|
||||
showThinking: input.showThinking
|
||||
});
|
||||
context = agentContext.context;
|
||||
this.updateStageMetrics('agentToolsContext', contextStartTime);
|
||||
} else if (!useTools) {
|
||||
// Only get semantic context if tools are NOT enabled
|
||||
// When tools are enabled, we'll let the LLM request context via tools instead
|
||||
log.info('Getting semantic context for note using pipeline stages');
|
||||
|
||||
// First use the vector search stage to find relevant notes
|
||||
const vectorSearchStartTime = Date.now();
|
||||
log.info(`Executing vector search stage for query: "${input.query?.substring(0, 50)}..."`);
|
||||
|
||||
const vectorSearchResult = await this.stages.vectorSearch.execute({
|
||||
query: input.query || '',
|
||||
noteId: input.noteId,
|
||||
options: {
|
||||
maxResults: 10,
|
||||
useEnhancedQueries: true,
|
||||
threshold: 0.6
|
||||
}
|
||||
});
|
||||
|
||||
this.updateStageMetrics('vectorSearch', vectorSearchStartTime);
|
||||
|
||||
log.info(`Vector search found ${vectorSearchResult.searchResults.length} relevant notes`);
|
||||
|
||||
// Then pass to the semantic context stage to build the formatted context
|
||||
const semanticContext = await this.stages.semanticContextExtraction.execute({
|
||||
noteId: input.noteId,
|
||||
query: input.query,
|
||||
messages: input.messages
|
||||
});
|
||||
|
||||
context = semanticContext.context;
|
||||
this.updateStageMetrics('semanticContextExtraction', contextStartTime);
|
||||
} else {
|
||||
log.info('Tools are enabled - using minimal direct context to avoid race conditions');
|
||||
// Get context from current note directly without semantic search
|
||||
if (input.noteId) {
|
||||
try {
|
||||
const contextExtractor = new (await import('../../llm/context/index.js')).ContextExtractor();
|
||||
// Just get the direct content of the current note
|
||||
context = await contextExtractor.extractContext(input.noteId, {
|
||||
includeContent: true,
|
||||
includeParents: true,
|
||||
includeChildren: true,
|
||||
includeLinks: true,
|
||||
includeSimilar: false // Skip semantic search to avoid race conditions
|
||||
});
|
||||
log.info(`Direct context extracted (${context.length} chars) without semantic search`);
|
||||
} catch (error: any) {
|
||||
log.error(`Error extracting direct context: ${error.message}`);
|
||||
context = ""; // Fallback to empty context if extraction fails
|
||||
}
|
||||
} else {
|
||||
context = ""; // No note ID, so no context
|
||||
}
|
||||
}
|
||||
// Prepare messages without additional context
|
||||
const messagePreparationStartTime = Date.now();
|
||||
const preparedMessages = await this.stages.messagePreparation.execute({
|
||||
messages: input.messages,
|
||||
systemPrompt: input.options?.systemPrompt,
|
||||
options: modelSelection.options
|
||||
});
|
||||
this.updateStageMetrics('messagePreparation', messagePreparationStartTime);
|
||||
|
||||
// Generate completion using the LLM
|
||||
const llmStartTime = Date.now();
|
||||
const completion = await this.stages.llmCompletion.execute({
|
||||
messages: preparedMessages.messages,
|
||||
options: modelSelection.options
|
||||
});
|
||||
this.updateStageMetrics('llmCompletion', llmStartTime);
|
||||
|
||||
return completion.response;
|
||||
}
|
||||
|
||||
// Prepare messages with context and system prompt
|
||||
// STAGE 1: Start with the user's query
|
||||
const userQuery = input.query || '';
|
||||
log.info(`========== STAGE 1: USER QUERY ==========`);
|
||||
log.info(`Processing query with: question="${userQuery.substring(0, 50)}...", noteId=${input.noteId}, showThinking=${input.showThinking}`);
|
||||
|
||||
// STAGE 2: Perform query decomposition using the LLM
|
||||
log.info(`========== STAGE 2: QUERY DECOMPOSITION ==========`);
|
||||
log.info('Performing query decomposition to generate effective search queries');
|
||||
const llmService = await this.getLLMService();
|
||||
let searchQueries = [userQuery]; // Default to original query
|
||||
|
||||
if (llmService && llmService.generateSearchQueries) {
|
||||
try {
|
||||
const decompositionResult = await llmService.generateSearchQueries(userQuery);
|
||||
if (decompositionResult && decompositionResult.length > 0) {
|
||||
searchQueries = decompositionResult;
|
||||
log.info(`Generated ${searchQueries.length} search queries: ${JSON.stringify(searchQueries)}`);
|
||||
} else {
|
||||
log.info('Query decomposition returned no results, using original query');
|
||||
}
|
||||
} catch (error: any) {
|
||||
log.error(`Error in query decomposition: ${error.message || String(error)}`);
|
||||
}
|
||||
} else {
|
||||
log.info('No LLM service available for query decomposition, using original query');
|
||||
}
|
||||
|
||||
// STAGE 3: Execute vector similarity search with decomposed queries
|
||||
const vectorSearchStartTime = Date.now();
|
||||
log.info(`========== STAGE 3: VECTOR SEARCH ==========`);
|
||||
log.info('Using VectorSearchStage pipeline component to find relevant notes');
|
||||
|
||||
const vectorSearchResult = await this.stages.vectorSearch.execute({
|
||||
query: userQuery,
|
||||
noteId: input.noteId || 'global',
|
||||
options: {
|
||||
maxResults: 5, // Can be adjusted
|
||||
useEnhancedQueries: true,
|
||||
threshold: 0.6,
|
||||
llmService: llmService || undefined
|
||||
}
|
||||
});
|
||||
|
||||
this.updateStageMetrics('vectorSearch', vectorSearchStartTime);
|
||||
|
||||
log.info(`Vector search found ${vectorSearchResult.searchResults.length} relevant notes`);
|
||||
|
||||
// Extract context from search results
|
||||
log.info(`========== SEMANTIC CONTEXT EXTRACTION ==========`);
|
||||
const semanticContextStartTime = Date.now();
|
||||
const semanticContext = await this.stages.semanticContextExtraction.execute({
|
||||
noteId: input.noteId || 'global',
|
||||
query: userQuery,
|
||||
messages: input.messages,
|
||||
searchResults: vectorSearchResult.searchResults
|
||||
});
|
||||
|
||||
const context = semanticContext.context;
|
||||
this.updateStageMetrics('semanticContextExtraction', semanticContextStartTime);
|
||||
log.info(`Extracted semantic context (${context.length} chars)`);
|
||||
|
||||
// STAGE 4: Prepare messages with context and tool definitions for the LLM
|
||||
log.info(`========== STAGE 4: MESSAGE PREPARATION ==========`);
|
||||
const messagePreparationStartTime = Date.now();
|
||||
const preparedMessages = await this.stages.messagePreparation.execute({
|
||||
messages: input.messages,
|
||||
@@ -204,9 +224,7 @@ export class ChatPipeline {
|
||||
options: modelSelection.options
|
||||
});
|
||||
this.updateStageMetrics('messagePreparation', messagePreparationStartTime);
|
||||
|
||||
// Generate completion using the LLM
|
||||
const llmStartTime = Date.now();
|
||||
log.info(`Prepared ${preparedMessages.messages.length} messages for LLM, tools enabled: ${useTools}`);
|
||||
|
||||
// Setup streaming handler if streaming is enabled and callback provided
|
||||
const enableStreaming = this.config.enableStreaming &&
|
||||
@@ -218,11 +236,15 @@ export class ChatPipeline {
|
||||
modelSelection.options.stream = true;
|
||||
}
|
||||
|
||||
// STAGE 5 & 6: Handle LLM completion and tool execution loop
|
||||
log.info(`========== STAGE 5: LLM COMPLETION ==========`);
|
||||
const llmStartTime = Date.now();
|
||||
const completion = await this.stages.llmCompletion.execute({
|
||||
messages: preparedMessages.messages,
|
||||
options: modelSelection.options
|
||||
});
|
||||
this.updateStageMetrics('llmCompletion', llmStartTime);
|
||||
log.info(`Received LLM response from model: ${completion.response.model}, provider: ${completion.response.provider}`);
|
||||
|
||||
// Handle streaming if enabled and available
|
||||
if (enableStreaming && completion.response.stream && streamCallback) {
|
||||
@@ -242,123 +264,247 @@ export class ChatPipeline {
|
||||
// Process any tool calls in the response
|
||||
let currentMessages = preparedMessages.messages;
|
||||
let currentResponse = completion.response;
|
||||
let needsFollowUp = false;
|
||||
let toolCallIterations = 0;
|
||||
const maxToolCallIterations = this.config.maxToolCallIterations;
|
||||
|
||||
// Check if tools were enabled in the options
|
||||
const toolsEnabled = modelSelection.options.enableTools !== false;
|
||||
|
||||
log.info(`========== TOOL CALL PROCESSING ==========`);
|
||||
log.info(`Tools enabled: ${toolsEnabled}`);
|
||||
log.info(`Tool calls in response: ${currentResponse.tool_calls ? currentResponse.tool_calls.length : 0}`);
|
||||
log.info(`Current response format: ${typeof currentResponse}`);
|
||||
log.info(`Response keys: ${Object.keys(currentResponse).join(', ')}`);
|
||||
|
||||
// Detailed tool call inspection
|
||||
|
||||
// Log decision points for tool execution
|
||||
log.info(`========== TOOL EXECUTION DECISION ==========`);
|
||||
log.info(`Tools enabled in options: ${toolsEnabled}`);
|
||||
log.info(`Response provider: ${currentResponse.provider || 'unknown'}`);
|
||||
log.info(`Response model: ${currentResponse.model || 'unknown'}`);
|
||||
log.info(`Response has tool_calls: ${currentResponse.tool_calls ? 'true' : 'false'}`);
|
||||
if (currentResponse.tool_calls) {
|
||||
currentResponse.tool_calls.forEach((tool, idx) => {
|
||||
log.info(`Tool call ${idx+1}: ${JSON.stringify(tool)}`);
|
||||
});
|
||||
log.info(`Number of tool calls: ${currentResponse.tool_calls.length}`);
|
||||
log.info(`Tool calls details: ${JSON.stringify(currentResponse.tool_calls)}`);
|
||||
|
||||
// Check if we have a response from Ollama, which might be handled differently
|
||||
if (currentResponse.provider === 'Ollama') {
|
||||
log.info(`ATTENTION: Response is from Ollama - checking if tool execution path is correct`);
|
||||
log.info(`Tool calls type: ${typeof currentResponse.tool_calls}`);
|
||||
log.info(`First tool call name: ${currentResponse.tool_calls[0]?.function?.name || 'unknown'}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Process tool calls if present and tools are enabled
|
||||
// Tool execution loop
|
||||
if (toolsEnabled && currentResponse.tool_calls && currentResponse.tool_calls.length > 0) {
|
||||
log.info(`========== STAGE 6: TOOL EXECUTION ==========`);
|
||||
log.info(`Response contains ${currentResponse.tool_calls.length} tool calls, processing...`);
|
||||
|
||||
// Start tool calling loop
|
||||
log.info(`Starting tool calling loop with max ${maxToolCallIterations} iterations`);
|
||||
// Format tool calls for logging
|
||||
log.info(`========== TOOL CALL DETAILS ==========`);
|
||||
currentResponse.tool_calls.forEach((toolCall, idx) => {
|
||||
log.info(`Tool call ${idx + 1}: name=${toolCall.function?.name || 'unknown'}, id=${toolCall.id || 'no-id'}`);
|
||||
log.info(`Arguments: ${toolCall.function?.arguments || '{}'}`);
|
||||
});
|
||||
|
||||
do {
|
||||
log.info(`Tool calling iteration ${toolCallIterations + 1}`);
|
||||
// Keep track of whether we're in a streaming response
|
||||
const isStreaming = enableStreaming && streamCallback;
|
||||
let streamingPaused = false;
|
||||
|
||||
// Execute tool calling stage
|
||||
const toolCallingStartTime = Date.now();
|
||||
const toolCallingResult = await this.stages.toolCalling.execute({
|
||||
response: currentResponse,
|
||||
messages: currentMessages,
|
||||
options: modelSelection.options
|
||||
});
|
||||
this.updateStageMetrics('toolCalling', toolCallingStartTime);
|
||||
// If streaming was enabled, send an update to the user
|
||||
if (isStreaming && streamCallback) {
|
||||
streamingPaused = true;
|
||||
await streamCallback('', true); // Signal pause in streaming
|
||||
await streamCallback('\n\n[Executing tools...]\n\n', false);
|
||||
}
|
||||
|
||||
// Update state for next iteration
|
||||
currentMessages = toolCallingResult.messages;
|
||||
needsFollowUp = toolCallingResult.needsFollowUp;
|
||||
while (toolCallIterations < maxToolCallIterations) {
|
||||
toolCallIterations++;
|
||||
log.info(`========== TOOL ITERATION ${toolCallIterations}/${maxToolCallIterations} ==========`);
|
||||
|
||||
// Make another call to the LLM if needed
|
||||
if (needsFollowUp) {
|
||||
log.info(`Tool execution completed, making follow-up LLM call (iteration ${toolCallIterations + 1})...`);
|
||||
// Create a copy of messages before tool execution
|
||||
const previousMessages = [...currentMessages];
|
||||
|
||||
// Generate a new LLM response with the updated messages
|
||||
const followUpStartTime = Date.now();
|
||||
log.info(`Sending follow-up request to LLM with ${currentMessages.length} messages (including tool results)`);
|
||||
try {
|
||||
const toolCallingStartTime = Date.now();
|
||||
log.info(`========== PIPELINE TOOL EXECUTION FLOW ==========`);
|
||||
log.info(`About to call toolCalling.execute with ${currentResponse.tool_calls.length} tool calls`);
|
||||
log.info(`Tool calls being passed to stage: ${JSON.stringify(currentResponse.tool_calls)}`);
|
||||
|
||||
const followUpCompletion = await this.stages.llmCompletion.execute({
|
||||
const toolCallingResult = await this.stages.toolCalling.execute({
|
||||
response: currentResponse,
|
||||
messages: currentMessages,
|
||||
options: modelSelection.options
|
||||
});
|
||||
this.updateStageMetrics('llmCompletion', followUpStartTime);
|
||||
this.updateStageMetrics('toolCalling', toolCallingStartTime);
|
||||
|
||||
// Update current response for next iteration
|
||||
currentResponse = followUpCompletion.response;
|
||||
log.info(`ToolCalling stage execution complete, got result with needsFollowUp: ${toolCallingResult.needsFollowUp}`);
|
||||
|
||||
// Check for more tool calls
|
||||
const hasMoreToolCalls = !!(currentResponse.tool_calls && currentResponse.tool_calls.length > 0);
|
||||
// Update messages with tool results
|
||||
currentMessages = toolCallingResult.messages;
|
||||
|
||||
if (hasMoreToolCalls) {
|
||||
log.info(`Follow-up response contains ${currentResponse.tool_calls?.length || 0} more tool calls`);
|
||||
// Log the tool results for debugging
|
||||
const toolResultMessages = currentMessages.filter(
|
||||
msg => msg.role === 'tool' && !previousMessages.includes(msg)
|
||||
);
|
||||
|
||||
log.info(`========== TOOL EXECUTION RESULTS ==========`);
|
||||
toolResultMessages.forEach((msg, idx) => {
|
||||
log.info(`Tool result ${idx + 1}: tool_call_id=${msg.tool_call_id}, content=${msg.content.substring(0, 50)}...`);
|
||||
|
||||
// If streaming, show tool executions to the user
|
||||
if (isStreaming && streamCallback) {
|
||||
// For each tool result, format a readable message for the user
|
||||
const toolName = this.getToolNameFromToolCallId(currentMessages, msg.tool_call_id || '');
|
||||
const formattedToolResult = `[Tool: ${toolName || 'unknown'}]\n${msg.content}\n\n`;
|
||||
streamCallback(formattedToolResult, false);
|
||||
}
|
||||
});
|
||||
|
||||
// Check if we need another LLM completion for tool results
|
||||
if (toolCallingResult.needsFollowUp) {
|
||||
log.info(`========== TOOL FOLLOW-UP REQUIRED ==========`);
|
||||
log.info('Tool execution complete, sending results back to LLM');
|
||||
|
||||
// Ensure messages are properly formatted
|
||||
this.validateToolMessages(currentMessages);
|
||||
|
||||
// If streaming, show progress to the user
|
||||
if (isStreaming && streamCallback) {
|
||||
await streamCallback('[Generating response with tool results...]\n\n', false);
|
||||
}
|
||||
|
||||
// Generate a new completion with the updated messages
|
||||
const followUpStartTime = Date.now();
|
||||
const followUpCompletion = await this.stages.llmCompletion.execute({
|
||||
messages: currentMessages,
|
||||
options: {
|
||||
...modelSelection.options,
|
||||
// Ensure tool support is still enabled for follow-up requests
|
||||
enableTools: true,
|
||||
// Disable streaming during tool execution follow-ups
|
||||
stream: false
|
||||
}
|
||||
});
|
||||
this.updateStageMetrics('llmCompletion', followUpStartTime);
|
||||
|
||||
// Update current response for the next iteration
|
||||
currentResponse = followUpCompletion.response;
|
||||
|
||||
// Check if we need to continue the tool calling loop
|
||||
if (!currentResponse.tool_calls || currentResponse.tool_calls.length === 0) {
|
||||
log.info(`========== TOOL EXECUTION COMPLETE ==========`);
|
||||
log.info('No more tool calls, breaking tool execution loop');
|
||||
break;
|
||||
} else {
|
||||
log.info(`========== ADDITIONAL TOOL CALLS DETECTED ==========`);
|
||||
log.info(`Next iteration has ${currentResponse.tool_calls.length} more tool calls`);
|
||||
// Log the next set of tool calls
|
||||
currentResponse.tool_calls.forEach((toolCall, idx) => {
|
||||
log.info(`Next tool call ${idx + 1}: name=${toolCall.function?.name || 'unknown'}, id=${toolCall.id || 'no-id'}`);
|
||||
log.info(`Arguments: ${toolCall.function?.arguments || '{}'}`);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
log.info(`Follow-up response contains no more tool calls - completing tool loop`);
|
||||
log.info(`========== TOOL EXECUTION COMPLETE ==========`);
|
||||
log.info('No follow-up needed, breaking tool execution loop');
|
||||
break;
|
||||
}
|
||||
} catch (error: any) {
|
||||
log.info(`========== TOOL EXECUTION ERROR ==========`);
|
||||
log.error(`Error in tool execution: ${error.message || String(error)}`);
|
||||
|
||||
// Add error message to the conversation if tool execution fails
|
||||
currentMessages.push({
|
||||
role: 'system',
|
||||
content: `Error executing tool: ${error.message || String(error)}. Please try a different approach.`
|
||||
});
|
||||
|
||||
// If streaming, show error to the user
|
||||
if (isStreaming && streamCallback) {
|
||||
await streamCallback(`[Tool execution error: ${error.message || 'unknown error'}]\n\n`, false);
|
||||
}
|
||||
|
||||
// Continue loop if there are more tool calls
|
||||
needsFollowUp = hasMoreToolCalls;
|
||||
// Make a follow-up request to the LLM with the error information
|
||||
const errorFollowUpCompletion = await this.stages.llmCompletion.execute({
|
||||
messages: currentMessages,
|
||||
options: modelSelection.options
|
||||
});
|
||||
|
||||
// Update current response and break the tool loop
|
||||
currentResponse = errorFollowUpCompletion.response;
|
||||
break;
|
||||
}
|
||||
|
||||
// Increment iteration counter
|
||||
toolCallIterations++;
|
||||
|
||||
} while (needsFollowUp && toolCallIterations < maxToolCallIterations);
|
||||
|
||||
// If we hit max iterations but still have tool calls, log a warning
|
||||
if (toolCallIterations >= maxToolCallIterations && needsFollowUp) {
|
||||
log.error(`Reached maximum tool call iterations (${maxToolCallIterations}), stopping`);
|
||||
}
|
||||
|
||||
log.info(`Completed ${toolCallIterations} tool call iterations`);
|
||||
if (toolCallIterations >= maxToolCallIterations) {
|
||||
log.info(`========== MAXIMUM TOOL ITERATIONS REACHED ==========`);
|
||||
log.error(`Reached maximum tool call iterations (${maxToolCallIterations}), terminating loop`);
|
||||
|
||||
// Add a message to inform the LLM that we've reached the limit
|
||||
currentMessages.push({
|
||||
role: 'system',
|
||||
content: `Maximum tool call iterations (${maxToolCallIterations}) reached. Please provide your best response with the information gathered so far.`
|
||||
});
|
||||
|
||||
// If streaming, inform the user about iteration limit
|
||||
if (isStreaming && streamCallback) {
|
||||
await streamCallback(`[Reached maximum of ${maxToolCallIterations} tool calls. Finalizing response...]\n\n`, false);
|
||||
}
|
||||
|
||||
// Make a final request to get a summary response
|
||||
const finalFollowUpCompletion = await this.stages.llmCompletion.execute({
|
||||
messages: currentMessages,
|
||||
options: {
|
||||
...modelSelection.options,
|
||||
enableTools: false // Disable tools for the final response
|
||||
}
|
||||
});
|
||||
|
||||
// Update the current response
|
||||
currentResponse = finalFollowUpCompletion.response;
|
||||
}
|
||||
|
||||
// If streaming was paused for tool execution, resume it now with the final response
|
||||
if (isStreaming && streamCallback && streamingPaused) {
|
||||
// Resume streaming with the final response text
|
||||
await streamCallback(currentResponse.text, true);
|
||||
}
|
||||
} else if (toolsEnabled) {
|
||||
log.info(`========== NO TOOL CALLS DETECTED ==========`);
|
||||
log.info(`LLM response did not contain any tool calls, skipping tool execution`);
|
||||
}
|
||||
|
||||
// For non-streaming responses, process the final response
|
||||
const processStartTime = Date.now();
|
||||
const processed = await this.stages.responseProcessing.execute({
|
||||
// Process the final response
|
||||
log.info(`========== FINAL RESPONSE PROCESSING ==========`);
|
||||
const responseProcessingStartTime = Date.now();
|
||||
const processedResponse = await this.stages.responseProcessing.execute({
|
||||
response: currentResponse,
|
||||
options: input.options
|
||||
options: modelSelection.options
|
||||
});
|
||||
this.updateStageMetrics('responseProcessing', processStartTime);
|
||||
this.updateStageMetrics('responseProcessing', responseProcessingStartTime);
|
||||
log.info(`Final response processed, returning to user (${processedResponse.text.length} chars)`);
|
||||
|
||||
// Combine response with processed text, using accumulated text if streamed
|
||||
const finalResponse: ChatResponse = {
|
||||
...currentResponse,
|
||||
text: accumulatedText || processed.text
|
||||
};
|
||||
// Return the final response to the user
|
||||
// The ResponseProcessingStage returns {text}, not {response}
|
||||
// So we update our currentResponse with the processed text
|
||||
currentResponse.text = processedResponse.text;
|
||||
|
||||
const endTime = Date.now();
|
||||
const executionTime = endTime - startTime;
|
||||
|
||||
// Update overall average execution time
|
||||
this.metrics.averageExecutionTime =
|
||||
(this.metrics.averageExecutionTime * (this.metrics.totalExecutions - 1) + executionTime) /
|
||||
this.metrics.totalExecutions;
|
||||
|
||||
log.info(`Chat pipeline completed in ${executionTime}ms`);
|
||||
|
||||
return finalResponse;
|
||||
log.info(`========== PIPELINE COMPLETE ==========`);
|
||||
return currentResponse;
|
||||
} catch (error: any) {
|
||||
log.error(`Error in chat pipeline: ${error.message}`);
|
||||
log.info(`========== PIPELINE ERROR ==========`);
|
||||
log.error(`Error in chat pipeline: ${error.message || String(error)}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to get an LLM service for query processing
|
||||
*/
|
||||
private async getLLMService(): Promise<LLMServiceInterface | null> {
|
||||
try {
|
||||
const aiServiceManager = await import('../ai_service_manager.js').then(module => module.default);
|
||||
return aiServiceManager.getService();
|
||||
} catch (error: any) {
|
||||
log.error(`Error getting LLM service: ${error.message || String(error)}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a stream chunk through the response processing stage
|
||||
*/
|
||||
@@ -428,4 +574,52 @@ export class ChatPipeline {
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Find tool name from tool call ID by looking at previous assistant messages
|
||||
*/
|
||||
private getToolNameFromToolCallId(messages: Message[], toolCallId: string): string {
|
||||
if (!toolCallId) return 'unknown';
|
||||
|
||||
// Look for assistant messages with tool_calls
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
const message = messages[i];
|
||||
if (message.role === 'assistant' && message.tool_calls) {
|
||||
// Find the tool call with the matching ID
|
||||
const toolCall = message.tool_calls.find(tc => tc.id === toolCallId);
|
||||
if (toolCall && toolCall.function && toolCall.function.name) {
|
||||
return toolCall.function.name;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 'unknown';
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate tool messages to ensure they're properly formatted
|
||||
*/
|
||||
private validateToolMessages(messages: Message[]): void {
|
||||
for (let i = 0; i < messages.length; i++) {
|
||||
const message = messages[i];
|
||||
|
||||
// Ensure tool messages have required fields
|
||||
if (message.role === 'tool') {
|
||||
if (!message.tool_call_id) {
|
||||
log.info(`Tool message missing tool_call_id, adding placeholder`);
|
||||
message.tool_call_id = `tool_${i}`;
|
||||
}
|
||||
|
||||
// Content should be a string
|
||||
if (typeof message.content !== 'string') {
|
||||
log.info(`Tool message content is not a string, converting`);
|
||||
try {
|
||||
message.content = JSON.stringify(message.content);
|
||||
} catch (e) {
|
||||
message.content = String(message.content);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user