-
Notifications
You must be signed in to change notification settings - Fork 23
monitor lifecycle conductor #2723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: development/9.3
Are you sure you want to change the base?
Changes from 9 commits
7857978
a1661ea
592d911
d064eec
5561284
43d3f90
8a19b71
d7000eb
4520f5a
5d84665
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,16 +1,60 @@ | ||||||
| const { ZenkoMetrics } = require('arsenal').metrics; | ||||||
|
|
||||||
| const LIFECYCLE_LABEL_ORIGIN = 'origin'; | ||||||
| const LIFECYCLE_LABEL_ORIGIN = 'origin'; | ||||||
| const LIFECYCLE_LABEL_OP = 'op'; | ||||||
| const LIFECYCLE_LABEL_STATUS = 'status'; | ||||||
| const LIFECYCLE_LABEL_LOCATION = 'location'; | ||||||
| const LIFECYCLE_LABEL_TYPE = 'type'; | ||||||
| const LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID = 'conductor_scan_id'; | ||||||
|
|
||||||
| const LIFECYCLE_MARKER_METRICS_LOCATION = '-delete-marker-'; | ||||||
|
|
||||||
| // Keep per-scan series long enough for scraping and debugging recent overlap, | ||||||
| // but remove them from prom-client after a configurable retention interval. | ||||||
| // Prometheus retains scraped scan-id series until TSDB retention expires. | ||||||
| const DEFAULT_SCAN_METRIC_RETENTION_MS = 24 * 60 * 60 * 1000; | ||||||
| const CONDUCTOR_ORIGIN = 'conductor'; | ||||||
| const BUCKET_PROCESSOR_ORIGIN = 'bucket_processor'; | ||||||
| let scanMetricRetentionMs = DEFAULT_SCAN_METRIC_RETENTION_MS; | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
— Claude Code |
||||||
|
|
||||||
| // Conductor scheduling heartbeat: timestamp (ms since epoch) of the | ||||||
| // instant the conductor most recently *started* a scan. Use this to | ||||||
| // detect "the conductor is no longer scheduling scans" via the | ||||||
| // LifecycleLateScan alert; do NOT subtract it from latest_batch_end_time | ||||||
| // to derive the scan duration: while a scan is in progress, end_time is | ||||||
| // from the previous run and start_time has just been refreshed, so the | ||||||
| // difference is negative. Use s3_lifecycle_conductor_last_batch_duration_seconds | ||||||
| // instead. | ||||||
| const conductorLatestBatchStartTime = ZenkoMetrics.createGauge({ | ||||||
| name: 's3_lifecycle_latest_batch_start_time', | ||||||
| help: 'Timestamp of latest lifecycle batch start time', | ||||||
| help: 'Conductor scheduling heartbeat: ms-since-epoch timestamp of ' + | ||||||
| 'the most recent scan start. Use to detect that the conductor is ' + | ||||||
| 'still scheduling scans (LifecycleLateScan alert). Do NOT use to ' + | ||||||
| 'derive scan duration; use ' + | ||||||
| 's3_lifecycle_conductor_last_batch_duration_seconds for that.', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||||||
| }); | ||||||
|
|
||||||
| // Conductor scan-completion timestamp (ms since epoch) of the last | ||||||
| // successfully completed scan. Useful as a "scan completed at all" | ||||||
| // signal; combine with conductor_last_batch_duration_seconds to know | ||||||
| // "the most recent scan finished N seconds ago and took M seconds". | ||||||
| const conductorLatestBatchEndTime = ZenkoMetrics.createGauge({ | ||||||
| name: 's3_lifecycle_latest_batch_end_time', | ||||||
| help: 'Timestamp (ms since epoch) of the most recent successful ' + | ||||||
| 'lifecycle conductor scan completion.', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||||||
| }); | ||||||
|
|
||||||
| // Duration of the latest conductor scan, computed by the conductor itself | ||||||
| // at scan completion. Exposed as a gauge so dashboards can render the most | ||||||
| // recent batch duration directly, without computing end - start in PromQL | ||||||
| // (which would yield negative values mid-scan, when end is from the | ||||||
| // previous batch and start has just been refreshed). | ||||||
| const conductorLastBatchDurationSeconds = ZenkoMetrics.createGauge({ | ||||||
| name: 's3_lifecycle_conductor_last_batch_duration_seconds', | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this redundant with if we are really interested in duration, should be an histogram
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kept the dedicated latest-duration gauge. It is not exactly redundant with start/end timestamps because latest_batch_start_time is refreshed at scan start while latest_batch_end_time is only refreshed when a scan completes. During an in-progress scan, computing end - start can be negative or misleading. The gauge records the last completed scan duration directly. I kept it as a gauge because it represents the latest completed batch duration, not a distribution of per-bucket or per-object durations. |
||||||
| help: 'Duration in seconds of the latest lifecycle conductor batch, ' + | ||||||
| 'as measured by the conductor at scan completion.', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||||||
| }); | ||||||
|
|
||||||
|
|
@@ -50,6 +94,102 @@ const lifecycleLegacyTask = ZenkoMetrics.createCounter({ | |||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_STATUS], | ||||||
| }); | ||||||
|
|
||||||
| const conductorLatestBatchBucketCount = ZenkoMetrics.createGauge({ | ||||||
| name: 's3_lifecycle_latest_batch_bucket_count', | ||||||
| help: 'Number of buckets listed in the latest lifecycle conductor batch', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||||||
|
francoisferrand marked this conversation as resolved.
|
||||||
| }); | ||||||
|
|
||||||
| const bucketProcessorScanMessagesProcessed = ZenkoMetrics.createCounter({ | ||||||
| name: 's3_lifecycle_bucket_processor_scan_messages_processed_total', | ||||||
| help: 'Total number of bucket-tasks topic messages picked up by this ' + | ||||||
| 'bucket processor, grouped by conductor scan id. Each message ' + | ||||||
| 'corresponds to a single listing slice (initial or continuation), not ' + | ||||||
| 'a unique bucket: a bucket with multiple listings (truncated v1, or ' + | ||||||
| 'current/noncurrent/orphan splits in v2) increments this counter once ' + | ||||||
| 'per slice. Multiple conductor_scan_id label values over the same ' + | ||||||
| 'query window indicate that bucket processors recently handled work ' + | ||||||
| 'from different scans. Normal operation is expected to expose one ' + | ||||||
| 'scan id at a time; scan-id series are removed locally after the ' + | ||||||
| 'configured bucket processor retention interval without update to ' + | ||||||
| 'avoid unbounded process memory growth. ' + | ||||||
| 'Prometheus retains scraped scan-id series until TSDB retention.', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID], | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unbounded label cardinality:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benzekrimaha this is an issue.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we actually don't need the same approach for both metrics: having a metrics whose value is the ScanID (which would be an integer, not a UUID) would provide this info if we simply record the metrics when we detect a new ScanID (the "time" is recorded implicitly by prometheus) yet to track each "bucket" event, we may need to increment some counter... and thus would need to have a timer (in lifecyleMetrics!) to ensure we remove the series once we've not updated the series for a "long" time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
@benzekrimaha The "24h" should also be configurable (in backbeat), so that the compromise could be tweaked on a platform, based on cronjob value and typical scan duration.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. The local per-scan series cleanup is now configurable with bucketProcessor.scanMetricRetentionMs, defaulting to 24h. I also kept the metric help text concise: it says series are removed locally after the configured bucket-processor retention interval and that Prometheus keeps scraped series until TSDB retention.
benzekrimaha marked this conversation as resolved.
benzekrimaha marked this conversation as resolved.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an intentional trade-off for the overlapping-scan alert: the alert needs to distinguish scan IDs to detect bucket processors handling work from more than one conductor scan. Local prom-client cleanup is configurable through bucketProcessor.scanMetricRetentionMs, and the metric help text now states that Prometheus retains scraped scan-id series until TSDB retention. I kept the wording concise to avoid over-explaining this internal troubleshooting detail in user-facing monitoring text.
benzekrimaha marked this conversation as resolved.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a known trade-off and matches the compromise discussed with François earlier. We keep
benzekrimaha marked this conversation as resolved.
|
||||||
| }); | ||||||
|
francoisferrand marked this conversation as resolved.
francoisferrand marked this conversation as resolved.
benzekrimaha marked this conversation as resolved.
francoisferrand marked this conversation as resolved.
benzekrimaha marked this conversation as resolved.
|
||||||
|
|
||||||
| const bucketProcessorScanMessageAgeSeconds = ZenkoMetrics.createHistogram({ | ||||||
| name: 's3_lifecycle_bucket_processor_scan_message_age_seconds', | ||||||
| help: 'Age in seconds of bucket-tasks topic messages when they finish ' + | ||||||
| 'processing in the bucket processor, measured from the conductor scan ' + | ||||||
| 'start timestamp propagated in the message context.', | ||||||
| labelNames: [LIFECYCLE_LABEL_ORIGIN], | ||||||
| buckets: [60, 300, 600, 1800, 3600, 7200, 14400, 28800, 43200, 86400], | ||||||
| }); | ||||||
|
|
||||||
| const scanMetricTimers = new Map(); | ||||||
|
|
||||||
| function removeBucketProcessorScanMetrics(conductorScanId) { | ||||||
| try { | ||||||
| bucketProcessorScanMessagesProcessed.remove({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, | ||||||
| [LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId, | ||||||
| }); | ||||||
| } catch { | ||||||
| // Best-effort cleanup: metrics are observational only. | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| function setScanMetricTimeout(conductorScanId) { | ||||||
| const previousTimer = scanMetricTimers.get(conductorScanId); | ||||||
| if (previousTimer) { | ||||||
| clearTimeout(previousTimer); | ||||||
| } | ||||||
|
|
||||||
| const cleanupTimer = setTimeout(() => { | ||||||
| removeBucketProcessorScanMetrics(conductorScanId); | ||||||
| scanMetricTimers.delete(conductorScanId); | ||||||
| }, scanMetricRetentionMs); | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
— Claude Code |
||||||
| if (typeof cleanupTimer.unref === 'function') { | ||||||
| cleanupTimer.unref(); | ||||||
| } | ||||||
| scanMetricTimers.set(conductorScanId, cleanupTimer); | ||||||
| } | ||||||
|
|
||||||
| function observeBucketProcessorScanMessageAge(conductorScanStartTimestamp) { | ||||||
| // Messages produced before this field existed can still be consumed during | ||||||
| // rolling upgrades, so skip invalid timestamps instead of logging noise. | ||||||
| if (typeof conductorScanStartTimestamp !== 'number' || | ||||||
| !Number.isFinite(conductorScanStartTimestamp) || | ||||||
| conductorScanStartTimestamp <= 0) { | ||||||
| return; | ||||||
| } | ||||||
|
Comment on lines
+161
to
+165
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: should we make an explicit check, or just catch error (like done in multiple places already) ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kept the explicit guard here intentionally. Missing/invalid conductorScanStartTimestamp can happen during rolling upgrades for messages produced before this field existed, and that should simply skip the age observation rather than log a metric error. The rest of the metric update is still protected by the outer catch. |
||||||
|
|
||||||
| const ageSeconds = (Date.now() - conductorScanStartTimestamp) / 1000; | ||||||
| if (ageSeconds >= 0) { | ||||||
| bucketProcessorScanMessageAgeSeconds.observe({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, | ||||||
| }, ageSeconds); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| function clearScanMetricTimers() { | ||||||
| scanMetricTimers.forEach(timer => clearTimeout(timer)); | ||||||
| scanMetricTimers.clear(); | ||||||
| } | ||||||
|
|
||||||
| function resetLifecycleScanMetricCleanupTimers() { | ||||||
| clearScanMetricTimers(); | ||||||
| scanMetricRetentionMs = DEFAULT_SCAN_METRIC_RETENTION_MS; | ||||||
| } | ||||||
|
|
||||||
| function configureLifecycleScanMetricRetention(retentionMs) { | ||||||
| if (typeof retentionMs === 'number' && | ||||||
| Number.isFinite(retentionMs) && | ||||||
| retentionMs > 0) { | ||||||
| scanMetricRetentionMs = retentionMs; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| const lifecycleS3Operations = ZenkoMetrics.createCounter({ | ||||||
| name: 's3_lifecycle_s3_operations_total', | ||||||
| help: 'Total number of S3 operations by the lifecycle processes', | ||||||
|
|
@@ -113,11 +253,26 @@ class LifecycleMetrics { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| static onProcessBuckets(log) { | ||||||
| /** | ||||||
| * Update the conductor scheduling heartbeat. Called at the start of | ||||||
| * every conductor scan; consumed by the LifecycleLateScan alert to | ||||||
| * detect that the conductor has stopped scheduling. Does NOT mark a | ||||||
| * scan as in progress and is NOT meant to be subtracted from | ||||||
| * latest_batch_end_time to derive a duration: use | ||||||
| * onConductorScanComplete's durationSeconds for that. | ||||||
| * | ||||||
| * @param {Object} log - logger | ||||||
| * @param {number} scanStartTimestamp - scan start timestamp in ms | ||||||
| */ | ||||||
| static onProcessBuckets(log, scanStartTimestamp = Date.now()) { | ||||||
| try { | ||||||
| conductorLatestBatchStartTime.set({ origin: 'conductor' }, Date.now()); | ||||||
| conductorLatestBatchStartTime.set( | ||||||
| { [LIFECYCLE_LABEL_ORIGIN]: CONDUCTOR_ORIGIN }, | ||||||
| scanStartTimestamp); | ||||||
| } catch (err) { | ||||||
| LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onProcessBuckets'); | ||||||
| LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onProcessBuckets', { | ||||||
| scanStartTimestamp, | ||||||
| }); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -172,6 +327,79 @@ class LifecycleMetrics { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Record metrics at the end of a full conductor scan. | ||||||
| * @param {Object} log - logger | ||||||
| * @param {number} bucketCount - total buckets listed | ||||||
| * @param {number} [durationSeconds] - duration of the scan in seconds, | ||||||
| * as measured by the conductor. When provided and finite, sets the | ||||||
| * s3_lifecycle_conductor_last_batch_duration_seconds gauge. Optional | ||||||
| * for forward-compatibility with callers that do not measure it. | ||||||
| */ | ||||||
|
benzekrimaha marked this conversation as resolved.
|
||||||
| static onConductorScanComplete(log, bucketCount, durationSeconds) { | ||||||
| try { | ||||||
| const endTimestamp = Date.now(); | ||||||
| conductorLatestBatchEndTime.set({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: CONDUCTOR_ORIGIN, | ||||||
| }, endTimestamp); | ||||||
| conductorLatestBatchBucketCount.set({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: CONDUCTOR_ORIGIN, | ||||||
| }, bucketCount); | ||||||
| if (typeof durationSeconds === 'number' && | ||||||
| Number.isFinite(durationSeconds) && | ||||||
| durationSeconds >= 0) { | ||||||
| conductorLastBatchDurationSeconds.set({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: CONDUCTOR_ORIGIN, | ||||||
| }, durationSeconds); | ||||||
| } | ||||||
| } catch (err) { | ||||||
| LifecycleMetrics.handleError( | ||||||
| log, err, 'LifecycleMetrics.onConductorScanComplete', { | ||||||
| bucketCount, | ||||||
| durationSeconds, | ||||||
| } | ||||||
| ); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Increment the count of bucket-tasks topic messages picked up by this | ||||||
| * bucket processor for a specific conductor scan. Called before the task | ||||||
| * is dispatched to the scheduler, once per Kafka message regardless of how | ||||||
| * many objects it covers or whether processing eventually succeeds. | ||||||
| * | ||||||
| * Note: this counts messages (initial + continuation/listing slices), | ||||||
| * not unique buckets. Keep one time series per conductor_scan_id so that | ||||||
| * overlapping scans remain visible. Old scan series are removed by a | ||||||
| * timer after the configured scanMetricRetentionMs interval without | ||||||
| * update to avoid unbounded prom-client memory growth. | ||||||
| * | ||||||
| * @param {Object} log - logger | ||||||
| * @param {string} conductorScanId - conductor scan id from contextInfo | ||||||
| * @param {number} [conductorScanStartTimestamp] - conductor scan start | ||||||
| * timestamp from contextInfo | ||||||
| */ | ||||||
| static onBucketProcessorScanMessageReceived( | ||||||
| log, conductorScanId, conductorScanStartTimestamp) { | ||||||
| if (!conductorScanId) { | ||||||
| return; | ||||||
| } | ||||||
| try { | ||||||
| bucketProcessorScanMessagesProcessed.inc({ | ||||||
| [LIFECYCLE_LABEL_ORIGIN]: BUCKET_PROCESSOR_ORIGIN, | ||||||
| [LIFECYCLE_LABEL_CONDUCTOR_SCAN_ID]: conductorScanId, | ||||||
| }); | ||||||
| observeBucketProcessorScanMessageAge(conductorScanStartTimestamp); | ||||||
| setScanMetricTimeout(conductorScanId); | ||||||
| } catch (err) { | ||||||
|
francoisferrand marked this conversation as resolved.
|
||||||
| LifecycleMetrics.handleError( | ||||||
| log, err, | ||||||
| 'LifecycleMetrics.onBucketProcessorScanMessageReceived', | ||||||
| { conductorScanId, conductorScanStartTimestamp } | ||||||
| ); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| static onLifecycleTriggered(log, process, type, location, latencyMs) { | ||||||
| try { | ||||||
| lifecycleTriggerLatency.observe({ | ||||||
|
|
@@ -249,4 +477,6 @@ class LifecycleMetrics { | |||||
| module.exports = { | ||||||
| LifecycleMetrics, | ||||||
| LIFECYCLE_MARKER_METRICS_LOCATION, | ||||||
| configureLifecycleScanMetricRetention, | ||||||
| resetLifecycleScanMetricCleanupTimers, | ||||||
| }; | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Module-level mutable state (
let scanMetricRetentionMs) shared across tests. TheresetLifecycleScanMetricCleanupTimersfunction handles this for tests, but in production, ifconfigureLifecycleScanMetricRetentionis called multiple times (e.g. multipleLifecycleBucketProcessorinstances in the same process), the last call wins silently. This is likely fine given the current single-instance architecture, but worth noting.— Claude Code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is process-wide state by design for the current bucket processor deployment model: metrics are process-wide and there is a single lifecycle bucket processor service configuration per process. Tests reset it through resetLifecycleScanMetricCleanupTimers(). If we ever run multiple differently configured lifecycle bucket processors in one process, this should be revisited with instance-scoped metric state.