diff --git a/integration_test/test_cpp_mmp_client.py b/integration_test/test_cpp_mmp_client.py index 6a985764..184892a2 100644 --- a/integration_test/test_cpp_mmp_client.py +++ b/integration_test/test_cpp_mmp_client.py @@ -52,7 +52,7 @@ def mock_remove(name: Reference): # add some peers manager._instance_registry.add( Reference('macro'), ['tcp:test3', 'tcp:test4'], - [Port('out', Operator.O_I), Port('in', Operator.S)]) + [Port('out', Operator.O_I), Port('in', Operator.S)], 1234, "test_host") # create C++ client # it runs through the various RPC calls diff --git a/integration_test/test_registration.py b/integration_test/test_registration.py index f359ff53..005a538f 100644 --- a/integration_test/test_registration.py +++ b/integration_test/test_registration.py @@ -1,6 +1,8 @@ from unittest.mock import patch import pytest +from os import getpid +from socket import gethostname from ymmsl import Conduit, Operator, Port, Reference from libmuscle.mmp_client import MMPClient @@ -18,6 +20,8 @@ def test_registration(log_file_in_tmpdir, mmp_server): assert registry.get_locations(instance_name) == ['tcp:localhost:10000'] assert registry.get_ports(instance_name)[0].name == 'test_in' assert registry.get_ports(instance_name)[0].operator == Operator.S + assert registry.get_pid(instance_name) == getpid() + assert registry.get_hostname(instance_name) == gethostname() client.close() diff --git a/libmuscle/cpp/src/libmuscle/instance.cpp b/libmuscle/cpp/src/libmuscle/instance.cpp index 827af899..89ff900f 100644 --- a/libmuscle/cpp/src/libmuscle/instance.cpp +++ b/libmuscle/cpp/src/libmuscle/instance.cpp @@ -474,7 +474,7 @@ void Instance::Impl::register_() { ProfileEvent register_event(ProfileEventType::register_, ProfileTimestamp()); auto locations = communicator_->get_locations(); auto port_list = list_declared_ports_(); - manager_->register_instance(locations, port_list); + manager_->register_instance(locations, port_list, get_process_id(), get_hostname()); 
profiler_->record_event(std::move(register_event)); log_info("Registered with the manager"); } diff --git a/libmuscle/cpp/src/libmuscle/mmp_client.cpp b/libmuscle/cpp/src/libmuscle/mmp_client.cpp index b8005e01..3a07012d 100644 --- a/libmuscle/cpp/src/libmuscle/mmp_client.cpp +++ b/libmuscle/cpp/src/libmuscle/mmp_client.cpp @@ -181,7 +181,9 @@ void MMPClient::submit_snapshot_metadata( void MMPClient::register_instance( std::vector const & locations, - std::vector<::ymmsl::Port> const & ports) + std::vector<::ymmsl::Port> const & ports, + int const process_id, + std::string const & hostname) { auto encoded_locs = encode_vector(locations); auto encoded_ports = Data::nils(ports.size()); @@ -191,7 +193,8 @@ void MMPClient::register_instance( auto request = Data::list( static_cast(RequestType::register_instance), static_cast(instance_id_), encoded_locs, - encoded_ports, MUSCLE3_VERSION); + encoded_ports, process_id, hostname, + MUSCLE3_VERSION); auto response = call_manager_(request); diff --git a/libmuscle/cpp/src/libmuscle/mmp_client.hpp b/libmuscle/cpp/src/libmuscle/mmp_client.hpp index a9d01ec2..2e69ff15 100644 --- a/libmuscle/cpp/src/libmuscle/mmp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/mmp_client.hpp @@ -104,7 +104,9 @@ class MMPClient { */ void register_instance( std::vector const & locations, - std::vector<::ymmsl::Port> const & ports); + std::vector<::ymmsl::Port> const & ports, + int const process_id, + std::string const & hostname); /** Request connection information about peers. 
* diff --git a/libmuscle/cpp/src/libmuscle/tests/mmp_client_test.cpp b/libmuscle/cpp/src/libmuscle/tests/mmp_client_test.cpp index 940e31b0..81ad03a3 100644 --- a/libmuscle/cpp/src/libmuscle/tests/mmp_client_test.cpp +++ b/libmuscle/cpp/src/libmuscle/tests/mmp_client_test.cpp @@ -44,7 +44,8 @@ void test_submit_log_message(MMPClient & client) { void test_register_instance(MMPClient & client) { client.register_instance( {"tcp:test1", "tcp:test2"}, - {Port("out", Operator::O_F), Port("in", Operator::F_INIT)}); + {Port("out", Operator::O_F), Port("in", Operator::F_INIT)}, + 1235, "localhost"); } void test_request_peers(MMPClient & client) { diff --git a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp index 6e277916..9341b83e 100644 --- a/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp +++ b/libmuscle/cpp/src/libmuscle/tests/mocks/mock_mmp_client.hpp @@ -91,8 +91,8 @@ class MockMMPClient : public MockClass { MockFun< Void, Val const &>, - Val const &> - > register_instance; + Val const &>, + Val, Val> register_instance; MockFun, diff --git a/libmuscle/cpp/src/libmuscle/util.cpp b/libmuscle/cpp/src/libmuscle/util.cpp index c7bfc875..e84bf880 100644 --- a/libmuscle/cpp/src/libmuscle/util.cpp +++ b/libmuscle/cpp/src/libmuscle/util.cpp @@ -6,7 +6,10 @@ #include #include #include +#include #include +#include +#include using std::chrono::duration; @@ -114,6 +117,16 @@ bool Retrier::should_give_up() { const double Retrier::default_base_delay_ = 0.5; const double Retrier::default_timeout_ = 30.0; const double Retrier::factor_ = std::pow(2.0, 1.0 / 3.0); +int get_process_id() { + return ::getpid(); +} + +std::string get_hostname() { + char hostname[HOST_NAME_MAX+1]; + if (::gethostname(hostname, HOST_NAME_MAX+1) != 0) + throw std::runtime_error("Could not get hostname"); + return std::string(hostname); +} } } diff --git a/libmuscle/cpp/src/libmuscle/util.hpp 
b/libmuscle/cpp/src/libmuscle/util.hpp index f4790fa8..a7425d5a 100644 --- a/libmuscle/cpp/src/libmuscle/util.hpp +++ b/libmuscle/cpp/src/libmuscle/util.hpp @@ -227,6 +227,14 @@ std::ostream & operator<<(std::ostream & os, Optional const & t); */ double time_monotonic(); +/* Return the current process ID. + */ +int get_process_id(); + +/* Return the hostname. + */ +std::string get_hostname(); + /* Helper class for retrying things with a delay and timeout. * diff --git a/libmuscle/python/libmuscle/manager/instance_manager.py b/libmuscle/python/libmuscle/manager/instance_manager.py index 93281a37..4e56b5c6 100644 --- a/libmuscle/python/libmuscle/manager/instance_manager.py +++ b/libmuscle/python/libmuscle/manager/instance_manager.py @@ -12,7 +12,8 @@ from libmuscle.manager.instance_registry import InstanceRegistry from libmuscle.manager.instantiator import ( CancelAllRequest, CrashedResult, InstantiatorRequest, - InstantiationRequest, Process, ProcessStatus, ShutdownRequest) + InstantiationRequest, Process, ProcessStatus, ShutdownRequest, + MonitorRequest) from libmuscle.manager.logger import last_lines from libmuscle.manager.run_dir import RunDir from libmuscle.native_instantiator.native_instantiator import NativeInstantiator @@ -62,17 +63,19 @@ class InstanceManager: """Instantiates and manages running instances""" def __init__( self, configuration: Configuration, run_dir: RunDir, - instance_registry: InstanceRegistry) -> None: + instance_registry: InstanceRegistry, mlp_location: str) -> None: """Create an InstanceManager. 
Args: configuration: The global configuration run_dir: Directory to run in instance_registry: The InstanceRegistry to use + mlp_location: Location of the MUSCLE Log Protocol server """ self._configuration = configuration self._run_dir = run_dir self._instance_registry = instance_registry + self._mlp_location = mlp_location self._resources_in: Queue[Resources] = Queue() self._requests_out: Queue[InstantiatorRequest] = Queue() @@ -81,7 +84,8 @@ def __init__( self._instantiator = NativeInstantiator( self._resources_in, self._requests_out, self._results_in, - self._log_records_in, self._run_dir.path) + self._log_records_in, self._run_dir.path, + mlp_location=self._mlp_location) self._instantiator.start() self._log_handler = LogHandlingThread(self._log_records_in) @@ -158,6 +162,16 @@ def get_resources(self) -> Dict[Reference, ResourceAssignment]: return self._allocations + def monitor_process(self, instance_id: str, hostname: str, process_id: int) -> None: + """Monitor a process for resource usage. 
+ + Args: + instance_id: The ID of the instance + hostname: The hostname of the process + process_id: The process ID of the process + """ + self._requests_out.put(MonitorRequest(instance_id, hostname, process_id)) + def wait(self) -> bool: """Waits for all instances to be done.""" all_seemingly_okay = True diff --git a/libmuscle/python/libmuscle/manager/instance_registry.py b/libmuscle/python/libmuscle/manager/instance_registry.py index 982246b4..8a008249 100644 --- a/libmuscle/python/libmuscle/manager/instance_registry.py +++ b/libmuscle/python/libmuscle/manager/instance_registry.py @@ -19,17 +19,21 @@ def __init__(self) -> None: self._deregistered_one = Condition() # doubles as lock self._locations: Dict[Reference, List[str]] = {} self._ports: Dict[Reference, List[Port]] = {} + self._pids: Dict[Reference, int] = {} + self._hostnames: Dict[Reference, str] = {} self._seen: Set[Reference] = set() self._startup = True - def add(self, name: Reference, locations: List[str], ports: List[Port] - ) -> None: + def add(self, name: Reference, locations: List[str], ports: List[Port], + pid: int, hostname: str) -> None: """Add an instance to the registry. Args: name: Name of the instance. locations: Network locations where it can be reached. ports: List of ports of this instance. + pid: PID of the instance + hostname: Hostname of the instance Raises: ValueError: If an instance with this name has already been @@ -42,6 +46,8 @@ def add(self, name: Reference, locations: List[str], ports: List[Port] self._locations[name] = locations self._ports[name] = ports + self._pids[name] = pid + self._hostnames[name] = hostname self._seen.add(name) self._startup = False @@ -69,6 +75,30 @@ def get_ports(self, name: Reference) -> List[Port]: with self._deregistered_one: return self._ports[name] + def get_pid(self, name: Reference) -> int: + """Retrieves the PID of a registered instance. + + Args: + name: The name of the instance to get the PID of. 
+ + Raises: + KeyError: If no instance with this name was registered. + """ + with self._deregistered_one: + return self._pids[name] + + def get_hostname(self, name: Reference) -> str: + """Retrieves the hostname of a registered instance. + + Args: + name: The name of the instance to get the hostname of. + + Raises: + KeyError: If no instance with this name was registered. + """ + with self._deregistered_one: + return self._hostnames[name] + def remove(self, name: Reference) -> None: """Remove an instance from the registry. @@ -81,6 +111,8 @@ def remove(self, name: Reference) -> None: with self._deregistered_one: del self._locations[name] del self._ports[name] + del self._pids[name] + del self._hostnames[name] self._deregistered_one.notify() def did_register(self, name: Reference) -> bool: diff --git a/libmuscle/python/libmuscle/manager/instantiator.py b/libmuscle/python/libmuscle/manager/instantiator.py index 69b0ab2a..e10c85b2 100644 --- a/libmuscle/python/libmuscle/manager/instantiator.py +++ b/libmuscle/python/libmuscle/manager/instantiator.py @@ -106,6 +106,14 @@ def __init__( self.stderr_path = stderr_path +class MonitorRequest(InstantiatorRequest): + """Requests monitoring a process.""" + def __init__(self, instance: str, hostname: str, pid: int) -> None: + self.instance = instance + self.hostname = hostname + self.pid = pid + + class CancelAllRequest(InstantiatorRequest): """Requests stopping all running processes.""" pass diff --git a/libmuscle/python/libmuscle/manager/manager.py b/libmuscle/python/libmuscle/manager/manager.py index 69f439af..59fee32e 100644 --- a/libmuscle/python/libmuscle/manager/manager.py +++ b/libmuscle/python/libmuscle/manager/manager.py @@ -11,6 +11,7 @@ from libmuscle.manager.mmp_server import MMPServer from libmuscle.manager.instance_manager import InstanceManager from libmuscle.manager.profile_store import ProfileStore +from libmuscle.manager.mlp_server import MLPServer from libmuscle.manager.run_dir import RunDir from 
libmuscle.manager.snapshot_registry import SnapshotRegistry from libmuscle.manager.topology_store import TopologyStore @@ -67,12 +68,15 @@ def __init__( for c in self._configuration.model.components for instance_name in c.instances()]) + self._mlp_server = MLPServer(self._logger, self._profile_store) + self._instance_manager: Optional[InstanceManager] = None try: configuration = self._configuration.as_configuration() if self._run_dir is not None: self._instance_manager = InstanceManager( - configuration, self._run_dir, self._instance_registry) + configuration, self._run_dir, self._instance_registry, + mlp_location=self._mlp_server.get_location()) except ValueError: pass @@ -85,7 +89,8 @@ def __init__( self._server = MMPServer( self._logger, self._profile_store, self._configuration, self._instance_registry, self._topology_store, - self._snapshot_registry, self._deadlock_detector, run_dir) + self._snapshot_registry, self._deadlock_detector, run_dir, + instance_manager=self._instance_manager) if self._instance_manager: self._instance_manager.set_manager_location( @@ -124,6 +129,7 @@ def stop(self) -> None: if self._instance_manager: self._instance_manager.shutdown() self._server.stop() + self._mlp_server.stop() self._snapshot_registry.shutdown() self._snapshot_registry.join() self._profile_store.shutdown() diff --git a/libmuscle/python/libmuscle/manager/mlp_server.py b/libmuscle/python/libmuscle/manager/mlp_server.py new file mode 100644 index 00000000..ef41692d --- /dev/null +++ b/libmuscle/python/libmuscle/manager/mlp_server.py @@ -0,0 +1,135 @@ +import errno +import logging +from typing import Any, Dict, List, Tuple, cast + +import msgpack +from ymmsl import Reference + +from libmuscle.manager.logger import Logger +from libmuscle.manager.profile_store import ProfileStore +from libmuscle.mcp.protocol import RequestType, ResponseType +from libmuscle.mcp.tcp_transport_server import TcpTransportServer +from libmuscle.mcp.transport_server import RequestHandler +from 
libmuscle.profiling import (ProfileTimestamp, ProfileEvent, + ProfileEventType) + + +_logger = logging.getLogger(__name__) + + +class MLPRequestHandler(RequestHandler): + """Handles Manager requests.""" + def __init__( + self, + logger: Logger, + profile_store: ProfileStore + ) -> None: + """Create an MLPRequestHandler. + + Args: + logger: The Logger component to log messages to. + profile_store: The profile store to store profile events in. + """ + self._logger = logger + self._profile_store = profile_store + + def handle_request(self, request: bytes) -> bytes: + """Handles a manager request. + + Args: + request: The encoded request + + Returns: + response: An encoded response + """ + req_list = msgpack.unpackb(request, raw=False) + req_type = req_list[0] + req_args = req_list[1:] + if req_type == RequestType.REPORT_USAGE.value: + response = self._report_usage_events(*req_args) + + return cast(bytes, msgpack.packb(response, use_bin_type=True)) + + def close(self) -> None: + """Free per-thread resources. + + On shutdown of the server, this will be called by each server + thread before it shuts down. + """ + self._profile_store.close() + + def _report_usage_events( + self, node_name: str, usage: Dict[str, Tuple[float, int]]) -> Any: + """Handle a submit usage events request. 
+ + Args: + node_name: Name of the node that sent these events + usage: Usage events to store + + Returns: + A list containing the following values on success: + + status (ResponseType): SUCCESS + """ + events: List[Tuple[str, ProfileEvent]] = [] + for instance_id, (cpu_usage, memory_usage) in usage.items(): + _logger.info(f'Adding usage event: {instance_id}: {cpu_usage}% cpu, ' + f'{memory_usage} bytes memory') + time = ProfileTimestamp() + prof_event = ProfileEvent(ProfileEventType.RESOURCE_USAGE, + start_time=time, stop_time=time, + cpu_percent=cpu_usage, memory_usage=memory_usage) + events.append((instance_id, prof_event)) + + for event in events: + self._profile_store.add_event(Reference(event[0]), event[1]) + + return [ResponseType.SUCCESS.value] + + +class MLPServer: + """The MUSCLE Logging Protocol server. + + This class accepts connections from the instances comprising + the multiscale model to be executed, and services them using an + MLPRequestHandler. + """ + def __init__( + self, + logger: Logger, + profile_store: ProfileStore + ) -> None: + """Create an MLPServer. + + This starts a TCP Transport server and connects it to an + MLPRequestHandler, which uses the given components to service + the requests. By default, we listen on port 9001, unless it's + not available in which case we use a random other one. + + Args: + logger: Logger to send log messages to + profile_store: ProfileStore to store profile data in + """ + self._handler = MLPRequestHandler(logger, profile_store) + try: + self._server = TcpTransportServer(self._handler, 9001) + except OSError as e: + if e.errno != errno.EADDRINUSE: + raise + self._server = TcpTransportServer(self._handler) + + def get_location(self) -> str: + """Returns this server's network location. + + This is a string of the form tcp:<hostname>:<port>. + """ + return self._server.get_location() + + def stop(self) -> None: + """Stops the server. + + This makes the server stop serving requests, and shuts down its background + threads. 
By the time this gets called, the instances are down, so we don't need + to wait for any sessions to time out. + """ + self._server.close(False) diff --git a/libmuscle/python/libmuscle/manager/mmp_server.py b/libmuscle/python/libmuscle/manager/mmp_server.py index a88e37c6..b93ff9f1 100644 --- a/libmuscle/python/libmuscle/manager/mmp_server.py +++ b/libmuscle/python/libmuscle/manager/mmp_server.py @@ -25,6 +25,7 @@ ProfileEvent, ProfileEventType, ProfileTimestamp) from libmuscle.snapshot import SnapshotMetadata from libmuscle.timestamp import Timestamp +from libmuscle.manager.instance_manager import InstanceManager _logger = logging.getLogger(__name__) @@ -79,7 +80,8 @@ def __init__( topology_store: TopologyStore, snapshot_registry: SnapshotRegistry, deadlock_detector: DeadlockDetector, - run_dir: Optional[RunDir] + run_dir: Optional[RunDir], + instance_manager: Optional[InstanceManager] = None, ) -> None: """Create an MMPRequestHandler. @@ -98,6 +100,7 @@ def __init__( self._deadlock_detector = deadlock_detector self._run_dir = run_dir self._reference_time = time.monotonic() + self._instance_manager = instance_manager def handle_request(self, request: bytes) -> bytes: """Handles a manager request. @@ -146,13 +149,15 @@ def close(self) -> None: def _register_instance( self, instance_id: str, locations: List[str], - ports: List[List[str]], version: str = '') -> Any: + ports: List[List[str]], pid: int, hostname: str, version: str = '') -> Any: """Handle a register instance request. Args: instance_id: ID of the instance to register locations: Locations where it can be reached ports: Ports of this instance + pid: PID of the instance + hostname: Hostname of the instance version: Version of libmuscle that this instance uses Returns: @@ -169,11 +174,14 @@ def _register_instance( f' manager libmuscle version ({libmuscle.__version__}).' 
' Please ensure that the instance and the manager use the' ' same version of libmuscle.'] - port_objs = [decode_port(p) for p in ports] instance = Reference(instance_id) + + if self._instance_manager: + self._instance_manager.monitor_process(instance_id, hostname, pid) + try: - self._instance_registry.add(instance, locations, port_objs) + self._instance_registry.add(instance, locations, port_objs, pid, hostname) _logger.info(f'Registered instance {instance_id}') return [ResponseType.SUCCESS.value] @@ -417,7 +425,8 @@ def __init__( topology_store: TopologyStore, snapshot_registry: SnapshotRegistry, deadlock_detector: DeadlockDetector, - run_dir: Optional[RunDir] + run_dir: Optional[RunDir], + instance_manager: Optional[InstanceManager] ) -> None: """Create an MMPServer. @@ -439,7 +448,8 @@ def __init__( """ self._handler = MMPRequestHandler( logger, profile_store, configuration, instance_registry, - topology_store, snapshot_registry, deadlock_detector, run_dir) + topology_store, snapshot_registry, deadlock_detector, run_dir, + instance_manager=instance_manager) try: self._server = TcpTransportServer(self._handler, 9000) except OSError as e: diff --git a/libmuscle/python/libmuscle/manager/profile_store.py b/libmuscle/python/libmuscle/manager/profile_store.py index 3ee262b8..4a58c94e 100644 --- a/libmuscle/python/libmuscle/manager/profile_store.py +++ b/libmuscle/python/libmuscle/manager/profile_store.py @@ -102,6 +102,21 @@ def store_resources(self, resources: Dict[Reference, ResourceAssignment]) -> Non cur.execute("COMMIT") cur.close() + def add_event( + self, instance_id: Reference, event: ProfileEvent + ) -> None: + """Adds a profiling event to the database. + + Args: + event: The event to add. 
+ """ + _logger.debug(f"Add profile event for instance {instance_id}, \ + cpu: {event.cpu_percent}, memory: {event.memory_usage}") + + self._queue.put((instance_id, [event])) + if _SYNCHED: + self._confirmation_queue.get() + def add_events( self, instance_id: Reference, events: Iterable[ProfileEvent] ) -> None: @@ -122,12 +137,14 @@ def _storage_thread(self) -> None: threads. So we use a single background thread now to do the writing. """ - Record = Tuple[ + ProfileRecord = Tuple[ int, int, float, float, Optional[str], Optional[int], Optional[int], Optional[int], Optional[int], Optional[int], Optional[float]] + UsageRecord = Tuple[ + int, int, float, float, Optional[float], Optional[int]] - def to_tuple(e: ProfileEvent) -> Record: + def to_tuple(e: ProfileEvent) -> ProfileRecord: # Tell mypy this shouldn't happen assert e.start_time is not None assert e.stop_time is not None @@ -141,6 +158,14 @@ def to_tuple(e: ProfileEvent) -> Record: e.port_length, e.slot, e.message_number, e.message_size, e.message_timestamp) + def to_usage_tuple(e: ProfileEvent) -> UsageRecord: + assert e.start_time is not None + assert e.stop_time is not None + + return ( + instance_oid, e.event_type.value, e.start_time.nanoseconds, + e.stop_time.nanoseconds, e.cpu_percent, e.memory_usage) + cur = self._get_cursor() batch = self._queue.get() while batch is not None: @@ -151,13 +176,32 @@ def to_tuple(e: ProfileEvent) -> Record: instance_oid = self._get_instance_oid(cur, instance_id) - cur.executemany( - "INSERT INTO events" - " (instance_oid, event_type_oid, start_time, stop_time," - " port_name, port_operator_oid, port_length, slot," - " message_number, message_size, message_timestamp)" - " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - map(to_tuple, events)) + # Split events into events and profile events + profile_events = [] + usage_events = [] + for e in events: + if e.event_type == ProfileEventType.RESOURCE_USAGE: + usage_events.append(e) + else: + profile_events.append(e) + + if 
profile_events: + cur.executemany( + "INSERT INTO events" + " (instance_oid, event_type_oid, start_time, stop_time," + " port_name, port_operator_oid, port_length, slot," + " message_number, message_size, message_timestamp)" + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + map(to_tuple, profile_events)) + + if usage_events: + cur.executemany( + "INSERT INTO usage_events" + " (instance_oid, event_type_oid, start_time, stop_time," + " cpu_percent, memory_usage)" + " VALUES (?, ?, ?, ?, ?, ?)", + map(to_usage_tuple, usage_events)) + cur.execute("COMMIT") if _SYNCHED: self._confirmation_queue.put(None) @@ -239,9 +283,21 @@ def _init_database(self) -> None: " message_size INTEGER," " message_timestamp DOUBLE)") + cur.execute( + "CREATE TABLE usage_events (" + " instance_oid INTEGER NOT NULL REFERENCES instances(oid)," + " event_type_oid INTEGER NOT NULL REFERENCES event_types(oid)," + " start_time INTEGER NOT NULL," + " stop_time INTEGER NOT NULL," + " cpu_percent DOUBLE," + " memory_usage INTEGER)") + cur.execute("CREATE INDEX instances_oid_idx ON instances(oid)") cur.execute("CREATE INDEX events_start_time_idx ON events(start_time)") + cur.execute( + "CREATE INDEX usage_events_start_time_idx" + " ON usage_events(start_time)") cur.execute( "CREATE VIEW all_events" diff --git a/libmuscle/python/libmuscle/manager/test/conftest.py b/libmuscle/python/libmuscle/manager/test/conftest.py index 2f5fa13a..7255a9ac 100644 --- a/libmuscle/python/libmuscle/manager/test/conftest.py +++ b/libmuscle/python/libmuscle/manager/test/conftest.py @@ -71,12 +71,12 @@ def mmp_request_handler( @pytest.fixture def loaded_instance_registry(instance_registry): - instance_registry.add(Reference('macro'), ['direct:macro'], []) + instance_registry.add(Reference('macro'), ['direct:macro'], [], 10, 'test_host') for j in range(10): for i in range(10): name = Reference('micro') + j + i location = 'direct:{}'.format(name) - instance_registry.add(name, [location], []) + instance_registry.add(name, 
[location], [], 100 + j + i * 10, 'test_host') return instance_registry @@ -121,18 +121,18 @@ def snapshot_registry2(mmp_configuration2, topology_store) -> SnapshotRegistry: def loaded_instance_registry2(): instance_registry = InstanceRegistry() - instance_registry.add(Reference('macro'), ['direct:macro'], []) + instance_registry.add(Reference('macro'), ['direct:macro'], [], 10, 'test_host') for j in range(5): name = Reference('meso') + j location = 'direct:{}'.format(name) - instance_registry.add(name, [location], []) + instance_registry.add(name, [location], [], 10 + j, 'test_host') for j in range(5): for i in range(10): name = Reference('micro') + j + i location = 'direct:{}'.format(name) - instance_registry.add(name, [location], []) + instance_registry.add(name, [location], [], 100 + j*10 + i, 'test_host') return instance_registry diff --git a/libmuscle/python/libmuscle/manager/test/test_instance_registry.py b/libmuscle/python/libmuscle/manager/test/test_instance_registry.py index ab4190df..8e6c604e 100644 --- a/libmuscle/python/libmuscle/manager/test/test_instance_registry.py +++ b/libmuscle/python/libmuscle/manager/test/test_instance_registry.py @@ -21,7 +21,7 @@ def test_port(port): def test_registry_add(registry, port): - registry.add('instance1', 'tcp://localhost:6253', [port]) + registry.add('instance1', 'tcp://localhost:6253', [port], 12345, 'test_host') assert (registry._locations['instance1'] == 'tcp://localhost:6253') assert registry._ports['instance1'] == [port] @@ -31,6 +31,8 @@ def test_registry_get(registry, port): registry._locations['instance1'] = [ 'tcp://localhost:6253'] registry._ports['instance1'] = [port] + registry._pids['instance1'] = 12345 + registry._hostnames['instance1'] = 'test_host' assert registry.get_locations('instance1') == ['tcp://localhost:6253'] assert registry.get_ports('instance1') == [port] @@ -45,6 +47,8 @@ def test_registry_remove(registry, port): registry._locations['instance1'] = [ 'tcp://localhost:6253'] 
registry._ports['instance1'] = [port] + registry._pids['instance1'] = 12345 + registry._hostnames['instance1'] = 'test_host' registry.remove('instance1') assert 'instance1' not in registry._locations assert 'instance1' not in registry._ports diff --git a/libmuscle/python/libmuscle/manager/test/test_mlp_request_handler.py b/libmuscle/python/libmuscle/manager/test/test_mlp_request_handler.py new file mode 100644 index 00000000..4152731d --- /dev/null +++ b/libmuscle/python/libmuscle/manager/test/test_mlp_request_handler.py @@ -0,0 +1,30 @@ +import msgpack +from ymmsl import Reference + +from libmuscle.manager.mlp_server import MLPRequestHandler +from libmuscle.mcp.protocol import RequestType, ResponseType + + +def test_create_servicer(logger, profile_store): + MLPRequestHandler(logger, profile_store) + + +def test_report_usage(logger, profile_store): + profile_store.store_instances([Reference('instance1'), Reference('instance2')]) + handler = MLPRequestHandler(logger, profile_store) + + usage = { + "instance1": (0.1, 1024), + "instance2": (0.2, 2048), + } + + request = [ + RequestType.REPORT_USAGE.value, + "node_name0", usage] + encoded_request = msgpack.packb(request, use_bin_type=True) + + result = handler.handle_request(encoded_request) + + decoded_result = msgpack.unpackb(result, raw=False) + + assert decoded_result[0] == ResponseType.SUCCESS.value diff --git a/libmuscle/python/libmuscle/manager/test/test_mmp_request_handler.py b/libmuscle/python/libmuscle/manager/test/test_mmp_request_handler.py index 0286649c..3f4f6404 100644 --- a/libmuscle/python/libmuscle/manager/test/test_mmp_request_handler.py +++ b/libmuscle/python/libmuscle/manager/test/test_mmp_request_handler.py @@ -82,6 +82,8 @@ def test_register_instance(mmp_request_handler, instance_registry): 'test_instance', ['tcp://localhost:10000'], [['test_in', 'F_INIT']], + 12345, + 'test_host', libmuscle.__version__] encoded_request = msgpack.packb(request, use_bin_type=True) @@ -102,7 +104,9 @@ def 
test_register_instance_no_version(mmp_request_handler): RequestType.REGISTER_INSTANCE.value, 'test_instance', ['tcp://localhost:10000'], - [['test_in', 'F_INIT']]] + [['test_in', 'F_INIT']], + 12345, + 'test_host'] encoded_request = msgpack.packb(request, use_bin_type=True) result = mmp_request_handler.handle_request(encoded_request) @@ -118,6 +122,8 @@ def test_register_instance_version_mismatch(mmp_request_handler): 'test_instance', ['tcp://localhost:10000'], [['test_in', 'F_INIT']], + 12345, + 'test_host', libmuscle.__version__ + "dev"] encoded_request = msgpack.packb(request, use_bin_type=True) @@ -184,6 +190,8 @@ def test_double_register_instance(mmp_request_handler): 'test_instance', ['tcp://localhost:10000'], [['test_in', 'F_INIT']], + 12345, + 'test_host', libmuscle.__version__] encoded_request = msgpack.packb(request, use_bin_type=True) diff --git a/libmuscle/python/libmuscle/manager/test/test_profile_database.py b/libmuscle/python/libmuscle/manager/test/test_profile_database.py index b72c964a..782ef6e8 100644 --- a/libmuscle/python/libmuscle/manager/test/test_profile_database.py +++ b/libmuscle/python/libmuscle/manager/test/test_profile_database.py @@ -74,6 +74,9 @@ def t(offset: int) -> ProfileTimestamp: ProfileEvent( ProfileEventType.RECEIVE_WAIT, t(2600), t(2870), Port('in', Operator.O_I), None, None, 1000000, 0.0), + ProfileEvent( + ProfileEventType.RESOURCE_USAGE, start_time=t(2700), + stop_time=t(2800), cpu_percent=12.5, memory_usage=1024000), ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, t(10000), t(11000)), ProfileEvent(ProfileEventType.DEREGISTER, t(11000), t(11100))] @@ -137,3 +140,15 @@ def test_time_taken(db_file): assert 200.0 == db.time_taken(etype='SEND') assert 200.0 == db.time_taken(etype='DEREGISTER', aggregate='sum') assert 600.0 == db.time_taken(etype='SEND', aggregate='sum') + + +def test_resource_usage_event(db_file): + with ProfileDatabase(db_file) as db: + cur = db._get_cursor() + cur.execute(f"SELECT * FROM usage_events WHERE 
event_type_oid = \ + '{ProfileEventType.RESOURCE_USAGE.value}'") + rows = cur.fetchall() + assert len(rows) == 1 + # Check columns + assert rows[0][4] == 12.5 + assert rows[0][5] == 1024000 diff --git a/libmuscle/python/libmuscle/manager/test/test_profile_store.py b/libmuscle/python/libmuscle/manager/test/test_profile_store.py index e4318d5d..46f1e869 100644 --- a/libmuscle/python/libmuscle/manager/test/test_profile_store.py +++ b/libmuscle/python/libmuscle/manager/test/test_profile_store.py @@ -39,6 +39,12 @@ def test_create_profile_store(tmp_path): events = cur.fetchall() assert len(events) == 0 + cur.execute( + "SELECT instance_oid, event_type_oid, start_time, stop_time," + " cpu_percent, memory_usage FROM usage_events") + usage_events = cur.fetchall() + assert len(usage_events) == 0 + cur.execute("COMMIT") cur.close() conn.close() @@ -62,7 +68,10 @@ def test_add_events(tmp_path): Port('out_port', Operator.O_I), 10, 3, 67, 12345, 13.42), ProfileEvent( ProfileEventType.DEREGISTER, ProfileTimestamp(1000000000000), - ProfileTimestamp(1100000000000))] + ProfileTimestamp(1100000000000)), + ProfileEvent( + ProfileEventType.RESOURCE_USAGE, ProfileTimestamp(200), + ProfileTimestamp(300), cpu_percent=10.0, memory_usage=1000)] def check_send_event(instance): cur.execute("BEGIN TRANSACTION") @@ -74,7 +83,6 @@ def check_send_event(instance): " AND e.port_operator_oid = o.oid AND i.name = 'instance[0]'" " AND et.name = (?)", (ProfileEventType.SEND.name,)) events2 = cur.fetchall() - assert len(events2) == 1 e = events2[0] assert e[1:11] == ( @@ -86,10 +94,29 @@ def check_send_event(instance): cur.execute("COMMIT") + def check_resource_event(instance): + cur.execute("BEGIN TRANSACTION") + cur.execute( + "SELECT *" + " FROM usage_events AS e, instances AS i, event_types AS et" + " WHERE e.instance_oid = i.oid AND e.event_type_oid = et.oid" + " AND i.name = 'instance[0]'" + " AND et.name = (?)", (ProfileEventType.RESOURCE_USAGE.name,)) + events2 = cur.fetchall() + assert 
len(events2) == 1 + e = events2[0] + assert e[1:6] == ( + ProfileEventType.RESOURCE_USAGE.value, 200, 300, 10.0, 1000) + assert e[7] == 'instance[0]' + assert e[9] == 'RESOURCE_USAGE' + + cur.execute("COMMIT") + db.store_instances([Reference('instance[0]'), Reference('instance[1]')]) db.add_events(Reference('instance[0]'), events) check_send_event('instance[0]') + check_resource_event('instance[0]') db.add_events(Reference('instance[1]'), events) check_send_event('instance[1]') diff --git a/libmuscle/python/libmuscle/mcp/protocol.py b/libmuscle/python/libmuscle/mcp/protocol.py index 929cbaee..b7956604 100644 --- a/libmuscle/python/libmuscle/mcp/protocol.py +++ b/libmuscle/python/libmuscle/mcp/protocol.py @@ -35,6 +35,9 @@ class RequestType(Enum): GET_COMMAND = 42 REPORT_RESULT = 43 + # MUSCLE Log Protocol + REPORT_USAGE = 51 + class ResponseType(Enum): """Identifier for different types of response @@ -57,3 +60,4 @@ class AgentCommandType(Enum): START = 1 CANCEL_ALL = 2 SHUTDOWN = 3 + ADD_MONITOR = 4 diff --git a/libmuscle/python/libmuscle/mlp_client.py b/libmuscle/python/libmuscle/mlp_client.py new file mode 100644 index 00000000..16586c57 --- /dev/null +++ b/libmuscle/python/libmuscle/mlp_client.py @@ -0,0 +1,132 @@ +from typing import Any, Dict, List, Optional, Tuple +from datetime import datetime, timedelta + +import msgpack +import psutil +from libmuscle.mcp.protocol import RequestType +from libmuscle.mcp.tcp_transport_client import TcpTransportClient + + +class Usage: + """Class to neatly contain the monitoring of the usage of instances on a node + There are try-except blocks for psutil.Error, these usually get hit when a + process/child no longer exists, this is not an error but the way of "life" + ignore those errors and continue with the still existing processes + """ + last_updated: Optional[datetime] = None + processes: Dict[str, List[psutil.Process]] = {} + + def refresh_processes(self, instance_id_and_pids: List[Tuple[str, int]]) -> None: + 
self.processes = {} + for instance_id_and_pid in instance_id_and_pids: + instance_id, pid = instance_id_and_pid + try: + process = psutil.Process(pid) + # Get the process and its children + self.processes[instance_id] = [ + child for child in process.children(recursive=True) + ] + [process] + # First call to cpu_percent(), to initialize the baseline I guess... + for process in self.processes[instance_id]: + try: + process.cpu_percent() + except psutil.Error: + continue + except psutil.Error: + continue + self.last_updated = datetime.now() + + def record_usage(self) -> Dict[str, Tuple[float, int]]: + usage: Dict[str, Tuple[float, int]] = {} + for instance_id, proc_list in self.processes.items(): + # Second call to cpu_percent(), to get the actual usage + cpu_usages = [] + mem_usages = [] + for process in proc_list: + try: + cpu_usages.append(process.cpu_percent()) + mem_usages.append(process.memory_info().vms) + except psutil.Error: + continue + + cpu = sum(cpu_usages) + # Sum the memories of the processes and their children, + # not sure if this is the right choice... + mem = sum(mem_usages) + + usage[instance_id] = (cpu, mem) + self.last_updated = datetime.now() + return usage + + +class MLPClient: + """The client for the MUSCLE Logging Protocol. + + This class connects to the Manager and communicates with it. + """ + def __init__(self, node_name: str, location: str) -> None: + """Create an MLPClient + + Args: + node_name: Name (hostname) of the local node + location: A connection string of the form hostname:port + """ + self._node_name = node_name + self._transport_client = TcpTransportClient(location) + self._usage = Usage() + + def close(self) -> None: + """Close the connection + + This closes the connection. After this no other member functions can be called. 
+ """ + self._transport_client.close() + + def report_usage(self, instance_id_and_pids: List[Tuple[str, int]]) -> None: + """Report usage of resources of processes with given (instance_id, pid) + on this node, sampling at most once per second. + + Args: + instance_id_and_pids: List of (instance_id, pid) tuples + """ + + if len(instance_id_and_pids) == 0: + """ Nothing to monitor, return """ + return + + refresh_processes = False + record_usage = False + + if not self._usage.last_updated: + refresh_processes = True + elif datetime.now() - self._usage.last_updated >= timedelta(seconds=1): + record_usage = True + refresh_processes = True + + usage: Dict[str, Tuple[float, int]] = {} + if record_usage: + usage = self._usage.record_usage() + if refresh_processes: + self._usage.refresh_processes(instance_id_and_pids) + + if len(usage) < 1: + """ Nothing to monitor """ + return + + request = [ + RequestType.REPORT_USAGE.value, + self._node_name, usage] + self._call_manager(request) + + def _call_manager(self, request: Any) -> Any: + """Call the manager and do en/decoding.
+ + Args: + request: The request to encode and send + + Returns: + The decoded response + """ + encoded_request = msgpack.packb(request, use_bin_type=True) + response, _ = self._transport_client.call(encoded_request) + return msgpack.unpackb(response, raw=False) diff --git a/libmuscle/python/libmuscle/mmp_client.py b/libmuscle/python/libmuscle/mmp_client.py index 730087fe..15d13ba3 100644 --- a/libmuscle/python/libmuscle/mmp_client.py +++ b/libmuscle/python/libmuscle/mmp_client.py @@ -1,6 +1,8 @@ import dataclasses +from os import getpid from pathlib import Path from random import uniform +from socket import gethostname from threading import get_ident, RLock from time import perf_counter, sleep from typing import Any, Dict, Iterable, List, Optional, Tuple @@ -57,7 +59,8 @@ def encode_profile_event(event: ProfileEvent) -> Any: event.event_type.value, event.start_time.nanoseconds, event.stop_time.nanoseconds, encoded_port, event.port_length, event.slot, - event.message_number, event.message_size, event.message_timestamp] + event.message_number, event.message_size, event.message_timestamp, + event.cpu_percent, event.memory_usage] def decode_checkpoint_rule(rule: Dict[str, Any]) -> CheckpointRule: @@ -232,6 +235,7 @@ def register_instance( RequestType.REGISTER_INSTANCE.value, str(self._instance_id), locations, [encode_port(p) for p in ports], + getpid(), gethostname(), libmuscle.__version__] response = self._call_manager(request) if response[0] == ResponseType.ERROR.value: diff --git a/libmuscle/python/libmuscle/native_instantiator/agent/__main__.py b/libmuscle/python/libmuscle/native_instantiator/agent/__main__.py index 42480bd1..d00602c5 100644 --- a/libmuscle/python/libmuscle/native_instantiator/agent/__main__.py +++ b/libmuscle/python/libmuscle/native_instantiator/agent/__main__.py @@ -4,36 +4,39 @@ from socket import gethostname import sys from time import sleep -from typing import Dict, Set, Tuple +from typing import Dict, Set, Tuple, List from 
libmuscle.native_instantiator.process_manager import ProcessManager from libmuscle.native_instantiator.agent.map_client import MAPClient +from libmuscle.mlp_client import MLPClient from libmuscle.native_instantiator.agent.agent_commands import ( - CancelAllCommand, ShutdownCommand, StartCommand) + AddMonitorCommand, CancelAllCommand, ShutdownCommand, StartCommand) from libmuscle.planner.resources import Core, CoreSet, OnNodeResources - _logger = logging.getLogger(__name__) class Agent: """Runs on a compute node and starts processes there.""" - def __init__(self, node_name: str, server_location: str) -> None: + def __init__(self, node_name: str, server_location: str, mlp_location: str) -> None: """Create an Agent. Args: node_name: Name (hostname) of this node server_location: MAP server of the manager to connect to + mlp_location: MLP server of the manager to connect to """ _logger.info(f'Agent at {node_name} starting') self._process_manager = ProcessManager() self._node_name = node_name + self._monitor_pids: List[Tuple[str, int]] = [] _logger.info(f'Connecting to manager at {server_location}') self._server = MAPClient(self._node_name, server_location) _logger.info('Connected to manager') + self._mlpclient = MLPClient(self._node_name, mlp_location) def run(self) -> None: """Execute commands and monitor processes.""" @@ -49,8 +52,10 @@ def run(self) -> None: _logger.debug(f'Env: {command.env}') self._process_manager.start( - command.name, command.work_dir, command.args, command.env, - command.stdout, command.stderr) + command.name, command.work_dir, command.args, + command.stdout, command.stderr, command.env) + elif isinstance(command, AddMonitorCommand): + self._monitor_pids.append((command.instance, command.pid)) elif isinstance(command, CancelAllCommand): _logger.info('Cancelling all instances') self._process_manager.cancel_all() @@ -66,6 +71,9 @@ def run(self) -> None: _logger.info(f'Process {name} finished with exit code {exit_code}') 
self._server.report_result(finished) + if not shutting_down and not finished: + self._mlpclient.report_usage(self._monitor_pids) + sleep(0.1) self._server.close() @@ -187,8 +195,9 @@ def configure_logging(node_name: str, log_level: int) -> None: node_name = gethostname() server_location = sys.argv[1] log_level = int(sys.argv[2]) + mlp_location = sys.argv[3] configure_logging(node_name, log_level) - agent = Agent(node_name, server_location) + agent = Agent(node_name, server_location, mlp_location) agent.run() diff --git a/libmuscle/python/libmuscle/native_instantiator/agent/agent_commands.py b/libmuscle/python/libmuscle/native_instantiator/agent/agent_commands.py index 56a830d1..1dcbfb3b 100644 --- a/libmuscle/python/libmuscle/native_instantiator/agent/agent_commands.py +++ b/libmuscle/python/libmuscle/native_instantiator/agent/agent_commands.py @@ -23,3 +23,10 @@ class CancelAllCommand(AgentCommand): class ShutdownCommand(AgentCommand): pass + + +@dataclass +class AddMonitorCommand(AgentCommand): + instance: str + hostname: str + pid: int diff --git a/libmuscle/python/libmuscle/native_instantiator/agent/map_client.py b/libmuscle/python/libmuscle/native_instantiator/agent/map_client.py index e402b29f..7793201a 100644 --- a/libmuscle/python/libmuscle/native_instantiator/agent/map_client.py +++ b/libmuscle/python/libmuscle/native_instantiator/agent/map_client.py @@ -6,7 +6,8 @@ from libmuscle.mcp.protocol import AgentCommandType, RequestType, ResponseType from libmuscle.mcp.tcp_transport_client import TcpTransportClient from libmuscle.native_instantiator.agent.agent_commands import ( - AgentCommand, StartCommand, CancelAllCommand, ShutdownCommand) + AgentCommand, AddMonitorCommand, StartCommand, CancelAllCommand, + ShutdownCommand) from libmuscle.planner.resources import OnNodeResources @@ -68,6 +69,9 @@ def get_command(self) -> Optional[AgentCommand]: return StartCommand(name, workdir, args, env, stdout, stderr) + elif command[0] == 
AgentCommandType.ADD_MONITOR.value: + return AddMonitorCommand(command[1], command[2], command[3]) + elif command[0] == AgentCommandType.CANCEL_ALL.value: return CancelAllCommand() diff --git a/libmuscle/python/libmuscle/native_instantiator/agent_manager.py b/libmuscle/python/libmuscle/native_instantiator/agent_manager.py index 80751e2c..cc02e444 100644 --- a/libmuscle/python/libmuscle/native_instantiator/agent_manager.py +++ b/libmuscle/python/libmuscle/native_instantiator/agent_manager.py @@ -7,6 +7,7 @@ from typing import Dict, List, Tuple from libmuscle.native_instantiator.agent.agent_commands import ( + AddMonitorCommand, CancelAllCommand, StartCommand, ShutdownCommand) from libmuscle.native_instantiator.iagent_manager import IAgentManager from libmuscle.native_instantiator.map_server import MAPServer @@ -29,7 +30,7 @@ class AgentManager(IAgentManager): cancel processes on nodes, and it gets called by MAPServer with requests from the agents. """ - def __init__(self, agent_dir: Path) -> None: + def __init__(self, agent_dir: Path, mlp_location: str) -> None: """Create an AgentManager. Create the object, then launch the agents and wait for them to connect and send @@ -37,6 +38,7 @@ def __init__(self, agent_dir: Path) -> None: Args: agent_dir: Directory in which agents can write log files. + mlp_location: Location of the MLP server. """ self._expected_nodes = global_resources().nodes self._nodes: Dict[str, str] = dict() @@ -47,7 +49,7 @@ def __init__(self, agent_dir: Path) -> None: self._finished_processes_lock = Lock() self._server = MAPServer(self) - self._launch_agents(agent_dir, self._server.get_location()) + self._launch_agents(agent_dir, self._server.get_location(), mlp_location) def get_resources(self) -> Resources: """Return detected resources. 
@@ -80,6 +82,17 @@ def start( command = StartCommand(name, work_dir, args, env, stdout, stderr) self._server.deposit_command(agent_hostname, command) + def add_monitor(self, instance: str, hostname: str, pid: int) -> None: + """Handle a monitor usage request. + + Args: + instance: Name of the instance to monitor + hostname: Hostname of the node to monitor + pid: Process id to monitor + """ + self._server.deposit_command( + hostname, AddMonitorCommand(instance, hostname, pid)) + def cancel_all(self) -> None: """Cancel all processes. @@ -164,7 +177,9 @@ def report_result(self, names_exit_codes: List[Tuple[str, int]]) -> None: with self._finished_processes_lock: self._finished_processes.extend(names_exit_codes) - def _launch_agents(self, agent_dir: Path, server_location: str) -> None: + def _launch_agents( + self, agent_dir: Path, server_location: str, mlp_location: str + ) -> None: """Actually launch the agents. This runs a local process, either to start a single agent locally, or on a @@ -174,6 +189,7 @@ def _launch_agents(self, agent_dir: Path, server_location: str) -> None: agent_dir: Working directory for the agents server_location: MAPServer network location string for the agents to connect to + mlp_location: MLPServer network location string for the agents to connect to """ _logger.info('Launching MUSCLE agents...') @@ -186,7 +202,7 @@ def _launch_agents(self, agent_dir: Path, server_location: str) -> None: args = [ sys.executable, '-m', 'libmuscle.native_instantiator.agent', - server_location, str(log_level)] + server_location, str(log_level), mlp_location] args = global_resources().agent_launch_command(args) diff --git a/libmuscle/python/libmuscle/native_instantiator/map_server.py b/libmuscle/python/libmuscle/native_instantiator/map_server.py index 8e279fbf..f47e9aa8 100644 --- a/libmuscle/python/libmuscle/native_instantiator/map_server.py +++ b/libmuscle/python/libmuscle/native_instantiator/map_server.py @@ -8,7 +8,8 @@ from 
libmuscle.mcp.tcp_transport_server import TcpTransportServer from libmuscle.mcp.transport_server import RequestHandler from libmuscle.native_instantiator.agent.agent_commands import ( - AgentCommand, CancelAllCommand, ShutdownCommand, StartCommand) + AgentCommand, AddMonitorCommand, CancelAllCommand, ShutdownCommand, + StartCommand) from libmuscle.native_instantiator.iagent_manager import IAgentManager from libmuscle.planner.resources import Core, CoreSet, OnNodeResources from libmuscle.post_office import PostOffice @@ -49,7 +50,8 @@ def handle_request(self, request: bytes) -> bytes: response = self._get_command(*req_args) elif req_type == RequestType.REPORT_RESULT.value: response = self._report_result(*req_args) - + else: + _logger.warning(f'Unknown request type {req_type}') return cast(bytes, msgpack.packb(response, use_bin_type=True)) def _report_resources( @@ -166,6 +168,11 @@ def deposit_command(self, node_name: str, command: AgentCommand) -> None: AgentCommandType.START.value, command.name, str(command.work_dir), command.args, command.env, str(command.stdout), str(command.stderr) ] + elif isinstance(command, AddMonitorCommand): + command_obj = [ + AgentCommandType.ADD_MONITOR.value, command.instance, + command.hostname, command.pid + ] elif isinstance(command, CancelAllCommand): command_obj = [AgentCommandType.CANCEL_ALL.value] elif isinstance(command, ShutdownCommand): diff --git a/libmuscle/python/libmuscle/native_instantiator/native_instantiator.py b/libmuscle/python/libmuscle/native_instantiator/native_instantiator.py index 3073a16e..15fa40e2 100644 --- a/libmuscle/python/libmuscle/native_instantiator/native_instantiator.py +++ b/libmuscle/python/libmuscle/native_instantiator/native_instantiator.py @@ -10,7 +10,7 @@ from libmuscle.errors import ConfigurationError from libmuscle.manager.instantiator import ( CancelAllRequest, CrashedResult, create_instance_env, InstantiationRequest, - Process, ProcessStatus, reconfigure_logging, ShutdownRequest) + 
MonitorRequest, Process, ProcessStatus, reconfigure_logging, ShutdownRequest) from libmuscle.native_instantiator.agent_manager import AgentManager from libmuscle.native_instantiator.global_resources import global_resources from libmuscle.native_instantiator.run_script import make_script, prep_resources @@ -25,7 +25,7 @@ class NativeInstantiator(mp.Process): """Instantiates instances on the local machine.""" def __init__( self, resources: mp.Queue, requests: mp.Queue, results: mp.Queue, - log_records: mp.Queue, run_dir: Path) -> None: + log_records: mp.Queue, run_dir: Path, mlp_location: str) -> None: """Create a NativeInstantiator Args: @@ -34,6 +34,7 @@ def __init__( results: Queue to communicate finished processes over log_messages: Queue to push log messages to run_dir: Run directory for the current run + mlp_location: MLPServer network location string for the agents to connect to """ super().__init__(name='NativeInstantiator') self._resources_out = resources @@ -41,7 +42,7 @@ def __init__( self._results_out = results self._log_records_out = log_records self._run_dir = run_dir - + self._mlp_location = mlp_location self._processes: Dict[str, Process] = dict() def run(self) -> None: @@ -50,7 +51,7 @@ def run(self) -> None: logs_dir = self._run_dir / 'logs' logs_dir.mkdir(exist_ok=True) - self._agent_manager = AgentManager(logs_dir) + self._agent_manager = AgentManager(logs_dir, self._mlp_location) reconfigure_logging(self._log_records_out) self._send_resources() @@ -83,6 +84,11 @@ def _main(self) -> None: _logger.debug('Got ShutdownRequest') shutting_down = True + if isinstance(request, MonitorRequest): + _logger.debug('Got MonitorRequest') + self._agent_manager.add_monitor( + request.instance, request.hostname, request.pid) + elif isinstance(request, CancelAllRequest): _logger.debug('Got CancelAllRequest') self._agent_manager.cancel_all() diff --git a/libmuscle/python/libmuscle/native_instantiator/process_manager.py 
b/libmuscle/python/libmuscle/native_instantiator/process_manager.py index bfd8f3ca..e6114e3d 100644 --- a/libmuscle/python/libmuscle/native_instantiator/process_manager.py +++ b/libmuscle/python/libmuscle/native_instantiator/process_manager.py @@ -1,7 +1,7 @@ import logging from pathlib import Path from subprocess import Popen -from typing import Dict, List, Tuple +from typing import Dict, List, Optional, Tuple _logger = logging.getLogger(__name__) @@ -14,8 +14,8 @@ def __init__(self) -> None: self._processes: Dict[str, Popen] = dict() def start( - self, name: str, work_dir: Path, args: List[str], env: Dict[str, str], - stdout: Path, stderr: Path) -> None: + self, name: str, work_dir: Path, args: List[str], + stdout: Path, stderr: Path, env: Optional[Dict[str, str]] = None) -> None: """Start a process. The files that the output is directed to will be overwritten if they already diff --git a/libmuscle/python/libmuscle/native_instantiator/test/test_process_manager.py b/libmuscle/python/libmuscle/native_instantiator/test/test_process_manager.py index 93dabcfb..1d7cf7ae 100644 --- a/libmuscle/python/libmuscle/native_instantiator/test/test_process_manager.py +++ b/libmuscle/python/libmuscle/native_instantiator/test/test_process_manager.py @@ -24,7 +24,7 @@ def _poll_completion(lpm, num_jobs): def test_run_process(lpm, tmp_path): lpm.start( - 'test', tmp_path, ['bash', '-c', 'exit 0'], {}, + 'test', tmp_path, ['bash', '-c', 'exit 0'], tmp_path / 'out', tmp_path / 'err') completed_jobs = _poll_completion(lpm, 1) assert completed_jobs[0] == ('test', 0) @@ -32,11 +32,11 @@ def test_run_process(lpm, tmp_path): def test_existing_process(lpm, tmp_path): lpm.start( - 'test', tmp_path, ['bash', '-c', 'exit 0'], {}, + 'test', tmp_path, ['bash', '-c', 'exit 0'], tmp_path / 'out', tmp_path / 'err') with pytest.raises(RuntimeError): lpm.start( - 'test', tmp_path, ['bash', '-c', 'exit 0'], {}, + 'test', tmp_path, ['bash', '-c', 'exit 0'], tmp_path / 'out', tmp_path / 'err') 
completed_jobs = _poll_completion(lpm, 1) @@ -47,8 +47,8 @@ def test_existing_process(lpm, tmp_path): def test_env(lpm, tmp_path): env = {'ENVVAR': 'TESTING123'} lpm.start( - 'test', tmp_path, ['bash', '-c', 'echo ${ENVVAR}'], env, - tmp_path / 'out', tmp_path / 'err') + 'test', tmp_path, ['bash', '-c', 'echo ${ENVVAR}'], + tmp_path / 'out', tmp_path / 'err', env) _poll_completion(lpm, 1) with (tmp_path / 'out').open('r') as f: @@ -59,7 +59,7 @@ def test_env(lpm, tmp_path): def test_exit_code(lpm, tmp_path): lpm.start( - 'test_exit_code', tmp_path, ['bash', '-c', 'exit 3'], {}, + 'test_exit_code', tmp_path, ['bash', '-c', 'exit 3'], tmp_path / 'out', tmp_path / 'err') done = lpm.get_finished() while not done: @@ -72,7 +72,7 @@ def test_exit_code(lpm, tmp_path): def test_multiple(lpm, tmp_path): for i in range(3): lpm.start( - f'test_{i}', tmp_path, ['bash', '-c', 'sleep 1'], {}, + f'test_{i}', tmp_path, ['bash', '-c', '/usr/bin/env sleep 1'], tmp_path / f'out{i}', tmp_path / f'err{i}') completed_jobs = _poll_completion(lpm, 3) @@ -85,7 +85,7 @@ def test_cancel_all(lpm, tmp_path): for i in range(2): lpm.start( - f'test_{i}', tmp_path, ['bash', '-c', 'sleep 1'], {}, + f'test_{i}', tmp_path, ['bash', '-c', '/usr/bin/env sleep 1'], tmp_path / f'out{i}', tmp_path / f'err{i}') lpm.cancel_all() @@ -100,7 +100,7 @@ def test_cancel_all(lpm, tmp_path): def test_output_redirect(lpm, tmp_path): lpm.start( - 'test', tmp_path, ['bash', '-c', 'ls'], {}, + 'test', tmp_path, ['bash', '-c', '/usr/bin/env ls'], tmp_path / 'out', tmp_path / 'err') _poll_completion(lpm, 1) with (tmp_path / 'out').open('r') as f: @@ -111,7 +111,7 @@ def test_output_redirect(lpm, tmp_path): def test_error_redirect(lpm, tmp_path): lpm.start( - 'test', tmp_path, ['bash', '-c', 'ls 1>&2'], {}, + 'test', tmp_path, ['bash', '-c', '/usr/bin/env ls 1>&2'], tmp_path / 'out', tmp_path / 'err') _poll_completion(lpm, 1) with (tmp_path / 'out').open('r') as f: diff --git a/libmuscle/python/libmuscle/profiling.py 
b/libmuscle/python/libmuscle/profiling.py index d3c1f9a5..8eb9e69e 100644 --- a/libmuscle/python/libmuscle/profiling.py +++ b/libmuscle/python/libmuscle/profiling.py @@ -18,6 +18,7 @@ class ProfileEventType(Enum): DISCONNECT_WAIT = 8 SHUTDOWN = 10 DEREGISTER = 1 + RESOURCE_USAGE = 11 class ProfileTimestamp: @@ -62,6 +63,8 @@ class ProfileEvent: message_size: Size of the message involved, if applicable. message_timestamp: Timestamp sent with the message, if applicable. + cpu_percent: CPU usage in percent, if applicable. + memory_usage: Memory usage in bytes, if applicable. Attributes: event_type: Type of event that was measured. @@ -77,6 +80,8 @@ class ProfileEvent: message_size: Size of the message involved, if applicable. message_timestamp: Timestamp sent with the message, if applicable. + cpu_percent: CPU usage in percent, if applicable. + memory_usage: Memory usage in bytes, if applicable. """ def __init__( self, @@ -88,7 +93,9 @@ def __init__( slot: Optional[int] = None, message_number: Optional[int] = None, message_size: Optional[int] = None, - message_timestamp: Optional[float] = None + message_timestamp: Optional[float] = None, + cpu_percent: Optional[float] = None, + memory_usage: Optional[int] = None ) -> None: self.event_type = event_type @@ -100,6 +107,8 @@ def __init__( self.message_number = message_number self.message_size = message_size self.message_timestamp = message_timestamp + self.cpu_percent = cpu_percent + self.memory_usage = memory_usage def start(self) -> None: """Sets start_time to the current time. 
diff --git a/libmuscle/python/libmuscle/test/test_mlp_client.py b/libmuscle/python/libmuscle/test/test_mlp_client.py new file mode 100644 index 00000000..fcac165a --- /dev/null +++ b/libmuscle/python/libmuscle/test/test_mlp_client.py @@ -0,0 +1,60 @@ +import msgpack +from unittest.mock import MagicMock, patch +from datetime import datetime + +from libmuscle.mcp.protocol import RequestType +from libmuscle.mlp_client import MLPClient + + +def test_create_mlp_client(): + with patch('libmuscle.mlp_client.TcpTransportClient') as mock_ttc: + client = MLPClient('node_name', 'location') + assert client._node_name == 'node_name' + assert client._transport_client == mock_ttc.return_value + mock_ttc.assert_called_with('location') + client.close() + + +def test_close_mlp_client(): + with patch('libmuscle.mlp_client.TcpTransportClient'): + client = MLPClient('node_name', 'location') + client.close() + client._transport_client.close.assert_called_once() + + +def test_report_usage(): + with patch('libmuscle.mlp_client.TcpTransportClient') as mock_ttc: + mock_ttc.return_value.call.return_value = (msgpack.packb([0]), None) + client = MLPClient('node_name', 'location') + + with patch('libmuscle.mlp_client.psutil') as mock_psutil: + mock_process = MagicMock() + mock_process.cpu_percent.return_value = 10.0 + mock_process.memory_info.return_value.vms = 1000 + mock_psutil.Process.return_value = mock_process + + client.report_usage([('instance1', 123)]) + client._usage.last_updated = datetime(1970, 1, 1, 0, 0, 0) + client.report_usage([('instance1', 123)]) + + mock_psutil.Process.assert_called_with(123) + mock_process.cpu_percent.assert_called() + mock_process.memory_info.assert_called() + + expected_usage = {'instance1': [10.0, 1000]} + expected_request = [ + RequestType.REPORT_USAGE.value, + 'node_name', expected_usage] + + args, _ = client._transport_client.call.call_args + decoded_request = msgpack.unpackb(args[0], raw=False) + assert decoded_request == expected_request + 
client.close() + + +def test_report_usage_no_pids(): + with patch('libmuscle.mlp_client.TcpTransportClient'): + client = MLPClient('node_name', 'location') + client.report_usage([]) + client._transport_client.call.assert_not_called() + client.close() diff --git a/libmuscle/python/libmuscle/test/test_mmp_client.py b/libmuscle/python/libmuscle/test/test_mmp_client.py index 2d8e1b42..2f7de671 100644 --- a/libmuscle/python/libmuscle/test/test_mmp_client.py +++ b/libmuscle/python/libmuscle/test/test_mmp_client.py @@ -1,3 +1,6 @@ +from os import getpid +from socket import gethostname + from unittest.mock import patch import msgpack @@ -92,6 +95,7 @@ def test_register_instance(mocked_mmp_client, profile_data) -> None: assert sent_msg == [ RequestType.REGISTER_INSTANCE.value, 'component[13]', ['direct:test', 'tcp:test'], [['out', 'O_I'], ['in', 'S']], + getpid(), gethostname(), libmuscle.__version__]