Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
45c8a76
Add port and host to register_instance for all languages
v1kko Nov 20, 2025
6d21081
Add extra assertion to tests
v1kko Nov 25, 2025
b1b2021
Add profiling event to profiling database
v1kko Nov 28, 2025
08ef79e
communicate muscle processes to be monitored
v1kko Nov 28, 2025
3bbc2c9
Implementing the Protocol for monitoring
v1kko Nov 28, 2025
d23c172
Add the last mile to get everything in the sqlite database
v1kko Nov 28, 2025
2f157c7
Merge branch 'multiscale:develop' into better_monitoring
v1kko Dec 4, 2025
f57c879
Add (skeleton) MLPServer, split events to events/usage_events
v1kko Dec 8, 2025
f867167
Pass resource usage through MLPServer
v1kko Dec 9, 2025
919b62d
Get MLPServer address to Agents
v1kko Dec 10, 2025
c1058a9
remove wrong implementation to monitor
v1kko Dec 10, 2025
6fb36b1
Removing leftovers
v1kko Dec 10, 2025
4a84e76
Fix mypy issues
v1kko Dec 10, 2025
d1d1a32
Make environment optional, if no environment for a new process is spe…
v1kko Dec 10, 2025
4e07f29
Fix all flake8 issues
v1kko Dec 10, 2025
b54c0e3
pass procid and hostname from the instance instead of the MMPClient
v1kko Dec 10, 2025
154f29b
Fix profile_database_test to count forward instead of backwards
v1kko Dec 10, 2025
1366ede
Fix private variable access of the instance_manager
v1kko Dec 10, 2025
0b1d965
Fix mmp_client.cpp::register_instance headers
v1kko Dec 10, 2025
9a65742
Fix report usage implementation, only report back once per second, an…
v1kko Dec 19, 2025
1f9af1a
fix mlp_tests
v1kko Dec 19, 2025
b7cccee
fix threading issue with py3.8
v1kko Dec 19, 2025
bcead64
fix last remaining threading issue by force closing the Pool
v1kko Dec 19, 2025
935fc58
Fix tests by calling MLPClient.close() at the end of the tests
v1kko Dec 20, 2025
8c9a8d3
Usage collection now non-threaded, but with simple timestamps, to sat…
v1kko Dec 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_test/test_cpp_mmp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def mock_remove(name: Reference):
# add some peers
manager._instance_registry.add(
Reference('macro'), ['tcp:test3', 'tcp:test4'],
[Port('out', Operator.O_I), Port('in', Operator.S)])
[Port('out', Operator.O_I), Port('in', Operator.S)], 1234, "test_host")

# create C++ client
# it runs through the various RPC calls
Expand Down
3 changes: 2 additions & 1 deletion libmuscle/cpp/src/libmuscle/mmp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ void MMPClient::register_instance(
auto request = Data::list(
static_cast<int>(RequestType::register_instance),
static_cast<std::string>(instance_id_), encoded_locs,
encoded_ports, MUSCLE3_VERSION);
encoded_ports, get_process_id(), get_hostname(),
Comment thread
v1kko marked this conversation as resolved.
Outdated
MUSCLE3_VERSION);

auto response = call_manager_(request);

Expand Down
13 changes: 13 additions & 0 deletions libmuscle/cpp/src/libmuscle/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
#include <stdexcept>
#include <string>
#include <sys/stat.h>
#include <sys/stat.h>
#include <thread>
#include <unistd.h>
#include <limits.h>


