diff --git a/proxy/proxyserver/prefetch.go b/proxy/proxyserver/prefetch.go index 9a5cde202..76e84ae8e 100644 --- a/proxy/proxyserver/prefetch.go +++ b/proxy/proxyserver/prefetch.go @@ -22,7 +22,11 @@ import ( "github.com/uber/kraken/origin/blobclient" "github.com/uber/kraken/utils/httputil" "github.com/uber/kraken/utils/log" - "go.uber.org/zap" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) // Constants for prefetch status. @@ -43,6 +47,7 @@ type PrefetchHandler struct { metrics tally.Scope getManifestLatency tally.Histogram getTagLatency tally.Histogram + tracer trace.Tracer } // blobInfo holds digest and size information for a blob. @@ -53,8 +58,7 @@ type blobInfo struct { // Request and response payloads. type prefetchBody struct { - Tag string `json:"tag"` - TraceId string `json:"trace_id"` + Tag string `json:"tag"` } type prefetchResponse struct { @@ -62,7 +66,6 @@ type prefetchResponse struct { Prefetched bool `json:"prefetched"` Status string `json:"status"` Message string `json:"message"` - TraceId string `json:"trace_id"` } type prefetchError struct { @@ -70,7 +73,6 @@ type prefetchError struct { Prefetched bool `json:"prefetched"` Status string `json:"status"` Message string `json:"message"` - TraceId string `json:"trace_id,omitempty"` } type TagParser interface { @@ -121,28 +123,27 @@ func NewPrefetchHandler( metrics: m, getManifestLatency: m.Histogram("download_manifest_latency", tally.MustMakeExponentialDurationBuckets(1*time.Second, 2, 12)), getTagLatency: m.Histogram("get_tag_latency", tally.MustMakeExponentialDurationBuckets(100*time.Millisecond, 2, 10)), + tracer: otel.Tracer("kraken-proxy-prefetch"), } } // newPrefetchSuccessResponse constructs a successful response. -func newPrefetchSuccessResponse(tag, msg, traceId string) *prefetchResponse { +func newPrefetchSuccessResponse(tag, msg string) *prefetchResponse { return &prefetchResponse{ Tag: tag, Prefetched: true, Status: StatusSuccess, Message: msg, - TraceId: traceId, } } // newPrefetchError constructs an error response. -func newPrefetchError(status int, msg, traceId string) *prefetchError { +func newPrefetchError(status int, msg string) *prefetchError { return &prefetchError{ Error: http.StatusText(status), Prefetched: false, Status: StatusFailure, Message: msg, - TraceId: traceId, } } @@ -164,16 +165,16 @@ func writeJSON(w http.ResponseWriter, status int, payload interface{}) { } } -func writeBadRequestError(w http.ResponseWriter, msg, traceId string) { - writeJSON(w, http.StatusBadRequest, newPrefetchError(http.StatusBadRequest, msg, traceId)) +func writeBadRequestError(w http.ResponseWriter, msg string) { + writeJSON(w, http.StatusBadRequest, newPrefetchError(http.StatusBadRequest, msg)) } -func writeInternalError(w http.ResponseWriter, msg, traceId string) { - writeJSON(w, http.StatusInternalServerError, newPrefetchError(http.StatusInternalServerError, msg, traceId)) +func writeInternalError(w http.ResponseWriter, msg string) { + writeJSON(w, http.StatusInternalServerError, newPrefetchError(http.StatusInternalServerError, msg)) } -func writePrefetchResponse(w http.ResponseWriter, tag, msg, traceId string) { - writeJSON(w, http.StatusOK, newPrefetchSuccessResponse(tag, msg, traceId)) +func writePrefetchResponse(w http.ResponseWriter, tag, msg string) { + writeJSON(w, http.StatusOK, newPrefetchSuccessResponse(tag, msg)) } // HandleV1 processes the prefetch request. @@ -184,91 +185,124 @@ func (ph *PrefetchHandler) HandleV1(w http.ResponseWriter, r *http.Request) { } ph.metrics.Counter("initiated").Inc(1) - writePrefetchResponse(w, input.tag, "prefetching initiated successfully", input.traceID) + writePrefetchResponse(w, input.tag, "prefetching initiated successfully") if ph.v1Synchronous { - ph.downloadBlobs(input) + ph.downloadBlobs(r.Context(), input) } else { // Download blobs asynchronously. - go ph.downloadBlobs(input) + go ph.downloadBlobs(r.Context(), input) } } type prefetchInput struct { blobs []blobInfo namespace string - logger *zap.SugaredLogger tag string - traceID string } // preparePrefetch parses the request, calls build-index to get the image manifest SHA, // downloads the manifest(s) from the origin cluster, parses them, and returns the blobs layers to prefetch. // If an error occurs, preparePrefetch returns the appropriate HTTP response. func (ph *PrefetchHandler) preparePrefetch(w http.ResponseWriter, r *http.Request) (res *prefetchInput, errOccurred bool) { + ctx, span := ph.tracer.Start(r.Context(), "prefetch.prepare", + trace.WithAttributes( + attribute.String("component", "proxy-prefetch"), + attribute.String("operation", "prepare_prefetch"), + ), + ) + defer span.End() + ph.metrics.Counter("requests").Inc(1) var reqBody prefetchBody if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil { - writeBadRequestError(w, fmt.Sprintf("failed to decode request body: %s", err), "") - log.With("error", err).Error("Failed to decode request body") + span.RecordError(err) + span.SetStatus(codes.Error, "failed to decode request body") + writeBadRequestError(w, fmt.Sprintf("failed to decode request body: %s", err)) + log.WithTraceContext(ctx).With("error", err).Error("Failed to decode request body") return nil, true } - logger := log. - With("trace_id", reqBody.TraceId). - With("image_tag", reqBody.Tag) + + span.SetAttributes(attribute.String("image.tag", reqBody.Tag)) namespace, tag, err := ph.tagParser.ParseTag(reqBody.Tag) if err != nil { - writeBadRequestError(w, fmt.Sprintf("tag: %s, invalid tag format: %s", reqBody.Tag, err), reqBody.TraceId) + span.RecordError(err) + span.SetStatus(codes.Error, "invalid tag format") + writeBadRequestError(w, fmt.Sprintf("tag: %s, invalid tag format: %s", reqBody.Tag, err)) return nil, true } + span.SetAttributes( + attribute.String("image.namespace", namespace), + attribute.String("image.name", tag), + ) + tagRequest := url.QueryEscape(fmt.Sprintf("%s/%s", namespace, tag)) startTime := time.Now() digest, err := ph.tagClient.Get(tagRequest) if err != nil { ph.metrics.Counter("get_tag_error").Inc(1) - logger.With("error", err).Error("Failed to get manifest tag") - writeInternalError(w, fmt.Sprintf("tag request: %s, failed to get tag: %s", tagRequest, err), reqBody.TraceId) + span.RecordError(err) + span.SetStatus(codes.Error, "failed to get manifest tag") + log.WithTraceContext(ctx).With("error", err).Error("Failed to get manifest tag") + writeInternalError(w, fmt.Sprintf("tag request: %s, failed to get tag: %s", tagRequest, err)) return nil, true } ph.getTagLatency.RecordDuration(time.Since(startTime)) - logger.Infof("Namespace: %s, Tag: %s", namespace, tag) + span.SetAttributes(attribute.String("manifest.digest", digest.Hex())) + log.WithTraceContext(ctx).Infof("Namespace: %s, Tag: %s", namespace, tag) buf := &bytes.Buffer{} startTime = time.Now() - if err := ph.clusterClient.DownloadBlob(context.Background(), namespace, digest, buf); err != nil { + if err := ph.clusterClient.DownloadBlob(ctx, namespace, digest, buf); err != nil { ph.metrics.Counter("download_manifest_error").Inc(1) - logger.With("error", err).Error("Failed to download manifest blob") - writeInternalError(w, fmt.Sprintf("error downloading manifest blob: %s", err), reqBody.TraceId) + span.RecordError(err) + span.SetStatus(codes.Error, "failed to download manifest") + log.WithTraceContext(ctx).With("error", err).Error("Failed to download manifest blob") + writeInternalError(w, fmt.Sprintf("error downloading manifest blob: %s", err)) return nil, true } ph.getManifestLatency.RecordDuration(time.Since(startTime)) // Process manifest (ManifestList or single Manifest) - blobs, err := ph.processManifest(logger, namespace, buf.Bytes()) + blobs, err := ph.processManifest(ctx, namespace, buf.Bytes()) if err != nil { - writeInternalError(w, fmt.Sprintf("failed to process manifest: %s", err), reqBody.TraceId) + span.RecordError(err) + span.SetStatus(codes.Error, "failed to process manifest") + writeInternalError(w, fmt.Sprintf("failed to process manifest: %s", err)) return nil, true } + span.SetAttributes(attribute.Int("blobs.count", len(blobs))) + span.SetStatus(codes.Ok, "prepare completed") + return &prefetchInput{ blobs: blobs, namespace: namespace, - logger: logger, tag: tag, - traceID: reqBody.TraceId, }, false } // downloadBlobs downloads blobs in parallel. -func (ph *PrefetchHandler) downloadBlobs(input *prefetchInput) { +func (ph *PrefetchHandler) downloadBlobs(ctx context.Context, input *prefetchInput) { + ctx, span := ph.tracer.Start(ctx, "prefetch.download_blobs", + trace.WithAttributes( + attribute.String("component", "proxy-prefetch"), + attribute.String("operation", "download_blobs"), + attribute.String("image.namespace", input.namespace), + attribute.String("image.tag", input.tag), + attribute.Int("blobs.total", len(input.blobs)), + ), + ) + defer span.End() + var wg sync.WaitGroup var mu sync.Mutex var errList []error for _, b := range input.blobs { - if ph.shouldSkipPrefetch(b, input.logger) { + if ph.shouldSkipPrefetch(ctx, b) { continue } @@ -276,7 +310,7 @@ func (ph *PrefetchHandler) downloadBlobs(input *prefetchInput) { go func(blob blobInfo) { defer wg.Done() blobStart := time.Now() - err := ph.clusterClient.DownloadBlob(context.Background(), input.namespace, blob.digest, io.Discard) + err := ph.clusterClient.DownloadBlob(ctx, input.namespace, blob.digest, io.Discard) blobDuration := time.Since(blobStart) ph.metrics.Timer("blob_download_time").Record(blobDuration) ph.metrics.Counter("bytes_downloaded").Inc(blob.size) @@ -297,16 +331,20 @@ func (ph *PrefetchHandler) downloadBlobs(input *prefetchInput) { if len(errList) > 0 { ph.metrics.Counter("failed").Inc(1) + span.RecordError(errors.Join(errList...)) + span.SetStatus(codes.Error, fmt.Sprintf("%d blob downloads failed", len(errList))) for _, err := range errList { - input.logger.With("error", err).Error("Error downloading blob") + log.WithTraceContext(ctx).With("error", err).Error("Error downloading blob") } + } else { + span.SetStatus(codes.Ok, "all blobs downloaded") } } // Skip blobs that are outside the size range [min, max] -func (ph *PrefetchHandler) shouldSkipPrefetch(b blobInfo, logger *zap.SugaredLogger) bool { +func (ph *PrefetchHandler) shouldSkipPrefetch(ctx context.Context, b blobInfo) bool { if b.size < ph.minBlobSizeBytes { - logger.With( + log.WithTraceContext(ctx).With( "digest", b.digest, "size", b.size, "min_threshold", ph.minBlobSizeBytes, @@ -315,7 +353,7 @@ func (ph *PrefetchHandler) shouldSkipPrefetch(b blobInfo, logger *zap.SugaredLog return true } if b.size > ph.maxBlobSizeBytes { - logger.With( + log.WithTraceContext(ctx).With( "digest", b.digest, "size", b.size, "max_threshold", ph.maxBlobSizeBytes, @@ -327,9 +365,9 @@ func (ph *PrefetchHandler) shouldSkipPrefetch(b blobInfo, logger *zap.SugaredLog } // processManifest handles both ManifestLists and single Manifests. -func (ph *PrefetchHandler) processManifest(logger *zap.SugaredLogger, namespace string, manifestBytes []byte) ([]blobInfo, error) { +func (ph *PrefetchHandler) processManifest(ctx context.Context, namespace string, manifestBytes []byte) ([]blobInfo, error) { // Attempt to process as a manifest list. - blobs, err := ph.tryProcessManifestList(logger, namespace, manifestBytes) + blobs, err := ph.tryProcessManifestList(ctx, namespace, manifestBytes) if err == nil && len(blobs) > 0 { return blobs, nil } @@ -337,24 +375,24 @@ func (ph *PrefetchHandler) processManifest(logger *zap.SugaredLogger, namespace // Fallback to single manifest. var manifest schema2.Manifest if err := json.NewDecoder(bytes.NewReader(manifestBytes)).Decode(&manifest); err != nil { - logger.With("namespace", namespace).Errorf("Failed to parse single manifest: %v", err) + log.WithTraceContext(ctx).With("namespace", namespace).Errorf("Failed to parse single manifest: %v", err) return nil, fmt.Errorf("invalid single manifest: %w", err) } return ph.processLayers(manifest.Layers) } // tryProcessManifestList attempts to decode a manifest list. -func (ph *PrefetchHandler) tryProcessManifestList(logger *zap.SugaredLogger, namespace string, manifestBytes []byte) ([]blobInfo, error) { +func (ph *PrefetchHandler) tryProcessManifestList(ctx context.Context, namespace string, manifestBytes []byte) ([]blobInfo, error) { var manifestList manifestlist.ManifestList if err := json.NewDecoder(bytes.NewReader(manifestBytes)).Decode(&manifestList); err != nil || len(manifestList.Manifests) == 0 { return nil, fmt.Errorf("not a valid manifest list") } - logger.With("namespace", namespace).Info("Processing manifest list") - return ph.processManifestList(logger, namespace, manifestList) + log.WithTraceContext(ctx).With("namespace", namespace).Info("Processing manifest list") + return ph.processManifestList(ctx, namespace, manifestList) } // processManifestList processes a manifest list. -func (ph *PrefetchHandler) processManifestList(logger *zap.SugaredLogger, namespace string, manifestList manifestlist.ManifestList) ([]blobInfo, error) { +func (ph *PrefetchHandler) processManifestList(ctx context.Context, namespace string, manifestList manifestlist.ManifestList) ([]blobInfo, error) { var allBlobs []blobInfo for _, descriptor := range manifestList.Manifests { manifestDigestHex := descriptor.Digest.Hex() @@ -364,9 +402,9 @@ func (ph *PrefetchHandler) processManifestList(logger *zap.SugaredLogger, namesp } buf := &bytes.Buffer{} startTime := time.Now() - if err := ph.clusterClient.DownloadBlob(context.Background(), namespace, digest, buf); err != nil { + if err := ph.clusterClient.DownloadBlob(ctx, namespace, digest, buf); err != nil { ph.metrics.Counter("download_manifest_error").Inc(1) - logger.With("error", err).Error("Failed to download manifest blob") + log.WithTraceContext(ctx).With("error", err).Error("Failed to download manifest blob") continue } ph.getManifestLatency.RecordDuration(time.Since(startTime)) @@ -411,25 +449,36 @@ func (ph *PrefetchHandler) HandleV2(w http.ResponseWriter, r *http.Request) { return } - err := ph.triggerPrefetchBlobs(input) + err := ph.triggerPrefetchBlobs(r.Context(), input) if err != nil { - writeInternalError(w, fmt.Sprintf("failed to trigger image prefetch: %s", err), input.traceID) - input.logger.Errorf("Failed to trigger image prefetch") + writeInternalError(w, fmt.Sprintf("failed to trigger image prefetch: %s", err)) + log.WithTraceContext(r.Context()).Errorf("Failed to trigger image prefetch") return } ph.metrics.Counter("initiated").Inc(1) - writePrefetchResponse(w, input.tag, "prefetching initiated successfully", input.traceID) + writePrefetchResponse(w, input.tag, "prefetching initiated successfully") } // triggerPrefetchBlobs triggers a blob prefetch for all blobs in parallel. -func (ph *PrefetchHandler) triggerPrefetchBlobs(input *prefetchInput) error { +func (ph *PrefetchHandler) triggerPrefetchBlobs(ctx context.Context, input *prefetchInput) error { + ctx, span := ph.tracer.Start(ctx, "prefetch.trigger_prefetch", + trace.WithAttributes( + attribute.String("component", "proxy-prefetch"), + attribute.String("operation", "trigger_prefetch"), + attribute.String("image.namespace", input.namespace), + attribute.String("image.tag", input.tag), + attribute.Int("blobs.total", len(input.blobs)), + ), + ) + defer span.End() + var wg sync.WaitGroup var mu sync.Mutex var errList []error for _, b := range input.blobs { - if ph.shouldSkipPrefetch(b, input.logger) { + if ph.shouldSkipPrefetch(ctx, b) { continue } @@ -447,7 +496,12 @@ func (ph *PrefetchHandler) triggerPrefetchBlobs(input *prefetchInput) error { wg.Wait() if len(errList) != 0 { - return fmt.Errorf("at least one layer could not be prefetched: %w", errors.Join(errList...)) + err := fmt.Errorf("at least one layer could not be prefetched: %w", errors.Join(errList...)) + span.RecordError(err) + span.SetStatus(codes.Error, fmt.Sprintf("%d prefetch requests failed", len(errList))) + return err } + + span.SetStatus(codes.Ok, "all prefetch requests triggered") return nil } diff --git a/proxy/proxyserver/server.go b/proxy/proxyserver/server.go index 0e53e0790..5d9062ec9 100644 --- a/proxy/proxyserver/server.go +++ b/proxy/proxyserver/server.go @@ -27,6 +27,9 @@ import ( "github.com/uber/kraken/lib/middleware" "github.com/uber/kraken/origin/blobclient" "github.com/uber/kraken/utils/handler" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" ) // Server defines the proxy HTTP server. @@ -62,9 +65,13 @@ func (s *Server) Handler() http.Handler { r.Get("/health", handler.Wrap(s.healthHandler)) + // Add tracing middleware for prefetch/preheat endpoints + tracingMiddleware := otelhttp.NewMiddleware("kraken-proxy", + otelhttp.WithTracerProvider(otel.GetTracerProvider())) + r.Post("/registry/notifications", handler.Wrap(s.preheatHandler.Handle)) - r.Post("/proxy/v1/registry/prefetch", s.prefetchHandler.HandleV1) - r.Post("/proxy/v2/registry/prefetch", s.prefetchHandler.HandleV2) + r.With(tracingMiddleware).Post("/proxy/v1/registry/prefetch", s.prefetchHandler.HandleV1) + r.With(tracingMiddleware).Post("/proxy/v2/registry/prefetch", s.prefetchHandler.HandleV2) // Serves /debug/pprof endpoints. r.Mount("/", http.DefaultServeMux) diff --git a/proxy/proxyserver/server_test.go b/proxy/proxyserver/server_test.go index a5160ad3d..92e57b45d 100644 --- a/proxy/proxyserver/server_test.go +++ b/proxy/proxyserver/server_test.go @@ -171,8 +171,7 @@ func TestPrefetchV1MalformedTag(t *testing.T) { addr := mocks.startServer() b, err := json.Marshal(prefetchBody{ - TraceId: "abc", - Tag: "invalid", + Tag: "invalid", }) require.NoError(err) @@ -200,8 +199,7 @@ func TestPrefetchV1(t *testing.T) { manifest, bs := dockerutil.ManifestFixture(layers[0], layers[1], layers[2]) b, err := json.Marshal(prefetchBody{ - Tag: fmt.Sprintf("%s/%s/%s", repo, namespace, tag), - TraceId: "abc", + Tag: fmt.Sprintf("%s/%s/%s", repo, namespace, tag), }) require.NoError(err) @@ -232,8 +230,7 @@ func TestPrefetchV2(t *testing.T) { manifest, bs := dockerutil.ManifestFixture(layers[0], layers[1], layers[2]) b, err := json.Marshal(prefetchBody{ - Tag: fmt.Sprintf("%s/%s/%s", repo, namespace, tag), - TraceId: "abc", + Tag: fmt.Sprintf("%s/%s/%s", repo, namespace, tag), }) require.NoError(err) @@ -256,7 +253,6 @@ func TestPrefetchV2(t *testing.T) { require.Equal(prefetchResponse{ Message: "prefetching initiated successfully", - TraceId: "abc", Status: "success", Tag: "abcdef:v1.0.0", Prefetched: true, @@ -279,8 +275,7 @@ func TestPrefetchV2OriginError(t *testing.T) { manifest, bs := dockerutil.ManifestFixture(layers[0], layers[1], layers[2]) b, err := json.Marshal(prefetchBody{ - Tag: fmt.Sprintf("%s/%s/%s", repo, namespace, tag), - TraceId: "abc", + Tag: fmt.Sprintf("%s/%s/%s", repo, namespace, tag), }) require.NoError(err) @@ -302,7 +297,6 @@ func TestPrefetchV2OriginError(t *testing.T) { require.NoError(err) require.Equal(prefetchResponse{ Message: fmt.Sprintf("failed to trigger image prefetch: at least one layer could not be prefetched: digest %q, namespace %q, blob prefetch failure: foo err", layers[1], namespace), - TraceId: "abc", Status: "failure", Prefetched: false, }, resBody)