2121import threading
2222from typing import Any , Callable
2323
24+ from opentelemetry ._opamp .callbacks import Callbacks , MessageData
2425from opentelemetry ._opamp .client import OpAMPClient
2526from opentelemetry ._opamp .proto import opamp_pb2
2627
2728logger = logging .getLogger (__name__ )
2829
2930
31+ def _safe_invoke (function : Callable [..., Any ], * args : Any ) -> None :
32+ function_name = "<unknown>"
33+ try :
34+ function_name = function .__name__
35+ function (* args )
36+ except Exception as exc : # pylint: disable=broad-exception-caught
37+ logger .error (
38+ "Error when invoking function '%s'" , function_name , exc_info = exc
39+ )
40+
41+
3042class _Job :
3143 """
3244 Represents a single request job, with retry/backoff metadata.
@@ -73,24 +85,22 @@ def __init__(
7385 self ,
7486 * ,
7587 interval : float = 30 ,
76- message_handler : Callable [
77- ["OpAMPAgent" , OpAMPClient , opamp_pb2 .ServerToAgent ], None
78- ],
88+ callbacks : Callbacks ,
7989 max_retries : int = 10 ,
8090 heartbeat_max_retries : int = 1 ,
8191 initial_backoff : float = 1.0 ,
8292 client : OpAMPClient ,
8393 ):
8494 """
8595 :param interval: seconds between heartbeat calls
86- :param message_handler: user provided function that takes the received ServerToAgent message
96+ :param callbacks: Callbacks instance for receiving client events
8797 :param max_retries: how many times to retry a failed job for ad-hoc messages
8898 :param heartbeat_max_retries: how many times to retry an heartbeat failed job
8999 :param initial_backoff: base seconds for exponential backoff
90100 :param client: an OpAMPClient instance
91101 """
92102 self ._interval = interval
93- self ._handler = message_handler
103+ self ._callbacks = callbacks
94104 self ._max_retries = max_retries
95105 self ._heartbeat_max_retries = heartbeat_max_retries
96106 self ._initial_backoff = initial_backoff
@@ -186,23 +196,31 @@ def _run_worker(self) -> None:
186196 while job .should_retry () and not self ._stop .is_set ():
187197 try :
188198 message = self ._client .send (job .payload )
199+ _safe_invoke (
200+ self ._callbacks .on_connect , self , self ._client
201+ )
189202 logger .debug ("Job succeeded: %r" , job .payload )
190203 break
191204 except Exception as exc :
192205 job .attempt += 1
206+ _safe_invoke (
207+ self ._callbacks .on_connect_failed ,
208+ self ,
209+ self ._client ,
210+ exc ,
211+ )
193212 logger .warning (
194213 "Job %r failed attempt %d/%d: %s" ,
195214 job .payload ,
196215 job .attempt ,
197- job .max_retries ,
216+ job .max_retries + 1 ,
198217 exc ,
199218 )
200219
201220 if not job .should_retry ():
202221 logger .error (
203222 "Job %r dropped after max retries" , job .payload
204223 )
205- logger .exception (exc )
206224 break
207225
208226 # exponential backoff with +/- 20% jitter, interruptible by stop event
@@ -216,14 +234,7 @@ def _run_worker(self) -> None:
216234 break
217235
218236 if message is not None :
219- # we can't do much if the handler fails other than logging
220- try :
221- self ._handler (self , self ._client , message )
222- logger .debug ("Called Job message handler for: %r" , message )
223- except Exception as exc :
224- logger .warning (
225- "Job %r handler failed with: %s" , job .payload , exc
226- )
237+ self ._process_message (message )
227238
228239 try :
229240 if job .callback is not None :
@@ -233,6 +244,29 @@ def _run_worker(self) -> None:
233244 finally :
234245 self ._queue .task_done ()
235246
247+ def _process_message (self , message : opamp_pb2 .ServerToAgent ) -> None :
248+ if message .HasField ("error_response" ):
249+ _safe_invoke (
250+ self ._callbacks .on_error ,
251+ self ,
252+ self ._client ,
253+ message .error_response ,
254+ )
255+ return
256+
257+ if message .flags & opamp_pb2 .ServerToAgentFlags_ReportFullState :
258+ logger .debug ("Server requested full state report" )
259+ payload = self ._client .build_full_state_message ()
260+ self .send (payload )
261+
262+ msg_data = MessageData .from_server_message (message )
263+ _safe_invoke (
264+ self ._callbacks .on_message ,
265+ self ,
266+ self ._client ,
267+ msg_data ,
268+ )
269+
236270 def stop (self , timeout : float | None = None ) -> None :
237271 """
238272 Signal server we are disconnecting and then threads to exit
0 commit comments