Skip to content
41 changes: 28 additions & 13 deletions rasa/core/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from types import LambdaType
from typing import Any, Dict, List, Optional, Text, Tuple, Union



from rasa.core.http_interpreter import RasaNLUHttpInterpreter
from rasa.engine import loader
from rasa.engine.constants import PLACEHOLDER_MESSAGE, PLACEHOLDER_TRACKER
Expand Down Expand Up @@ -149,34 +151,47 @@ def _load_model(
except tarfile.ReadError:
raise ModelNotFound(f"Model {model_path} can not be loaded.")


async def handle_message(
self, message: UserMessage
) -> Optional[List[Dict[Text, Any]]]:
"""Handle a single message with this processor."""
# preprocess message if necessary

# Start timer for performance monitoring
start_time = time.perf_counter()

# Preprocess and log message, but do not save tracker yet
tracker = await self.log_message(message, should_save_tracker=False)


# If the model is NLU-only, skip action prediction
if self.model_metadata.training_type == TrainingType.NLU:
await self.save_tracker(tracker)
rasa.shared.utils.io.raise_warning(
"No core model. Skipping action prediction and execution.",
docs=DOCS_URL_POLICIES,
)

# Stop timer and log runtime duration
end_time = time.perf_counter()
duration = round(end_time - start_time, 3)
logger.info(f"[Runtime] NLU-only message '{message.text}' processed in {duration}s")

return None

tracker = await self.run_action_extract_slots(message.output_channel, tracker)

await self._run_prediction_loop(message.output_channel, tracker)

await self.run_anonymization_pipeline(tracker)


# Predict the next action
await self._predict_and_execute_next_action(message.output_channel, tracker)

# Save tracker state after processing
await self.save_tracker(tracker)

if isinstance(message.output_channel, CollectingOutputChannel):
return message.output_channel.messages


# Stop timer and log total processing duration
end_time = time.perf_counter()
duration = round(end_time - start_time, 3)
logger.info(f"[Runtime] Message '{message.text}' fully processed in {duration}s")

return None


async def run_action_extract_slots(
self, output_channel: OutputChannel, tracker: DialogueStateTracker
) -> DialogueStateTracker:
Expand Down
19 changes: 19 additions & 0 deletions rasa/core/tracker_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,25 @@ def _additional_events(
tracker.events, number_of_events_since_last_session, len(tracker.events)
)

class BufferedTrackerStore(TrackerStore):
"""A tracker store that buffers events and flushes them to the underlying store."""
def __init__(self, flush_interval=5):
self._pending_events = [] # buffer for events
self.flush_interval = flush_interval # time interval to flush events

def update(self, event):
self._pending_events.append(event) # add event to buffer
# Check if we need to flush
if len(self._pending_events) >= self.flush_interval:
# If the buffer is full, flush the events
self.flush()

def flush(self):
if self._pending_events:
tracker.update_events(self._pending_events) # update the tracker with buffered events
# Clear the buffer after flushing
self._pending_events.clear()
super().update(tracker) # save the tracker to the underlying store

class FailSafeTrackerStore(TrackerStore):
"""Tracker store wrapper.
Expand Down
12 changes: 12 additions & 0 deletions tests/core/test_buffered_tracker_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
def test_buffered_tracker_save():
store = BufferedTrackerStore(flush_interval=2)
tracker = DialogueStateTracker("user123", slots=[])
event1 = UserUttered("hi")
event2 = SlotSet("pizza", "pepperoni")

store.update(event1)
assert len(store._pending_events) == 1

store.update(event2)
# flush should trigger
assert len(store._pending_events) == 0 # should be cleared after flush