Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
45c8a76
Add port and host to register_instance for all languages
v1kko Nov 20, 2025
6d21081
Add extra assertion to tests
v1kko Nov 25, 2025
b1b2021
Add profiling event to profiling database
v1kko Nov 28, 2025
08ef79e
communicate muscle processes to be monitored
v1kko Nov 28, 2025
3bbc2c9
Implementing the Protocol for monitoring
v1kko Nov 28, 2025
d23c172
Add the last mile to get everything in the sqlite database
v1kko Nov 28, 2025
2f157c7
Merge branch 'multiscale:develop' into better_monitoring
v1kko Dec 4, 2025
f57c879
Add (skeleton) MLPServer, split events to events/usage_evenst
v1kko Dec 8, 2025
f867167
Pass resource usage through MLPServer
v1kko Dec 9, 2025
919b62d
Get MLPServer address to Agents
v1kko Dec 10, 2025
c1058a9
remove wrong implementation to monitor
v1kko Dec 10, 2025
6fb36b1
Removing leftovers
v1kko Dec 10, 2025
4a84e76
Fix mypy issues
v1kko Dec 10, 2025
d1d1a32
Make environment optional, if no environment for a new process is spe…
v1kko Dec 10, 2025
4e07f29
Fix all flake8 issues
v1kko Dec 10, 2025
b54c0e3
pass procid and hostname from the instance instead of the MMPClient
v1kko Dec 10, 2025
154f29b
Fix profile_database_test to count forward instead of backwards
v1kko Dec 10, 2025
1366ede
Fix private variable access of the instance_manager
v1kko Dec 10, 2025
0b1d965
Fix mmp_client.cpp::register_instance headers
v1kko Dec 10, 2025
9a65742
Fix report usage implementation, only report back once per second, an…
v1kko Dec 19, 2025
1f9af1a
fix mlp_tests
v1kko Dec 19, 2025
b7cccee
fix threading issue with py3.8
v1kko Dec 19, 2025
bcead64
fix last remaining threading issue by force closing the Pool
v1kko Dec 19, 2025
935fc58
Fix tests by calling MLPClient.close() at the end of the tests
v1kko Dec 20, 2025
8c9a8d3
Usage collection now non-threaded, but with simple timestamps, to sat…
v1kko Dec 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_test/test_cpp_mmp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def mock_remove(name: Reference):
# add some peers
manager._instance_registry.add(
Reference('macro'), ['tcp:test3', 'tcp:test4'],
[Port('out', Operator.O_I), Port('in', Operator.S)])
[Port('out', Operator.O_I), Port('in', Operator.S)], 1234, "test_host")

# create C++ client
# it runs through the various RPC calls
Expand Down
4 changes: 4 additions & 0 deletions integration_test/test_registration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from unittest.mock import patch

import pytest
from os import getpid
from socket import gethostname
from ymmsl import Conduit, Operator, Port, Reference

from libmuscle.mmp_client import MMPClient
Expand All @@ -18,6 +20,8 @@ def test_registration(log_file_in_tmpdir, mmp_server):
assert registry.get_locations(instance_name) == ['tcp:localhost:10000']
assert registry.get_ports(instance_name)[0].name == 'test_in'
assert registry.get_ports(instance_name)[0].operator == Operator.S
assert registry.get_pid(instance_name) == getpid()
assert registry.get_hostname(instance_name) == gethostname()
client.close()


Expand Down
3 changes: 2 additions & 1 deletion libmuscle/cpp/src/libmuscle/mmp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ void MMPClient::register_instance(
auto request = Data::list(
static_cast<int>(RequestType::register_instance),
static_cast<std::string>(instance_id_), encoded_locs,
encoded_ports, MUSCLE3_VERSION);
encoded_ports, get_process_id(), get_hostname(),
Comment thread
v1kko marked this conversation as resolved.
Outdated
MUSCLE3_VERSION);

auto response = call_manager_(request);

