-
Notifications
You must be signed in to change notification settings - Fork 23
monitor lifecycle conductor #2723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: development/9.3
Are you sure you want to change the base?
Changes from all commits
7857978
a1661ea
592d911
d064eec
5561284
43d3f90
8a19b71
d7000eb
4520f5a
5d84665
13537ec
7ab1387
a24c6c2
86cfaf1
d8cf09a
c00900b
0a2db86
32f8197
516907c
560d382
89c323b
e9353d6
de1170d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,16 +1,42 @@ | ||||||
| const { ZenkoMetrics } = require('arsenal').metrics; | ||||||
|
|
||||||
| const LIFECYCLE_LABEL_ORIGIN = 'origin'; | ||||||
| const LIFECYCLE_LABEL_ORIGIN = 'origin'; | ||||||
| const LIFECYCLE_LABEL_OP = 'op'; | ||||||
| const LIFECYCLE_LABEL_STATUS = 'status'; | ||||||
| const LIFECYCLE_LABEL_LOCATION = 'location'; | ||||||
| const LIFECYCLE_LABEL_TYPE = 'type'; | ||||||
| const LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID = 'conductor_scan_id'; | ||||||
|
|
||||||
| const LIFECYCLE_MARKER_METRICS_LOCATION = '-delete-marker-'; | ||||||
|
|
||||||
| // Keep per-scan series long enough for scraping and debugging recent overlap, | ||||||
| // but remove them from prom-client after a configurable retention interval. | ||||||
| // We intentionally do not cap the number of tracked scan IDs: if overlapping | ||||||
| // scans happen, hiding older IDs would remove the signal this metric provides. | ||||||
| // Prometheus retains scraped scan-id series until TSDB retention expires. | ||||||
| const DEFAULT_SCAN_METRIC_RETENTION_S = 24 * 60 * 60; | ||||||
| const CONDUCTOR_ORIGIN = 'conductor'; | ||||||
| const BUCKET_PROCESSOR_ORIGIN = 'bucket_processor'; | ||||||
| let scanMetricRetentionMs = DEFAULT_SCAN_METRIC_RETENTION_S * 1000; | ||||||
|
|
||||||
| // Conductor scheduling heartbeat: timestamp (ms since epoch) of the | ||||||
| // instant the conductor most recently *started* a scan. Use this to | ||||||
| // detect "the conductor is no longer scheduling scans" via the | ||||||
| // LifecycleLateScan alert. | ||||||
| const conductorLatestBatchStartTime = ZenkoMetrics.createGauge({ | ||||||
| name: 's3_lifecycle_latest_batch_start_time', | ||||||
| help: 'Timestamp of latest lifecycle batch start time', | ||||||
| help: 'Conductor scheduling heartbeat: ms-since-epoch timestamp of ' + | ||||||
| 'the most recent scan start. Use to detect that the conductor is ' + | ||||||
| 'still scheduling scans (LifecycleLateScan alert).', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||||||
| }); | ||||||
|
|
||||||
| // Conductor scan-end timestamp (ms since epoch) of the last scan that reached | ||||||
| // the listing phase. | ||||||
| const conductorLatestBatchEndTime = ZenkoMetrics.createGauge({ | ||||||
| name: 's3_lifecycle_latest_batch_end_time', | ||||||
| help: 'Timestamp (ms since epoch) of the most recent lifecycle ' + | ||||||
| 'conductor scan end after the scan reached bucket listing.', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||||||
| }); | ||||||
|
|
||||||
|
|
@@ -50,6 +76,102 @@ const lifecycleLegacyTask = ZenkoMetrics.createCounter({ | |||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_STATUS], | ||||||
| }); | ||||||
|
|
||||||
| const conductorLatestBatchBucketCount = ZenkoMetrics.createGauge({ | ||||||
| name: 's3_lifecycle_latest_batch_bucket_count', | ||||||
| help: 'Number of buckets listed in the latest lifecycle conductor batch', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||||||
| }); | ||||||
|
|
||||||
| const bucketProcessorScanMessagesProcessed = ZenkoMetrics.createCounter({ | ||||||
| name: 's3_lifecycle_bucket_processor_scan_messages_processed_total', | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naming nit: this counter is incremented at message receipt, before processing and regardless of success or object count (the JSDoc on |
||||||
| help: 'Total number of bucket-tasks topic messages picked up by this ' + | ||||||
| 'bucket processor, grouped by conductor scan id. Each message ' + | ||||||
| 'corresponds to a single listing slice (initial or continuation), not ' + | ||||||
| 'a unique bucket: a bucket with multiple listings (truncated v1, or ' + | ||||||
| 'current/noncurrent/orphan splits in v2) increments this counter once ' + | ||||||
| 'per slice. Multiple conductor_scan_id label values over the same ' + | ||||||
| 'query window indicate that bucket processors recently handled work ' + | ||||||
| 'from different scans. Normal operation is expected to expose one ' + | ||||||
| 'scan id at a time; scan-id series are removed locally after the ' + | ||||||
| 'configured bucket processor retention interval without update to ' + | ||||||
| 'avoid unbounded process memory growth. ' + | ||||||
| 'Prometheus retains scraped scan-id series until TSDB retention.', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID], | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
@benzekrimaha The "24h" should also be configurable (in backbeat), so that the compromise could be tweaked on a platform, based on cronjob value and typical scan duration.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. The local per-scan series cleanup is now configurable with bucketProcessor.scanMetricRetentionMs, defaulting to 24h. I also kept the metric help text concise: it says series are removed locally after the configured bucket-processor retention interval and that Prometheus keeps scraped series until TSDB retention.
benzekrimaha marked this conversation as resolved.
benzekrimaha marked this conversation as resolved.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an intentional trade-off for the overlapping-scan alert: the alert needs to distinguish scan IDs to detect bucket processors handling work from more than one conductor scan. Local prom-client cleanup is configurable through bucketProcessor.scanMetricRetentionMs, and the metric help text now states that Prometheus retains scraped scan-id series until TSDB retention. I kept the wording concise to avoid over-explaining this internal troubleshooting detail in user-facing monitoring text.
benzekrimaha marked this conversation as resolved.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a known trade-off and matches the compromise discussed with François earlier. We keep
benzekrimaha marked this conversation as resolved.
|
||||||
| }); | ||||||
|
francoisferrand marked this conversation as resolved.
francoisferrand marked this conversation as resolved.
benzekrimaha marked this conversation as resolved.
francoisferrand marked this conversation as resolved.
benzekrimaha marked this conversation as resolved.
|
||||||
|
|
||||||
| const bucketProcessorScanMessageAgeSeconds = ZenkoMetrics.createHistogram({ | ||||||
| name: 's3_lifecycle_bucket_processor_scan_message_age_seconds', | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The help text says the age is measured "when they finish processing in the bucket processor," but Either fix the help text to describe what's measured, or move the observation to actual task completion if processing time is what we want. |
||||||
| help: 'Age in seconds of bucket-tasks topic messages when they finish ' + | ||||||
| 'processing in the bucket processor, measured from the conductor scan ' + | ||||||
| 'start timestamp propagated in the message context.', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||||||
| buckets: [60, 300, 600, 1800, 3600, 7200, 14400, 28800, 43200, 86400], | ||||||
| }); | ||||||
|
|
||||||
| const scanMetricTimers = new Map(); | ||||||
|
benzekrimaha marked this conversation as resolved.
|
||||||
|
|
||||||
| function removeBucketProcessorScanMetrics(conductorScanId) { | ||||||
| try { | ||||||
| bucketProcessorScanMessagesProcessed.remove({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, | ||||||
| [LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId, | ||||||
| }); | ||||||
| } catch { | ||||||
| // Best-effort cleanup: metrics are observational only. | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| function setScanMetricTimeout(conductorScanId) { | ||||||
| const previousTimer = scanMetricTimers.get(conductorScanId); | ||||||
| if (previousTimer) { | ||||||
| clearTimeout(previousTimer); | ||||||
| } | ||||||
|
|
||||||
| // Reset retention on every message so an active scan remains observable. | ||||||
| // Cleanup starts only after the scan stops producing bucket-task messages. | ||||||
| const cleanupTimer = setTimeout(() => { | ||||||
| removeBucketProcessorScanMetrics(conductorScanId); | ||||||
| scanMetricTimers.delete(conductorScanId); | ||||||
| }, scanMetricRetentionMs); | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
— Claude Code |
||||||
| if (typeof cleanupTimer.unref === 'function') { | ||||||
| cleanupTimer.unref(); | ||||||
| } | ||||||
| scanMetricTimers.set(conductorScanId, cleanupTimer); | ||||||
| } | ||||||
|
|
||||||
| function observeBucketProcessorScanMessageAge(conductorScanStartTimestamp) { | ||||||
| // Messages produced before this field existed can still be consumed during | ||||||
| // rolling upgrades, so skip invalid timestamps instead of logging noise. | ||||||
| if (typeof conductorScanStartTimestamp !== 'number' || | ||||||
| !Number.isFinite(conductorScanStartTimestamp) || | ||||||
| conductorScanStartTimestamp <= 0) { | ||||||
| return; | ||||||
| } | ||||||
|
Comment on lines
+145
to
+149
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: should we make an explicit check, or just catch error (like done in multiple places already) ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kept the explicit guard here intentionally. Missing/invalid conductorScanStartTimestamp can happen during rolling upgrades for messages produced before this field existed, and that should simply skip the age observation rather than log a metric error. The rest of the metric update is still protected by the outer catch.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we are introducing observability:
we should also ensure we keep the try/catch block, it is a good safety measure (we often have bad/missing metrics due to bug in the code) |
||||||
|
|
||||||
| const ageSeconds = (Date.now() - conductorScanStartTimestamp) / 1000; | ||||||
| if (ageSeconds >= 0) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||||||
| bucketProcessorScanMessageAgeSeconds.observe({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, | ||||||
| }, ageSeconds); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| function resetLifecycleScanMetricCleanupTimers() { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not clear why we would ever want to clear the timers BUT not the associated metrics : it will leave many series forever, leaving to the unbounded labels we want to manage with the timers....
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Timer reset now also removes associated metric series. |
||||||
| scanMetricTimers.forEach((timer, conductorScanId) => { | ||||||
| clearTimeout(timer); | ||||||
| removeBucketProcessorScanMetrics(conductorScanId); | ||||||
| }); | ||||||
| scanMetricTimers.clear(); | ||||||
| scanMetricRetentionMs = DEFAULT_SCAN_METRIC_RETENTION_S * 1000; | ||||||
| } | ||||||
|
|
||||||
| function configureLifecycleScanMetricRetention(retentionS) { | ||||||
| // Called during bucket-processor startup before scan messages are consumed. | ||||||
| // Runtime config reload is not supported, so existing timers are not | ||||||
| // rescheduled when this value is set. | ||||||
| scanMetricRetentionMs = retentionS * 1000; | ||||||
|
benzekrimaha marked this conversation as resolved.
|
||||||
| } | ||||||
|
|
||||||
| const lifecycleS3Operations = ZenkoMetrics.createCounter({ | ||||||
| name: 's3_lifecycle_s3_operations_total', | ||||||
| help: 'Total number of S3 operations by the lifecycle processes', | ||||||
|
|
@@ -113,11 +235,23 @@ class LifecycleMetrics { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| static onProcessBuckets(log) { | ||||||
| /** | ||||||
| * Update the conductor scheduling heartbeat. Called at the start of | ||||||
| * every conductor scan; consumed by the LifecycleLateScan alert to | ||||||
| * detect that the conductor has stopped scheduling. | ||||||
| * | ||||||
| * @param {Object} log - logger | ||||||
| * @param {number} scanStartTimestamp - scan start timestamp in ms | ||||||
| */ | ||||||
| static onProcessBuckets(log, scanStartTimestamp = Date.now()) { | ||||||
| try { | ||||||
| conductorLatestBatchStartTime.set({ origin: 'conductor' }, Date.now()); | ||||||
| conductorLatestBatchStartTime.set( | ||||||
| { [LIFECYCLE_LABEL_ORIGIN]: CONDUCTOR_ORIGIN }, | ||||||
| scanStartTimestamp); | ||||||
| } catch (err) { | ||||||
| LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onProcessBuckets'); | ||||||
| LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onProcessBuckets', { | ||||||
| scanStartTimestamp, | ||||||
| }); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -172,6 +306,69 @@ class LifecycleMetrics { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Record metrics at the end of a full conductor scan. | ||||||
| * @param {Object} log - logger | ||||||
| * @param {number} bucketCount - total buckets listed | ||||||
| */ | ||||||
|
benzekrimaha marked this conversation as resolved.
|
||||||
| static onConductorScanComplete(log, bucketCount) { | ||||||
| try { | ||||||
| const endTimestamp = Date.now(); | ||||||
| conductorLatestBatchEndTime.set({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: CONDUCTOR_ORIGIN, | ||||||
| }, endTimestamp); | ||||||
| conductorLatestBatchBucketCount.set({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: CONDUCTOR_ORIGIN, | ||||||
| }, bucketCount); | ||||||
| } catch (err) { | ||||||
| LifecycleMetrics.handleError( | ||||||
| log, err, 'LifecycleMetrics.onConductorScanComplete', { | ||||||
| bucketCount, | ||||||
| } | ||||||
| ); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Increment the count of bucket-tasks topic messages picked up by this | ||||||
| * bucket processor for a specific conductor scan. Called before the task | ||||||
| * is dispatched to the scheduler, once per Kafka message regardless of how | ||||||
| * many objects it covers or whether processing eventually succeeds. | ||||||
| * | ||||||
| * Note: this counts messages (initial + continuation/listing slices), | ||||||
| * not unique buckets. Keep one time series per conductor_scan_id so that | ||||||
| * overlapping scans remain visible. Old scan series are removed by a | ||||||
| * timer after the configured scanMetricRetentionS interval without | ||||||
| * update to avoid unbounded prom-client memory growth. | ||||||
| * | ||||||
| * @param {Object} log - logger | ||||||
| * @param {string} conductorScanId - conductor scan id from contextInfo | ||||||
| * @param {number} [conductorScanStartTimestamp] - conductor scan start | ||||||
| * timestamp from contextInfo | ||||||
| */ | ||||||
| static onBucketProcessorScanMessageReceived( | ||||||
| log, conductorScanId, conductorScanStartTimestamp) { | ||||||
| // Old conductor messages produced during rolling upgrades do not have | ||||||
| // a scan id. Do not create a synthetic "undefined" scan-id series. | ||||||
| if (!conductorScanId) { | ||||||
| return; | ||||||
| } | ||||||
| try { | ||||||
| bucketProcessorScanMessagesProcessed.inc({ | ||||||
|
benzekrimaha marked this conversation as resolved.
|
||||||
| [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, | ||||||
| [LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId, | ||||||
| }); | ||||||
| observeBucketProcessorScanMessageAge(conductorScanStartTimestamp); | ||||||
| setScanMetricTimeout(conductorScanId); | ||||||
| } catch (err) { | ||||||
|
francoisferrand marked this conversation as resolved.
|
||||||
| LifecycleMetrics.handleError( | ||||||
| log, err, | ||||||
| 'LifecycleMetrics.onBucketProcessorScanMessageReceived', | ||||||
| { conductorScanId, conductorScanStartTimestamp } | ||||||
| ); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| static onLifecycleTriggered(log, process, type, location, latencyMs) { | ||||||
| try { | ||||||
| lifecycleTriggerLatency.observe({ | ||||||
|
|
@@ -249,4 +446,6 @@ class LifecycleMetrics { | |||||
| module.exports = { | ||||||
| LifecycleMetrics, | ||||||
| LIFECYCLE_MARKER_METRICS_LOCATION, | ||||||
| configureLifecycleScanMetricRetention, | ||||||
| resetLifecycleScanMetricCleanupTimers, | ||||||
| }; | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The retention default now lives in three places:
86400here, andDEFAULT_SCAN_METRIC_RETENTION_S = 24 * 60 * 60duplicated in bothLifecycleMetrics.jsandLifecycleConfigValidator.js. Suggest a single exported constant (re-used by the validator's.default(...)) so these can't drift apart.