Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
45c8a76
Add port and host to register_instance for all languages
v1kko Nov 20, 2025
6d21081
Add extra assertion to tests
v1kko Nov 25, 2025
b1b2021
Add profiling event to profiling database
v1kko Nov 28, 2025
08ef79e
communicate muscle processes to be monitored
v1kko Nov 28, 2025
3bbc2c9
Implementing the Protocol for monitoring
v1kko Nov 28, 2025
d23c172
Add the last mile to get everything in the sqlite database
v1kko Nov 28, 2025
2f157c7
Merge branch 'multiscale:develop' into better_monitoring
v1kko Dec 4, 2025
f57c879
Add (skeleton) MLPServer, split events to events/usage_evenst
v1kko Dec 8, 2025
f867167
Pass resource usage through MLPServer
v1kko Dec 9, 2025
919b62d
Get MLPServer address to Agents
v1kko Dec 10, 2025
c1058a9
remove wrong implementation to monitor
v1kko Dec 10, 2025
6fb36b1
Removing leftovers
v1kko Dec 10, 2025
4a84e76
Fix mypy issues
v1kko Dec 10, 2025
d1d1a32
Make environment optional, if no environment for a new process is spe…
v1kko Dec 10, 2025
4e07f29
Fix all flake8 issues
v1kko Dec 10, 2025
b54c0e3
pass procid and hostname from the instance instead of the MMPClient
v1kko Dec 10, 2025
154f29b
Fix profile_database_test to count forward instead of backwards
v1kko Dec 10, 2025
1366ede
Fix private variable access of the instance_manager
v1kko Dec 10, 2025
0b1d965
Fix mmp_client.cpp::register_instance headers
v1kko Dec 10, 2025
9a65742
Fix report usage implementation, only report back once per second, an…
v1kko Dec 19, 2025
1f9af1a
fix mlp_tests
v1kko Dec 19, 2025
b7cccee
fix threading issue with py3.8
v1kko Dec 19, 2025
bcead64
fix last remaining threading issue by force closing the Pool
v1kko Dec 19, 2025
935fc58
Fix tests by calling MLPClient.close() at the end of the tests
v1kko Dec 20, 2025
8c9a8d3
Usage collection now non-threaded, but with simple timestamps, to sat…
v1kko Dec 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libmuscle/python/libmuscle/mcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class RequestType(Enum):
REPORT_RESOURCES = 41
GET_COMMAND = 42
REPORT_RESULT = 43
MONITOR_USAGE = 44
Comment thread
v1kko marked this conversation as resolved.
Outdated


class ResponseType(Enum):
Expand All @@ -57,3 +58,4 @@ class AgentCommandType(Enum):
START = 1
CANCEL_ALL = 2
SHUTDOWN = 3
ADD_MONITOR = 4
3 changes: 2 additions & 1 deletion libmuscle/python/libmuscle/mmp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ def encode_profile_event(event: ProfileEvent) -> Any:
event.event_type.value,
event.start_time.nanoseconds, event.stop_time.nanoseconds,
encoded_port, event.port_length, event.slot,
event.message_number, event.message_size, event.message_timestamp]
event.message_number, event.message_size, event.message_timestamp,
event.cpu_percent, event.memory_usage]


def decode_checkpoint_rule(rule: Dict[str, Any]) -> CheckpointRule:
Expand Down
10 changes: 8 additions & 2 deletions libmuscle/python/libmuscle/native_instantiator/agent/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
from libmuscle.native_instantiator.process_manager import ProcessManager
from libmuscle.native_instantiator.agent.map_client import MAPClient
from libmuscle.native_instantiator.agent.agent_commands import (
CancelAllCommand, ShutdownCommand, StartCommand)
AddMonitorCommand, CancelAllCommand, ShutdownCommand, StartCommand)
from libmuscle.planner.resources import Core, CoreSet, OnNodeResources


_logger = logging.getLogger(__name__)


Expand All @@ -30,6 +29,7 @@ def __init__(self, node_name: str, server_location: str) -> None:
self._process_manager = ProcessManager()

