Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions src/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -70,8 +69,8 @@ func newAsyncEventProcessor(cfg asyncProcessorConfig, logger *zap.Logger) *async
logger: logger,
}

asyncWorkerCountGauge.Set(float64(cfg.WorkerCount))
asyncQueueCapacityGauge.Set(float64(cfg.QueueSize))
defaultMetricRecorder.SetAsyncWorkerCount(cfg.WorkerCount)
defaultMetricRecorder.SetAsyncQueueCapacity(cfg.QueueSize)
return processor
}

Expand Down Expand Up @@ -99,11 +98,11 @@ func (p *asyncEventProcessor) Enqueue(ctx context.Context, eventType string, bod

select {
case p.queue <- event:
asyncQueueDepthGauge.Set(float64(len(p.queue)))
defaultMetricRecorder.SetAsyncQueueDepth(len(p.queue))
return nil
default:
asyncQueueDroppedCounter.WithLabelValues(eventType).Inc()
asyncQueueDepthGauge.Set(float64(len(p.queue)))
defaultMetricRecorder.RecordAsyncQueueDropped(eventType)
defaultMetricRecorder.SetAsyncQueueDepth(len(p.queue))
return fmt.Errorf("event queue is full")
}
}
Expand All @@ -112,19 +111,19 @@ func (p *asyncEventProcessor) runWorker(workerID int) {
defer p.wg.Done()

for event := range p.queue {
asyncQueueDepthGauge.Set(float64(len(p.queue)))
defaultMetricRecorder.SetAsyncQueueDepth(len(p.queue))
start := time.Now()

processor, ok := p.processFn[event.eventType]
if !ok {
asyncUnsupportedEventsCounter.WithLabelValues(event.eventType).Inc()
defaultMetricRecorder.RecordAsyncUnsupportedEvent(event.eventType)
continue
}

func() {
defer func() {
if recovered := recover(); recovered != nil {
asyncProcessingFailuresCounter.WithLabelValues(event.eventType).Inc()
defaultMetricRecorder.RecordAsyncProcessingFailure(event.eventType)
p.logger.Error("Recovered from async event processor panic",
zap.Int("workerID", workerID),
zap.String("eventType", event.eventType),
Expand All @@ -134,8 +133,8 @@ func (p *asyncEventProcessor) runWorker(workerID int) {
}()

processor(event.ctx, event.body)
asyncProcessedEventsCounter.WithLabelValues(event.eventType).Inc()
asyncProcessingDurationHistogram.With(prometheus.Labels{"event_type": event.eventType}).Observe(time.Since(start).Seconds())
defaultMetricRecorder.RecordAsyncProcessedEvent(event.eventType)
defaultMetricRecorder.ObserveAsyncProcessingDuration(event.eventType, time.Since(start).Seconds())
}()
}
}
30 changes: 9 additions & 21 deletions src/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func updateTrackedRunMetrics(
details runMetricDetails,
store runStoreMethods,
entityName string,
metrics runMetricSet,
metricKind runMetricKind,
) {
var storeAdapter runTransitionStore
if stateStore != nil {
Expand All @@ -193,7 +193,7 @@ func updateTrackedRunMetrics(

processor := &runTransitionProcessor{
store: storeAdapter,
recorder: prometheusRunTransitionRecorder{metrics: metrics},
recorder: metricRunTransitionRecorder{kind: metricKind, recorder: defaultMetricRecorder},
logger: logger,
entityName: entityName,
}
Expand Down Expand Up @@ -243,14 +243,8 @@ func updateWorkflowMetrics(ctx context.Context, body []byte) {
endedAt: payload.Workflow.UpdatedAt,
},
workflowRunStoreMethods(),
"workflow_run",
runMetricSet{
statusCounter: workflowStatusCounter,
queuedGauge: workflowQueuedGauge,
inProgressGauge: workflowInProgressGauge,
completedGauge: workflowCompletedGauge,
durationHistogram: workflowDurationHistogram,
},
githubEventWorkflowRun,
runMetricKindWorkflow,
)
}

Expand All @@ -275,14 +269,8 @@ func updateJobMetrics(ctx context.Context, body []byte) {
endedAt: payload.Job.CompletedAt,
},
workflowJobStoreMethods(),
"workflow_job",
runMetricSet{
statusCounter: jobStatusCounter,
queuedGauge: jobQueuedGauge,
inProgressGauge: jobInProgressGauge,
completedGauge: jobCompletedGauge,
durationHistogram: jobDurationHistogram,
},
githubEventWorkflowJob,
runMetricKindJob,
)
}

Expand All @@ -295,7 +283,7 @@ func updateCommitMetrics(body []byte) {
}

for range payload.Commits {
commitPushedCounter.WithLabelValues(payload.Repository.FullName).Inc()
defaultMetricRecorder.RecordCommitPushed(payload.Repository.FullName)
}
}

Expand All @@ -307,9 +295,9 @@ func updatePullRequestMetrics(body []byte) {
return
}

pullRequestCounter.WithLabelValues(
defaultMetricRecorder.RecordPullRequest(
payload.Repository.FullName,
payload.PullRequest.Base.Ref,
payload.Action,
).Inc()
)
}
130 changes: 130 additions & 0 deletions src/metric_recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package main