using std::chrono::duration;
Expand Down Expand Up @@ -114,6 +117,16 @@ bool Retrier::should_give_up() {
const double Retrier::default_base_delay_ = 0.5;
const double Retrier::default_timeout_ = 30.0;
const double Retrier::factor_ = std::pow(2.0, 1.0 / 3.0);
/* Return the ID of the calling process, as reported by getpid().
 */
int get_process_id() {
    const pid_t current_pid = ::getpid();
    return static_cast<int>(current_pid);
}

/* Return the hostname of the machine this process is running on.
 *
 * The buffer size is determined at run time with sysconf(_SC_HOST_NAME_MAX)
 * instead of the HOST_NAME_MAX macro, because <limits.h> does not define that
 * macro on every POSIX platform.
 *
 * @return The hostname as reported by gethostname().
 * @throws std::runtime_error if gethostname() reports an error.
 */
std::string get_hostname() {
    // sysconf() returns -1 if the limit is indeterminate; fall back to the
    // minimum maximum that POSIX guarantees (_POSIX_HOST_NAME_MAX == 255).
    long max_length = ::sysconf(_SC_HOST_NAME_MAX);
    if (max_length <= 0)
        max_length = 255;

    // One extra byte for the terminating null character.
    std::string buffer(
            static_cast<std::string::size_type>(max_length) + 1u, '\0');
    if (::gethostname(&buffer[0], buffer.size()) != 0)
        throw std::runtime_error("Could not get hostname");

    // POSIX leaves it unspecified whether a truncated name is terminated, so
    // force a terminator before trimming the buffer to the actual length.
    buffer.back() = '\0';
    buffer.resize(buffer.find('\0'));
    return buffer;
}

} }

8 changes: 8 additions & 0 deletions libmuscle/cpp/src/libmuscle/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ std::ostream & operator<<(std::ostream & os, Optional<T> const & t);
*/
double time_monotonic();

/* Return the current process ID.
 *
 * This is the ID of the calling process as reported by the operating system
 * (getpid()), converted to an int.
 */
int get_process_id();

/* Return the hostname.
 *
 * This is the name of the machine the calling process runs on, as reported
 * by gethostname().
 *
 * Throws std::runtime_error if the hostname could not be obtained.
 */
std::string get_hostname();


