Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
45c8a76
Add port and host to register_instance for all languages
v1kko Nov 20, 2025
6d21081
Add extra assertion to tests
v1kko Nov 25, 2025
b1b2021
Add profiling event to profiling database
v1kko Nov 28, 2025
08ef79e
communicate muscle processes to be monitored
v1kko Nov 28, 2025
3bbc2c9
Implementing the Protocol for monitoring
v1kko Nov 28, 2025
d23c172
Add the last mile to get everything in the sqlite database
v1kko Nov 28, 2025
2f157c7
Merge branch 'multiscale:develop' into better_monitoring
v1kko Dec 4, 2025
f57c879
Add (skeleton) MLPServer, split events to events/usage_evenst
v1kko Dec 8, 2025
f867167
Pass resource usage through MLPServer
v1kko Dec 9, 2025
919b62d
Get MLPServer address to Agents
v1kko Dec 10, 2025
c1058a9
remove wrong implementation to monitor
v1kko Dec 10, 2025
6fb36b1
Removing leftovers
v1kko Dec 10, 2025
4a84e76
Fix mypy issues
v1kko Dec 10, 2025
d1d1a32
Make environment optional, if no environment for a new process is spe…
v1kko Dec 10, 2025
4e07f29
Fix all flake8 issues
v1kko Dec 10, 2025
b54c0e3
pass procid and hostname from the instance instead of the MMPClient
v1kko Dec 10, 2025
154f29b
Fix profile_database_test to count forward instead of backwards
v1kko Dec 10, 2025
1366ede
Fix private variable access of the instance_manager
v1kko Dec 10, 2025
0b1d965
Fix mmp_client.cpp::register_instance headers
v1kko Dec 10, 2025
9a65742
Fix report usage implementation, only report back once per second, an…
v1kko Dec 19, 2025
1f9af1a
fix mlp_tests
v1kko Dec 19, 2025
b7cccee
fix threading issue with py3.8
v1kko Dec 19, 2025
bcead64
fix last remaining threading issue by force closing the Pool
v1kko Dec 19, 2025
935fc58
Fix tests by calling MLPClient.close() at the end of the tests
v1kko Dec 20, 2025
8c9a8d3
Usage collection now non-threaded, but with simple timestamps, to sat…
v1kko Dec 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_test/test_cpp_mmp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def mock_remove(name: Reference):
# add some peers
manager._instance_registry.add(
Reference('macro'), ['tcp:test3', 'tcp:test4'],
[Port('out', Operator.O_I), Port('in', Operator.S)])
[Port('out', Operator.O_I), Port('in', Operator.S)], 1234, "test_host")

# create C++ client
# it runs through the various RPC calls
Expand Down
4 changes: 4 additions & 0 deletions integration_test/test_registration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from unittest.mock import patch

import pytest
from os import getpid
from socket import gethostname
from ymmsl import Conduit, Operator, Port, Reference

from libmuscle.mmp_client import MMPClient
Expand All @@ -18,6 +20,8 @@ def test_registration(log_file_in_tmpdir, mmp_server):
assert registry.get_locations(instance_name) == ['tcp:localhost:10000']
assert registry.get_ports(instance_name)[0].name == 'test_in'
assert registry.get_ports(instance_name)[0].operator == Operator.S
assert registry.get_pid(instance_name) == getpid()
assert registry.get_hostname(instance_name) == gethostname()
client.close()