self._node_name = node_name
self._monitor_pids : List[Tuple[str, int]] = []

_logger.info(f'Connecting to manager at {server_location}')
self._server = MAPClient(self._node_name, server_location)
Expand All @@ -51,6 +51,8 @@ def run(self) -> None:
self._process_manager.start(
command.name, command.work_dir, command.args, command.env,
command.stdout, command.stderr)
elif isinstance(command, AddMonitorCommand):
self._monitor_pids.append((command.instance, command.pid))
elif isinstance(command, CancelAllCommand):
_logger.info('Cancelling all instances')
self._process_manager.cancel_all()
Expand All @@ -60,12 +62,16 @@ def run(self) -> None:
shutting_down = True
_logger.info('Agent shutting down')


finished = self._process_manager.get_finished()
if finished:
for name, exit_code in finished:
_logger.info(f'Process {name} finished with exit code {exit_code}')
self._server.report_result(finished)

if not shutting_down and not finished:
self._server.monitor_usage(self._monitor_pids, _logger)

sleep(0.1)

self._server.close()
Expand Down
34 changes: 32 additions & 2 deletions libmuscle/python/libmuscle/native_instantiator/agent/map_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from typing import Any, List, Optional, Tuple

import msgpack
import psutil

from libmuscle.mcp.protocol import AgentCommandType, RequestType, ResponseType
from libmuscle.mcp.tcp_transport_client import TcpTransportClient
from libmuscle.native_instantiator.agent.agent_commands import (
AgentCommand, StartCommand, CancelAllCommand, ShutdownCommand)
AgentCommand, AddMonitorCommand, StartCommand, CancelAllCommand,
ShutdownCommand)
from libmuscle.planner.resources import OnNodeResources


class MAPClient:
"""The client for the MUSCLE Agent Protocol.

Expand Down Expand Up @@ -44,6 +45,32 @@ def report_resources(self, resources: OnNodeResources) -> None:
resources.node_name, {'cpu': enc_cpu_resources}]
self._call_agent_manager(request)

def monitor_usage(self, pids: List[Tuple[str, int]], logger: Any) -> None:
    """Sample resource usage of local processes and report it.

    For every (instance_id, pid) pair the CPU percentage and the virtual
    memory size of the process are sampled with psutil. The collected
    samples are sent to the agent manager in a single MONITOR_USAGE
    request. No request is sent if there is nothing to report.

    Args:
        pids: Pairs of (instance_id, pid) of the processes to sample.
        logger: Logger-like object; only its ``debug`` method is used.
    """
    if not pids:
        # Nothing to monitor.
        return

    # psutil measures cpu_percent() relative to the previous call on the
    # *same* Process object, and the first call on a fresh object always
    # yields a meaningless 0.0. The Process objects therefore have to
    # outlive this call. They are kept on the client, created lazily
    # because __init__ is not aware of monitoring.
    tracked = getattr(self, '_monitored_processes', None)
    if tracked is None:
        tracked = {}
        self._monitored_processes = tracked

    usage = {}
    for instance_id, pid in pids:
        try:
            process = tracked.get(pid)
            if process is None:
                # First time we see this pid: prime the CPU counter and
                # skip this sample, its CPU value would be bogus.
                process = psutil.Process(pid)
                process.cpu_percent()
                tracked[pid] = process
                continue

            cpu = process.cpu_percent()
            mem = process.memory_info().vms
        except psutil.NoSuchProcess:
            # Process is gone; forget it so a reused pid starts afresh.
            tracked.pop(pid, None)
            logger.debug(f'PID {pid}: Process not found')
            continue
        except psutil.AccessDenied:
            # Not allowed to inspect this process; skip it instead of
            # taking down the agent's main loop.
            logger.debug(f'PID {pid}: Access denied')
            continue

        logger.debug(f'PID {pid}: CPU {cpu}%, Memory {mem}')
        usage[instance_id] = (cpu, mem)

    if not usage:
        # No usable samples this round.
        return

    request = [
            RequestType.MONITOR_USAGE.value,
            self._node_name, usage]
    self._call_agent_manager(request)
Comment thread
v1kko marked this conversation as resolved.
Outdated

def get_command(self) -> Optional[AgentCommand]:
"""Get a command from the agent manager.