/* Helper class for retrying things with a delay and timeout.
*
Expand Down
12 changes: 10 additions & 2 deletions libmuscle/python/libmuscle/manager/instance_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ def __init__(self) -> None:
self._deregistered_one = Condition() # doubles as lock
self._locations: Dict[Reference, List[str]] = {}
self._ports: Dict[Reference, List[Port]] = {}
self._pids: Dict[Reference, int] = {}
self._hostnames: Dict[Reference, str] = {}
self._seen: Set[Reference] = set()
self._startup = True

def add(self, name: Reference, locations: List[str], ports: List[Port]
) -> None:
def add(self, name: Reference, locations: List[str], ports: List[Port],
pid: int, hostname: str) -> None:
"""Add an instance to the registry.

Args:
name: Name of the instance.
locations: Network locations where it can be reached.
ports: List of ports of this instance.
pid: PID of the instance
hostname: Hostname of the instance

Raises:
ValueError: If an instance with this name has already been
Expand All @@ -42,6 +46,8 @@ def add(self, name: Reference, locations: List[str], ports: List[Port]

self._locations[name] = locations
self._ports[name] = ports
self._pids[name] = pid
self._hostnames[name] = hostname
self._seen.add(name)
self._startup = False

Expand Down Expand Up @@ -81,6 +87,8 @@ def remove(self, name: Reference) -> None:
with self._deregistered_one:
del self._locations[name]
del self._ports[name]
del self._pids[name]
del self._hostnames[name]
self._deregistered_one.notify()

def did_register(self, name: Reference) -> bool:
Expand Down
7 changes: 4 additions & 3 deletions libmuscle/python/libmuscle/manager/mmp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,15 @@ def close(self) -> None:

def _register_instance(
self, instance_id: str, locations: List[str],
ports: List[List[str]], version: str = '') -> Any:
ports: List[List[str]], pid: int, hostname: str, version: str = '') -> Any:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instances that use MPI will have a hostname/pid for each rank. How would that be passed here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the process calling "register_instance" will be monitored. If we also want to monitor all the other MPI processes, then I think the following needs to be done:

  • The MPI process that calls register_instance must provide all PIDs and hostnames
  • We need to call this method for each provided PID/hostname pair

I think the backend is flexible enough to have many PIDs for a single instance ID.

"""Handle a register instance request.

Args:
instance_id: ID of the instance to register
locations: Locations where it can be reached
ports: Ports of this instance
pid: PID of the instance
hostname: Hostname of the instance
version: Version of libmuscle that this instance uses

Returns:
Expand All @@ -169,11 +171,10 @@ def _register_instance(
f' manager libmuscle version ({libmuscle.__version__}).'
' Please ensure that the instance and the manager use the'
' same version of libmuscle.']

port_objs = [decode_port(p) for p in ports]
instance = Reference(instance_id)
try:
self._instance_registry.add(instance, locations, port_objs)
self._instance_registry.add(instance, locations, port_objs, pid, hostname)

_logger.info(f'Registered instance {instance_id}')
return [ResponseType.SUCCESS.value]
Expand Down
10 changes: 5 additions & 5 deletions libmuscle/python/libmuscle/manager/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ def mmp_request_handler(

@pytest.fixture
def loaded_instance_registry(instance_registry):
instance_registry.add(Reference('macro'), ['direct:macro'], [])
instance_registry.add(Reference('macro'), ['direct:macro'], [], 10, 'test_host')
for j in range(10):
for i in range(10):
name = Reference('micro') + j + i
location = 'direct:{}'.format(name)
instance_registry.add(name, [location], [])
instance_registry.add(name, [location], [], 100 + j + i * 10, 'test_host')
return instance_registry


Expand Down Expand Up @@ -121,18 +121,18 @@ def snapshot_registry2(mmp_configuration2, topology_store) -> SnapshotRegistry:
def loaded_instance_registry2():
instance_registry = InstanceRegistry()

instance_registry.add(Reference('macro'), ['direct:macro'], [])
instance_registry.add(Reference('macro'), ['direct:macro'], [], 10, 'test_host')

for j in range(5):
name = Reference('meso') + j
location = 'direct:{}'.format(name)
instance_registry.add(name, [location], [])
instance_registry.add(name, [location], [], 10 + j, 'test_host')

for j in range(5):
for i in range(10):
name = Reference('micro') + j + i
location = 'direct:{}'.format(name)
instance_registry.add(name, [location], [])
instance_registry.add(name, [location], [], 100 + j*10 + i, 'test_host')
return instance_registry


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_port(port):


def test_registry_add(registry, port):
registry.add('instance1', 'tcp://localhost:6253', [port])
registry.add('instance1', 'tcp://localhost:6253', [port], 12345, 'test_host')
assert (registry._locations['instance1'] ==
'tcp://localhost:6253')
assert registry._ports['instance1'] == [port]
Expand All @@ -31,6 +31,8 @@ def test_registry_get(registry, port):
registry._locations['instance1'] = [
'tcp://localhost:6253']
registry._ports['instance1'] = [port]
registry._pids['instance1'] = 12345
registry._hostnames['instance1'] = 'test_host'
assert registry.get_locations('instance1') == ['tcp://localhost:6253']
assert registry.get_ports('instance1') == [port]

Expand All @@ -45,6 +47,8 @@ def test_registry_remove(registry, port):
registry._locations['instance1'] = [
'tcp://localhost:6253']
registry._ports['instance1'] = [port]
registry._pids['instance1'] = 12345
registry._hostnames['instance1'] = 'test_host'
registry.remove('instance1')
assert 'instance1' not in registry._locations
assert 'instance1' not in registry._ports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def test_register_instance(mmp_request_handler, instance_registry):
'test_instance',
['tcp://localhost:10000'],
[['test_in', 'F_INIT']],
12345,
'test_host',
libmuscle.__version__]
encoded_request = msgpack.packb(request, use_bin_type=True)

Expand All @@ -102,7 +104,9 @@ def test_register_instance_no_version(mmp_request_handler):
RequestType.REGISTER_INSTANCE.value,
'test_instance',
['tcp://localhost:10000'],
[['test_in', 'F_INIT']]]
[['test_in', 'F_INIT']],
12345,
'test_host']
encoded_request = msgpack.packb(request, use_bin_type=True)

result = mmp_request_handler.handle_request(encoded_request)
Expand All @@ -118,6 +122,8 @@ def test_register_instance_version_mismatch(mmp_request_handler):
'test_instance',
['tcp://localhost:10000'],
[['test_in', 'F_INIT']],
12345,
'test_host',
libmuscle.__version__ + "dev"]
encoded_request = msgpack.packb(request, use_bin_type=True)

Expand Down Expand Up @@ -184,6 +190,8 @@ def test_double_register_instance(mmp_request_handler):
'test_instance',
['tcp://localhost:10000'],
[['test_in', 'F_INIT']],
12345,
'test_host',
libmuscle.__version__]
encoded_request = msgpack.packb(request, use_bin_type=True)

Expand Down
3 changes: 3 additions & 0 deletions libmuscle/python/libmuscle/mmp_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import dataclasses
from os import getpid
from pathlib import Path
from random import uniform
from socket import gethostname
from threading import get_ident, RLock
from time import perf_counter, sleep
from typing import Any, Dict, Iterable, List, Optional, Tuple
Expand Down Expand Up @@ -232,6 +234,7 @@ def register_instance(
RequestType.REGISTER_INSTANCE.value,
str(self._instance_id), locations,
[encode_port(p) for p in ports],
getpid(), gethostname(),
libmuscle.__version__]
response = self._call_manager(request)
if response[0] == ResponseType.ERROR.value:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def test_exit_code(lpm, tmp_path):
def test_multiple(lpm, tmp_path):
for i in range(3):
lpm.start(
f'test_{i}', tmp_path, ['bash', '-c', 'sleep 1'], {},
f'test_{i}', tmp_path, ['bash', '-c', '/usr/bin/env sleep 1'], {},
Comment thread
v1kko marked this conversation as resolved.
Outdated
tmp_path / f'out{i}', tmp_path / f'err{i}')

completed_jobs = _poll_completion(lpm, 3)
Expand All @@ -85,7 +85,7 @@ def test_cancel_all(lpm, tmp_path):

for i in range(2):
lpm.start(
f'test_{i}', tmp_path, ['bash', '-c', 'sleep 1'], {},
f'test_{i}', tmp_path, ['bash', '-c', '/usr/bin/env sleep 1'], {},
tmp_path / f'out{i}', tmp_path / f'err{i}')

lpm.cancel_all()
Expand All @@ -100,7 +100,7 @@ def test_cancel_all(lpm, tmp_path):

def test_output_redirect(lpm, tmp_path):
lpm.start(
'test', tmp_path, ['bash', '-c', 'ls'], {},
'test', tmp_path, ['bash', '-c', '/usr/bin/env ls'], {},
tmp_path / 'out', tmp_path / 'err')
_poll_completion(lpm, 1)
with (tmp_path / 'out').open('r') as f:
Expand All @@ -111,7 +111,7 @@ def test_output_redirect(lpm, tmp_path):

def test_error_redirect(lpm, tmp_path):
lpm.start(
'test', tmp_path, ['bash', '-c', 'ls 1>&2'], {},
'test', tmp_path, ['bash', '-c', '/usr/bin/env ls 1>&2'], {},
tmp_path / 'out', tmp_path / 'err')
_poll_completion(lpm, 1)
with (tmp_path / 'out').open('r') as f:
Expand Down
5 changes: 4 additions & 1 deletion libmuscle/python/libmuscle/test/test_mmp_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from os import getpid
from socket import gethostname

from unittest.mock import patch

import msgpack
Expand Down Expand Up @@ -91,7 +94,7 @@ def test_register_instance(mocked_mmp_client, profile_data) -> None:
sent_msg = msgpack.unpackb(stub.call.call_args[0][0], raw=False)
assert sent_msg == [
RequestType.REGISTER_INSTANCE.value, 'component[13]',
['direct:test', 'tcp:test'], [['out', 'O_I'], ['in', 'S']],
['direct:test', 'tcp:test'], [['out', 'O_I'], ['in', 'S']], getpid(), gethostname(),
libmuscle.__version__]


Expand Down