Expand Down
2 changes: 1 addition & 1 deletion libmuscle/cpp/src/libmuscle/instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ void Instance::Impl::register_() {
ProfileEvent register_event(ProfileEventType::register_, ProfileTimestamp());
auto locations = communicator_->get_locations();
auto port_list = list_declared_ports_();
manager_->register_instance(locations, port_list);
manager_->register_instance(locations, port_list, get_process_id(), get_hostname());
profiler_->record_event(std::move(register_event));
log_info("Registered with the manager");
}
Expand Down
7 changes: 5 additions & 2 deletions libmuscle/cpp/src/libmuscle/mmp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ void MMPClient::submit_snapshot_metadata(

void MMPClient::register_instance(
std::vector<std::string> const & locations,
std::vector<::ymmsl::Port> const & ports)
std::vector<::ymmsl::Port> const & ports,
int const process_id,
std::string const & hostname)
{
auto encoded_locs = encode_vector(locations);
auto encoded_ports = Data::nils(ports.size());
Expand All @@ -191,7 +193,8 @@ void MMPClient::register_instance(
auto request = Data::list(
static_cast<int>(RequestType::register_instance),
static_cast<std::string>(instance_id_), encoded_locs,
encoded_ports, MUSCLE3_VERSION);
encoded_ports, process_id, hostname,
MUSCLE3_VERSION);

auto response = call_manager_(request);

Expand Down
4 changes: 3 additions & 1 deletion libmuscle/cpp/src/libmuscle/mmp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ class MMPClient {
*/
void register_instance(
std::vector<std::string> const & locations,
std::vector<::ymmsl::Port> const & ports);
std::vector<::ymmsl::Port> const & ports,
int const process_id,
std::string const & hostname);

/** Request connection information about peers.
*
Expand Down
3 changes: 2 additions & 1 deletion libmuscle/cpp/src/libmuscle/tests/mmp_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ void test_submit_log_message(MMPClient & client) {
void test_register_instance(MMPClient & client) {
client.register_instance(
{"tcp:test1", "tcp:test2"},
{Port("out", Operator::O_F), Port("in", Operator::F_INIT)});
{Port("out", Operator::O_F), Port("in", Operator::F_INIT)},
1235, "localhost");
}

void test_request_peers(MMPClient & client) {
Expand Down
4 changes: 2 additions & 2 deletions libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ class MockMMPClient : public MockClass<MockMMPClient> {

MockFun<
Void, Val<std::vector<std::string> const &>,
Val<std::vector<::ymmsl::Port> const &>
> register_instance;
Val<std::vector<::ymmsl::Port> const &>,
Val<int>, Val<std::string const &>> register_instance;

MockFun<Val<std::tuple<
std::vector<::ymmsl::Conduit>,
Expand Down
13 changes: 13 additions & 0 deletions libmuscle/cpp/src/libmuscle/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
#include <stdexcept>
#include <string>
#include <sys/stat.h>
#include <sys/stat.h>
#include <thread>
#include <unistd.h>
#include <limits.h>


using std::chrono::duration;
Expand Down Expand Up @@ -114,6 +117,16 @@ bool Retrier::should_give_up() {
const double Retrier::default_base_delay_ = 0.5;
const double Retrier::default_timeout_ = 30.0;
const double Retrier::factor_ = std::pow(2.0, 1.0 / 3.0);
/* Return the operating-system process ID of the calling process.
 *
 * The value is obtained from getpid() and converted to a plain int.
 */
int get_process_id() {
    auto const pid = ::getpid();
    return static_cast<int>(pid);
}

/* Return the hostname of the machine this process is running on.
 *
 * The buffer size is determined at run time via sysconf() rather than
 * via the HOST_NAME_MAX macro, because that macro is not defined by
 * every platform's <limits.h> (notably macOS).
 *
 * @return The hostname as reported by gethostname().
 * @throws std::runtime_error if gethostname() reports an error.
 */
std::string get_hostname() {
    // Fallback is _POSIX_HOST_NAME_MAX, the minimum any POSIX system
    // must support, used if the limit cannot be queried.
    long max_len = ::sysconf(_SC_HOST_NAME_MAX);
    if (max_len <= 0)
        max_len = 255;

    // One extra byte for the terminating NUL.
    std::string buffer(static_cast<std::string::size_type>(max_len) + 1u, '\0');
    if (::gethostname(&buffer[0], buffer.size()) != 0)
        throw std::runtime_error("Could not get hostname");

    // POSIX leaves it unspecified whether a truncated name is
    // NUL-terminated, so terminate explicitly before reading it back.
    buffer.back() = '\0';
    return std::string(buffer.c_str());
}

} }