type runMetricKind string

const (
runMetricKindWorkflow runMetricKind = "workflow"
runMetricKindJob runMetricKind = "job"
)

var defaultMetricRecorder = prometheusMetricRecorder{}

type prometheusMetricRecorder struct{}

func (prometheusMetricRecorder) RecordDuplicateDelivery(eventType string) {
duplicateDeliveriesSeenCounter.WithLabelValues(eventType).Inc()
duplicateDeliveriesDroppedCounter.WithLabelValues(eventType).Inc()
}

func (prometheusMetricRecorder) RecordCommitPushed(repository string) {
commitPushedCounter.WithLabelValues(repository).Inc()
}

func (prometheusMetricRecorder) RecordPullRequest(repository, baseBranch, status string) {
pullRequestCounter.WithLabelValues(repository, baseBranch, status).Inc()
}

func (prometheusMetricRecorder) SetAsyncWorkerCount(workers int) {
asyncWorkerCountGauge.Set(float64(workers))
}

func (prometheusMetricRecorder) SetAsyncQueueCapacity(capacity int) {
asyncQueueCapacityGauge.Set(float64(capacity))
}

func (prometheusMetricRecorder) SetAsyncQueueDepth(depth int) {
asyncQueueDepthGauge.Set(float64(depth))
}

func (prometheusMetricRecorder) RecordAsyncQueueDropped(eventType string) {
asyncQueueDroppedCounter.WithLabelValues(eventType).Inc()
}

func (prometheusMetricRecorder) RecordAsyncUnsupportedEvent(eventType string) {
asyncUnsupportedEventsCounter.WithLabelValues(eventType).Inc()
}

func (prometheusMetricRecorder) RecordAsyncProcessingFailure(eventType string) {
asyncProcessingFailuresCounter.WithLabelValues(eventType).Inc()
}

func (prometheusMetricRecorder) RecordAsyncProcessedEvent(eventType string) {
asyncProcessedEventsCounter.WithLabelValues(eventType).Inc()
}

func (prometheusMetricRecorder) ObserveAsyncProcessingDuration(eventType string, durationSeconds float64) {
asyncProcessingDurationHistogram.WithLabelValues(eventType).Observe(durationSeconds)
}

func (prometheusMetricRecorder) RecordRunStatus(kind runMetricKind, state RunState) {
switch kind {
case runMetricKindWorkflow:
workflowStatusCounter.WithLabelValues(
state.Repository,
state.Branch,
state.Name,
state.Status,
state.Conclusion,
).Inc()
case runMetricKindJob:
jobStatusCounter.WithLabelValues(
state.Repository,
state.Branch,
state.Name,
state.Status,
state.Conclusion,
).Inc()
}
}

func (prometheusMetricRecorder) AddRunGauge(kind runMetricKind, state RunState, delta float64) {
switch kind {
case runMetricKindWorkflow:
addWorkflowRunGauge(state, delta)
case runMetricKindJob:
addWorkflowJobGauge(state, delta)
}
}

func (prometheusMetricRecorder) ObserveRunDuration(kind runMetricKind, state RunState, durationSeconds float64) {
switch kind {
case runMetricKindWorkflow:
workflowDurationHistogram.WithLabelValues(
state.Repository,
state.Branch,
state.Name,
state.Status,
state.Conclusion,
).Observe(durationSeconds)
case runMetricKindJob:
jobDurationHistogram.WithLabelValues(
state.Repository,
state.Branch,
state.Name,
state.Status,
state.Conclusion,
).Observe(durationSeconds)
}
}

func addWorkflowRunGauge(state RunState, delta float64) {
switch normalizeStatus(state.Status) {
case statusQueued:
workflowQueuedGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta)
case statusInProgress:
workflowInProgressGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta)
case statusCompleted:
workflowCompletedGauge.WithLabelValues(state.Repository, state.Branch, state.Conclusion, state.Name).Add(delta)
}
}

func addWorkflowJobGauge(state RunState, delta float64) {
switch normalizeStatus(state.Status) {
case statusQueued:
jobQueuedGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta)
case statusInProgress:
jobInProgressGauge.WithLabelValues(state.Repository, state.Branch, state.Name).Add(delta)
case statusCompleted:
jobCompletedGauge.WithLabelValues(state.Repository, state.Branch, state.Conclusion, state.Name).Add(delta)
}
}
Loading
Loading