diff --git a/rasa/core/processor.py b/rasa/core/processor.py index fc628c7a7247..6cba13bf222b 100644 --- a/rasa/core/processor.py +++ b/rasa/core/processor.py @@ -8,6 +8,8 @@ from types import LambdaType from typing import Any, Dict, List, Optional, Text, Tuple, Union + + from rasa.core.http_interpreter import RasaNLUHttpInterpreter from rasa.engine import loader from rasa.engine.constants import PLACEHOLDER_MESSAGE, PLACEHOLDER_TRACKER @@ -149,34 +151,47 @@ def _load_model( except tarfile.ReadError: raise ModelNotFound(f"Model {model_path} can not be loaded.") + async def handle_message( self, message: UserMessage ) -> Optional[List[Dict[Text, Any]]]: """Handle a single message with this processor.""" - # preprocess message if necessary + + # Start timer for performance monitoring + start_time = time.perf_counter() + + # Preprocess and log message, but do not save tracker yet tracker = await self.log_message(message, should_save_tracker=False) - + + # If the model is NLU-only, skip action prediction if self.model_metadata.training_type == TrainingType.NLU: await self.save_tracker(tracker) rasa.shared.utils.io.raise_warning( "No core model. Skipping action prediction and execution.", docs=DOCS_URL_POLICIES, ) + + # Stop timer and log runtime duration + end_time = time.perf_counter() + duration = round(end_time - start_time, 3) + logger.info(f"[Runtime] NLU-only message '{message.text}' processed in {duration}s") + return None - - tracker = await self.run_action_extract_slots(message.output_channel, tracker) - - await self._run_prediction_loop(message.output_channel, tracker) - - await self.run_anonymization_pipeline(tracker) - + + # Predict the next action + await self._predict_and_execute_next_action(message.output_channel, tracker) + + # Save tracker state after processing await self.save_tracker(tracker) - - if isinstance(message.output_channel, CollectingOutputChannel): - return message.output_channel.messages - + + # Stop timer and log total processing duration + end_time = time.perf_counter() + duration = round(end_time - start_time, 3) + logger.info(f"[Runtime] Message '{message.text}' fully processed in {duration}s") + return None + async def run_action_extract_slots( self, output_channel: OutputChannel, tracker: DialogueStateTracker ) -> DialogueStateTracker: diff --git a/rasa/core/tracker_store.py b/rasa/core/tracker_store.py index a91f9f5c6b87..4794f0550a20 100644 --- a/rasa/core/tracker_store.py +++ b/rasa/core/tracker_store.py @@ -1373,6 +1373,25 @@ def _additional_events( tracker.events, number_of_events_since_last_session, len(tracker.events) ) +class BufferedTrackerStore(TrackerStore): + """A tracker store that buffers events and flushes them to the underlying store.""" + def __init__(self, flush_interval=5): + self._pending_events = [] # buffer for events + self.flush_interval = flush_interval # time interval to flush events + + def update(self, event): + self._pending_events.append(event) # add event to buffer + # Check if we need to flush + if len(self._pending_events) >= self.flush_interval: + # If the buffer is full, flush the events + self.flush() + + def flush(self): + if self._pending_events: + tracker.update_events(self._pending_events) # update the tracker with buffered events + # Clear the buffer after flushing + self._pending_events.clear() + super().update(tracker) # save the tracker to the underlying store class FailSafeTrackerStore(TrackerStore): """Tracker store wrapper. diff --git a/tests/core/test_buffered_tracker_store.py b/tests/core/test_buffered_tracker_store.py new file mode 100644 index 000000000000..19e687d5158c --- /dev/null +++ b/tests/core/test_buffered_tracker_store.py @@ -0,0 +1,12 @@ +def test_buffered_tracker_save(): + store = BufferedTrackerStore(flush_interval=2) + tracker = DialogueStateTracker("user123", slots=[]) + event1 = UserUttered("hi") + event2 = SlotSet("pizza", "pepperoni") + + store.update(event1) + assert len(store._pending_events) == 1 + + store.update(event2) + # flush should trigger + assert len(store._pending_events) == 0 # should be cleared after flush \ No newline at end of file