8 changes: 8 additions & 0 deletions libmuscle/cpp/src/libmuscle/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ std::ostream & operator<<(std::ostream & os, Optional<T> const & t);
*/
double time_monotonic();

/* Return the process ID of the current process.
 *
 * @return The process ID, as an int.
 */
int get_process_id();

/* Return the hostname of the machine this process is running on.
 *
 * @return The hostname as a string.
 * @throws std::runtime_error if the hostname could not be obtained.
 */
std::string get_hostname();


/* Helper class for retrying things with a delay and timeout.
*
Expand Down
20 changes: 17 additions & 3 deletions libmuscle/python/libmuscle/manager/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from libmuscle.manager.instance_registry import InstanceRegistry
from libmuscle.manager.instantiator import (
CancelAllRequest, CrashedResult, InstantiatorRequest,
InstantiationRequest, Process, ProcessStatus, ShutdownRequest)
InstantiationRequest, Process, ProcessStatus, ShutdownRequest,
MonitorRequest)
from libmuscle.manager.logger import last_lines
from libmuscle.manager.run_dir import RunDir
from libmuscle.native_instantiator.native_instantiator import NativeInstantiator
Expand Down Expand Up @@ -62,17 +63,19 @@ class InstanceManager:
"""Instantiates and manages running instances"""
def __init__(
self, configuration: Configuration, run_dir: RunDir,
instance_registry: InstanceRegistry) -> None:
instance_registry: InstanceRegistry, mlp_location: str) -> None:
"""Create an InstanceManager.

Args:
configuration: The global configuration
run_dir: Directory to run in
instance_registry: The InstanceRegistry to use
mlp_location: Location of the MUSCLE Log Protocol server
"""
self._configuration = configuration
self._run_dir = run_dir
self._instance_registry = instance_registry
self._mlp_location = mlp_location

self._resources_in: Queue[Resources] = Queue()
self._requests_out: Queue[InstantiatorRequest] = Queue()
Expand All @@ -81,7 +84,8 @@ def __init__(

self._instantiator = NativeInstantiator(
self._resources_in, self._requests_out, self._results_in,
self._log_records_in, self._run_dir.path)
self._log_records_in, self._run_dir.path,
mlp_location=self._mlp_location)
self._instantiator.start()

self._log_handler = LogHandlingThread(self._log_records_in)
Expand Down Expand Up @@ -158,6 +162,16 @@ def get_resources(self) -> Dict[Reference, ResourceAssignment]:

return self._allocations

def monitor_process(self, instance_id: str, hostname: str, process_id: int) -> None:
    """Request resource usage monitoring for a process.

    Wraps the arguments in a MonitorRequest and puts it on the
    outgoing request queue.

    Args:
        instance_id: The ID of the instance
        hostname: The hostname of the process
        process_id: The process ID of the process
    """
    request = MonitorRequest(instance_id, hostname, process_id)
    self._requests_out.put(request)

def wait(self) -> bool:
"""Waits for all instances to be done."""
all_seemingly_okay = True
Expand Down
36 changes: 34 additions & 2 deletions libmuscle/python/libmuscle/manager/instance_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ def __init__(self) -> None:
self._deregistered_one = Condition() # doubles as lock
self._locations: Dict[Reference, List[str]] = {}
self._ports: Dict[Reference, List[Port]] = {}
self._pids: Dict[Reference, int] = {}
self._hostnames: Dict[Reference, str] = {}
self._seen: Set[Reference] = set()
self._startup = True

def add(self, name: Reference, locations: List[str], ports: List[Port]
) -> None:
def add(self, name: Reference, locations: List[str], ports: List[Port],
pid: int, hostname: str) -> None:
"""Add an instance to the registry.

Args:
name: Name of the instance.
locations: Network locations where it can be reached.
ports: List of ports of this instance.
pid: PID of the instance
hostname: Hostname of the instance

Raises:
ValueError: If an instance with this name has already been
Expand All @@ -42,6 +46,8 @@ def add(self, name: Reference, locations: List[str], ports: List[Port]

self._locations[name] = locations
self._ports[name] = ports
self._pids[name] = pid
self._hostnames[name] = hostname
self._seen.add(name)
self._startup = False

Expand Down Expand Up @@ -69,6 +75,30 @@ def get_ports(self, name: Reference) -> List[Port]:
with self._deregistered_one:
return self._ports[name]

def get_pid(self, name: Reference) -> int:
    """Look up the process ID a registered instance reported.

    Args:
        name: The name of the instance to get the PID of.

    Returns:
        The PID stored for this instance.

    Raises:
        KeyError: If no instance with this name was registered.
    """
    # The condition variable doubles as the registry lock.
    with self._deregistered_one:
        pid = self._pids[name]
    return pid

def get_hostname(self, name: Reference) -> str:
    """Look up the hostname a registered instance reported.

    Args:
        name: The name of the instance to get the hostname of.

    Returns:
        The hostname stored for this instance.

    Raises:
        KeyError: If no instance with this name was registered.
    """
    # The condition variable doubles as the registry lock.
    with self._deregistered_one:
        hostname = self._hostnames[name]
    return hostname

def remove(self, name: Reference) -> None:
"""Remove an instance from the registry.

Expand All @@ -81,6 +111,8 @@ def remove(self, name: Reference) -> None:
with self._deregistered_one:
del self._locations[name]
del self._ports[name]
del self._pids[name]
del self._hostnames[name]
self._deregistered_one.notify()

def did_register(self, name: Reference) -> bool:
Expand Down
8 changes: 8 additions & 0 deletions libmuscle/python/libmuscle/manager/instantiator.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ def __init__(
self.stderr_path = stderr_path


class MonitorRequest(InstantiatorRequest):
    """Requests monitoring a process.

    Attributes:
        instance: Identifier of the instance the process belongs to
        hostname: Name of the host the process runs on
        pid: Process ID of the process to monitor
    """
    def __init__(self, instance: str, hostname: str, pid: int) -> None:
        """Create a MonitorRequest.

        Args:
            instance: Identifier of the instance the process belongs to
            hostname: Name of the host the process runs on
            pid: Process ID of the process to monitor
        """
        self.pid = pid
        self.hostname = hostname
        self.instance = instance


class CancelAllRequest(InstantiatorRequest):
"""Requests stopping all running processes."""
pass
Expand Down
10 changes: 8 additions & 2 deletions libmuscle/python/libmuscle/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from libmuscle.manager.mmp_server import MMPServer
from libmuscle.manager.instance_manager import InstanceManager
from libmuscle.manager.profile_store import ProfileStore
from libmuscle.manager.mlp_server import MLPServer
from libmuscle.manager.run_dir import RunDir
from libmuscle.manager.snapshot_registry import SnapshotRegistry
from libmuscle.manager.topology_store import TopologyStore
Expand Down Expand Up @@ -67,12 +68,15 @@ def __init__(
for c in self._configuration.model.components
for instance_name in c.instances()])

self._mlp_server = MLPServer(self._logger, self._profile_store)

self._instance_manager: Optional[InstanceManager] = None
try:
configuration = self._configuration.as_configuration()
if self._run_dir is not None:
self._instance_manager = InstanceManager(
configuration, self._run_dir, self._instance_registry)
configuration, self._run_dir, self._instance_registry,
mlp_location=self._mlp_server.get_location())
except ValueError:
pass

Expand All @@ -85,7 +89,8 @@ def __init__(
self._server = MMPServer(
self._logger, self._profile_store, self._configuration,
self._instance_registry, self._topology_store,
self._snapshot_registry, self._deadlock_detector, run_dir)
self._snapshot_registry, self._deadlock_detector, run_dir,
instance_manager=self._instance_manager)

if self._instance_manager:
self._instance_manager.set_manager_location(
Expand Down Expand Up @@ -124,6 +129,7 @@ def stop(self) -> None:
if self._instance_manager:
self._instance_manager.shutdown()
self._server.stop()
self._mlp_server.stop()
self._snapshot_registry.shutdown()
self._snapshot_registry.join()
self._profile_store.shutdown()
Expand Down
Loading
Loading