Expand Down
13 changes: 13 additions & 0 deletions libmuscle/cpp/src/libmuscle/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
#include <stdexcept>
#include <string>
#include <sys/stat.h>
#include <sys/stat.h>
#include <thread>
#include <unistd.h>
#include <limits.h>


using std::chrono::duration;
Expand Down Expand Up @@ -114,6 +117,16 @@ bool Retrier::should_give_up() {
const double Retrier::default_base_delay_ = 0.5;
const double Retrier::default_timeout_ = 30.0;
const double Retrier::factor_ = std::pow(2.0, 1.0 / 3.0);
int get_process_id() {
return ::getpid();
}

std::string get_hostname() {
char hostname[HOST_NAME_MAX+1];
if (::gethostname(hostname, HOST_NAME_MAX+1) != 0)
throw std::runtime_error("Could not get hostname");
return std::string(hostname);
}

} }

8 changes: 8 additions & 0 deletions libmuscle/cpp/src/libmuscle/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ std::ostream & operator<<(std::ostream & os, Optional<T> const & t);
*/
double time_monotonic();

/* Return the current process ID.
*/
int get_process_id();

/* Return the hostname.
*/
std::string get_hostname();


/* Helper class for retrying things with a delay and timeout.
*
Expand Down
53 changes: 35 additions & 18 deletions libmuscle/python/libmuscle/manager/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from libmuscle.native_instantiator.native_instantiator import NativeInstantiator
from libmuscle.planner.planner import Planner, ResourceAssignment
from libmuscle.planner.resources import Resources
from libmuscle.profiling import ProfileEvent
from libmuscle.manager.profile_store import ProfileStore


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,26 +64,31 @@ class InstanceManager:
"""Instantiates and manages running instances"""
def __init__(
self, configuration: Configuration, run_dir: RunDir,
instance_registry: InstanceRegistry) -> None:
instance_registry: InstanceRegistry,
profile_store: ProfileStore) -> None:
"""Create an InstanceManager.

Args:
configuration: The global configuration
run_dir: Directory to run in
instance_registry: The InstanceRegistry to use
profile_store: The ProfileStore to use
"""
self._configuration = configuration
self._run_dir = run_dir
self._instance_registry = instance_registry
self._profile_store = profile_store

self._resources_in: Queue[Resources] = Queue()
self._requests_out: Queue[InstantiatorRequest] = Queue()
self._results_in: Queue[_ResultType] = Queue()
self._log_records_in: Queue[logging.LogRecord] = Queue()
self._profile_events_in: Queue[List[Tuple[str, ProfileEvent]]] = Queue()

self._instantiator = NativeInstantiator(
self._resources_in, self._requests_out, self._results_in,
self._log_records_in, self._run_dir.path)
self._log_records_in, self._profile_events_in,
self._run_dir.path)
self._instantiator.start()

self._log_handler = LogHandlingThread(self._log_records_in)
Expand Down Expand Up @@ -172,23 +179,33 @@ def cancel_all() -> None:
results: List[Process] = list()

while self._num_running > 0:
result = self._results_in.get()
try:
Comment thread
v1kko marked this conversation as resolved.
Outdated
profile_events = self._profile_events_in.get_nowait()
for instance_id, event in profile_events:
self._profile_store.add_event(Reference(instance_id), event)
except Empty:
pass

if isinstance(result, CrashedResult):
if isinstance(result.exception, ConfigurationError):
_logger.error(str(result.exception))
else:
_logger.error(
'Instantiator crashed. This should not happen, please file'
' a bug report.')
return False

results.append(result)
if result.status != ProcessStatus.CANCELED:
registered = self._instance_registry.did_register(result.instance)
if result.exit_code != 0 or not registered:
cancel_all()
self._num_running -= 1
try:
result = self._results_in.get_nowait()

if isinstance(result, CrashedResult):
if isinstance(result.exception, ConfigurationError):
_logger.error(str(result.exception))
else:
_logger.error(
'Instantiator crashed. This should not happen, please file'
' a bug report.')
return False

results.append(result)
if result.status != ProcessStatus.CANCELED:
registered = self._instance_registry.did_register(result.instance)
if result.exit_code != 0 or not registered:
cancel_all()
self._num_running -= 1
except Empty:
continue

# Summarise outcome
crashes: List[Tuple[Process, Path]] = list()
Expand Down
36 changes: 34 additions & 2 deletions libmuscle/python/libmuscle/manager/instance_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ def __init__(self) -> None:
self._deregistered_one = Condition() # doubles as lock
self._locations: Dict[Reference, List[str]] = {}
self._ports: Dict[Reference, List[Port]] = {}
self._pids: Dict[Reference, int] = {}
self._hostnames: Dict[Reference, str] = {}
self._seen: Set[Reference] = set()
self._startup = True

def add(self, name: Reference, locations: List[str], ports: List[Port]
) -> None:
def add(self, name: Reference, locations: List[str], ports: List[Port],
pid: int, hostname: str) -> None:
"""Add an instance to the registry.

Args:
name: Name of the instance.
locations: Network locations where it can be reached.
ports: List of ports of this instance.
pid: PID of the instance
hostname: Hostname of the instance

Raises:
ValueError: If an instance with this name has already been
Expand All @@ -42,6 +46,8 @@ def add(self, name: Reference, locations: List[str], ports: List[Port]

self._locations[name] = locations
self._ports[name] = ports
self._pids[name] = pid
self._hostnames[name] = hostname
self._seen.add(name)
self._startup = False

Expand Down Expand Up @@ -69,6 +75,30 @@ def get_ports(self, name: Reference) -> List[Port]:
with self._deregistered_one:
return self._ports[name]

def get_pid(self, name: Reference) -> int:
"""Retrieves the PID of a registered instance.

Args:
name: The name of the instance to get the PID of.

Raises:
KeyError: If no instance with this name was registered.
"""
with self._deregistered_one:
return self._pids[name]

def get_hostname(self, name: Reference) -> str:
"""Retrieves the hostname of a registered instance.

Args:
name: The name of the instance to get the hostname of.

Raises:
KeyError: If no instance with this name was registered.
"""
with self._deregistered_one:
return self._hostnames[name]

def remove(self, name: Reference) -> None:
"""Remove an instance from the registry.

Expand All @@ -81,6 +111,8 @@ def remove(self, name: Reference) -> None:
with self._deregistered_one:
del self._locations[name]
del self._ports[name]
del self._pids[name]
del self._hostnames[name]
self._deregistered_one.notify()

def did_register(self, name: Reference) -> bool:
Expand Down
6 changes: 6 additions & 0 deletions libmuscle/python/libmuscle/manager/instantiator.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ def __init__(
self.stdout_path = stdout_path
self.stderr_path = stderr_path

class MonitorRequest(InstantiatorRequest):
"""Requests monitoring a process."""
def __init__(self, instance: str, hostname: str, pid: int) -> None:
self.instance = instance
self.hostname = hostname
self.pid = pid

class CancelAllRequest(InstantiatorRequest):
"""Requests stopping all running processes."""
Expand Down
6 changes: 4 additions & 2 deletions libmuscle/python/libmuscle/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def __init__(
configuration = self._configuration.as_configuration()
if self._run_dir is not None:
self._instance_manager = InstanceManager(
configuration, self._run_dir, self._instance_registry)
configuration, self._run_dir, self._instance_registry,
profile_store=self._profile_store)
except ValueError:
pass

Expand All @@ -85,7 +86,8 @@ def __init__(
self._server = MMPServer(
self._logger, self._profile_store, self._configuration,
self._instance_registry, self._topology_store,
self._snapshot_registry, self._deadlock_detector, run_dir)
self._snapshot_registry, self._deadlock_detector, run_dir,
instance_manager=self._instance_manager)

if self._instance_manager:
self._instance_manager.set_manager_location(
Expand Down
23 changes: 17 additions & 6 deletions libmuscle/python/libmuscle/manager/mmp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
ProfileEvent, ProfileEventType, ProfileTimestamp)
from libmuscle.snapshot import SnapshotMetadata
from libmuscle.timestamp import Timestamp
from libmuscle.manager.instance_manager import InstanceManager
from libmuscle.manager.instantiator import MonitorRequest


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -79,7 +81,8 @@ def __init__(
topology_store: TopologyStore,
snapshot_registry: SnapshotRegistry,
deadlock_detector: DeadlockDetector,
run_dir: Optional[RunDir]
run_dir: Optional[RunDir],
instance_manager: Optional[InstanceManager] = None,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be optional, there's always an InstanceManager, so we can always pass it. I really want to have the runtime structure as inflexible (and therefore comprehensible) as possible.

@v1kko v1kko Dec 10, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed: the InstanceManager is Optional, as muscle can run without it. If you want to make the InstanceManager mandatory, I think that should be another PR

) -> None:
"""Create an MMPRequestHandler.

Expand All @@ -98,6 +101,7 @@ def __init__(
self._deadlock_detector = deadlock_detector
self._run_dir = run_dir
self._reference_time = time.monotonic()
self._instance_manager = instance_manager

def handle_request(self, request: bytes) -> bytes:
"""Handles a manager request.
Expand Down Expand Up @@ -146,13 +150,15 @@ def close(self) -> None:

def _register_instance(
self, instance_id: str, locations: List[str],
ports: List[List[str]], version: str = '') -> Any:
ports: List[List[str]], pid: int, hostname: str, version: str = '') -> Any:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instances that use MPI will have a hostname/pid for each rank. How would that be passed here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the process calling "register_instance" will be monitored, if we want to also monitor all the other MPI processes, then I think that the following needs to be done:

  • The MPI Process that calls register_instance must provide all PIDS and hostnames
  • We need to call this method for each provided PID/Hostname Pair

I think the backend is flexible enough to have Many PIDs for a single Instance ID

"""Handle a register instance request.

Args:
instance_id: ID of the instance to register
locations: Locations where it can be reached
ports: Ports of this instance
pid: PID of the instance
hostname: Hostname of the instance
version: Version of libmuscle that this instance uses

Returns:
Expand All @@ -169,11 +175,14 @@ def _register_instance(
f' manager libmuscle version ({libmuscle.__version__}).'
' Please ensure that the instance and the manager use the'
' same version of libmuscle.']

port_objs = [decode_port(p) for p in ports]
instance = Reference(instance_id)

if self._instance_manager:
self._instance_manager._requests_out.put(MonitorRequest(instance_id, hostname, pid))
Comment thread
v1kko marked this conversation as resolved.
Outdated

try:
self._instance_registry.add(instance, locations, port_objs)
self._instance_registry.add(instance, locations, port_objs, pid, hostname)

_logger.info(f'Registered instance {instance_id}')
return [ResponseType.SUCCESS.value]
Expand Down Expand Up @@ -417,7 +426,8 @@ def __init__(
topology_store: TopologyStore,
snapshot_registry: SnapshotRegistry,
deadlock_detector: DeadlockDetector,
run_dir: Optional[RunDir]
run_dir: Optional[RunDir],
instance_manager: Optional[InstanceManager]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about optionality, and before run_dir with the other system components makes more sense.

) -> None:
"""Create an MMPServer.

Expand All @@ -439,7 +449,8 @@ def __init__(
"""
self._handler = MMPRequestHandler(
logger, profile_store, configuration, instance_registry,
topology_store, snapshot_registry, deadlock_detector, run_dir)
topology_store, snapshot_registry, deadlock_detector, run_dir,
instance_manager=instance_manager)
try:
self._server = TcpTransportServer(self._handler, 9000)
except OSError as e:
Expand Down
Loading
Loading