Expand All @@ -67,6 +94,9 @@ def get_command(self) -> Optional[AgentCommand]:
stderr = Path(command[6])

return StartCommand(name, workdir, args, env, stdout, stderr)

elif command[0] == AgentCommandType.ADD_MONITOR.value:
return AddMonitorCommand(command[1], command[2], command[3])

elif command[0] == AgentCommandType.CANCEL_ALL.value:
return CancelAllCommand()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def start(

def monitor_usage(self, instance: str, hostname: str, pid: int) -> None:
    """Request resource usage monitoring for a process.

    Builds an AddMonitorCommand and deposits it with the server under
    the given hostname.

    Args:
        instance: Identifier of the instance the process belongs to.
        hostname: Host the process runs on; also used as the key under
            which the command is deposited.
        pid: Process id to monitor.
    """
    add_monitor = AddMonitorCommand(instance, hostname, pid)
    self._server.deposit_command(hostname, add_monitor)

def cancel_all(self) -> None:
"""Cancel all processes.
Expand Down
16 changes: 13 additions & 3 deletions libmuscle/python/libmuscle/native_instantiator/map_server.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import errno
import logging
from typing import Any, Dict, cast, List, Optional
from typing import Any, Dict, cast, List, Optional, Tuple

import msgpack

from libmuscle.mcp.protocol import AgentCommandType, RequestType, ResponseType
from libmuscle.mcp.tcp_transport_server import TcpTransportServer
from libmuscle.mcp.transport_server import RequestHandler
from libmuscle.native_instantiator.agent.agent_commands import (
AgentCommand, CancelAllCommand, ShutdownCommand, StartCommand)
AgentCommand, AddMonitorCommand, CancelAllCommand, ShutdownCommand, StartCommand)
from libmuscle.native_instantiator.iagent_manager import IAgentManager
from libmuscle.planner.resources import Core, CoreSet, OnNodeResources
from libmuscle.post_office import PostOffice
Expand Down Expand Up @@ -49,9 +49,15 @@ def handle_request(self, request: bytes) -> bytes:
response = self._get_command(*req_args)
elif req_type == RequestType.REPORT_RESULT.value:
response = self._report_result(*req_args)

elif req_type == RequestType.MONITOR_USAGE.value:
response = self._monitor_usage(*req_args)
return cast(bytes, msgpack.packb(response, use_bin_type=True))

def _monitor_usage(
        self, node_name: str, usage: Dict[str, Tuple[float, int]]) -> Any:
    """Handle a monitor usage request.

    Currently the report is only logged.

    Args:
        node_name: Name of the node whose agent sent the report.
        usage: Reported (cpu_percent, memory) per instance id.
            NOTE(review): after msgpack decoding the values presumably
            arrive as two-element lists rather than tuples - confirm
            before unpacking them strictly.

    Returns:
        A list containing the SUCCESS response type value.
    """
    # Agents may report often, so log at debug level instead of printing
    # to stdout from the request handler.
    logging.getLogger(__name__).debug(
            'Received usage report %s for node %s', usage, node_name)
    return [ResponseType.SUCCESS.value]

def _report_resources(
self, node_name: str, data: Dict[str, Any]) -> Any:
"""Handle a report resources request.
Expand Down Expand Up @@ -166,6 +172,10 @@ def deposit_command(self, node_name: str, command: AgentCommand) -> None:
AgentCommandType.START.value, command.name, str(command.work_dir),
command.args, command.env, str(command.stdout), str(command.stderr)
]
elif isinstance(command, AddMonitorCommand):
command_obj = [
AgentCommandType.ADD_MONITOR.value, command.instance, command.hostname, command.pid
]
elif isinstance(command, CancelAllCommand):
command_obj = [AgentCommandType.CANCEL_ALL.value]
elif isinstance(command, ShutdownCommand):
Expand Down