diff --git a/lib/torrent/scheduler/events_test.go b/lib/torrent/scheduler/events_test.go index 6753492ef..3604f1baa 100644 --- a/lib/torrent/scheduler/events_test.go +++ b/lib/torrent/scheduler/events_test.go @@ -120,7 +120,7 @@ func (m *stateMocks) newTorrent() storage.Torrent { mi := core.MetaInfoFixture() m.metainfoClient.EXPECT(). - Download(_testNamespace, mi.Digest()). + Download(gomock.Any(), _testNamespace, mi.Digest()). Return(mi, nil) t, err := m.torrentArchive.CreateTorrent(_testNamespace, mi.Digest()) diff --git a/lib/torrent/scheduler/scheduler_test.go b/lib/torrent/scheduler/scheduler_test.go index 4e5d01b2d..a8adaecf8 100644 --- a/lib/torrent/scheduler/scheduler_test.go +++ b/lib/torrent/scheduler/scheduler_test.go @@ -19,6 +19,8 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" + "github.com/uber/kraken/core" "github.com/uber/kraken/lib/hashring" "github.com/uber/kraken/lib/hostlist" @@ -47,7 +49,7 @@ func TestDownloadTorrentWithSeederAndLeecher(t *testing.T) { namespace := core.TagFixture() mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) seeder.writeTorrent(namespace, blob) require.NoError(seeder.scheduler.Download(namespace, blob.Digest)) @@ -73,7 +75,7 @@ func TestDownloadManyTorrentsWithSeederAndLeecher(t *testing.T) { blob := core.NewBlobFixture() mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) wg.Add(1) go func() { @@ -108,7 +110,7 @@ func TestDownloadManyTorrentsWithSeederAndManyLeechers(t *testing.T) { blobs[i] = blob mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(6) + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(6) seeder.writeTorrent(namespace, blob) require.NoError(seeder.scheduler.Download(namespace, blob.Digest)) @@ -143,7 +145,7 @@ func TestDownloadTorrentWhenPeersAllHaveDifferentPiece(t *testing.T) { blob := core.SizedBlobFixture(uint64(len(peers)*pieceLength), uint64(pieceLength)) mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(len(peers)) + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(len(peers)) var wg sync.WaitGroup for i, p := range peers { @@ -178,7 +180,7 @@ func TestSeederTTI(t *testing.T) { namespace := core.TagFixture() mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) clk := clock.NewMock() w := newEventWatcher() @@ -230,7 +232,7 @@ func TestLeecherTTI(t *testing.T) { blob := core.NewBlobFixture() namespace := core.TagFixture() - mocks.metaInfoClient.EXPECT().Download(namespace, blob.Digest).Return(blob.MetaInfo, nil) + mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil) p := mocks.newPeer(config, withEventLoop(w), withClock(clk)) errc := make(chan error) @@ -260,7 +262,7 @@ func TestMultipleDownloadsForSameTorrentSucceed(t *testing.T) { // Allow any number of downloads due to concurrency below. mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil).AnyTimes() + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).AnyTimes() config := configFixture() @@ -319,7 +321,7 @@ func TestNetworkEvents(t *testing.T) { namespace := core.TagFixture() mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) seeder.writeTorrent(namespace, blob) require.NoError(seeder.scheduler.Download(namespace, blob.Digest)) @@ -373,7 +375,7 @@ func TestPullInactiveTorrent(t *testing.T) { namespace := core.TagFixture() mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) seeder := mocks.newPeer(config) @@ -407,7 +409,7 @@ func TestSchedulerReload(t *testing.T) { blob := core.NewBlobFixture() mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2) seeder.writeTorrent(namespace, blob) require.NoError(seeder.scheduler.Download(namespace, blob.Digest)) @@ -440,7 +442,7 @@ func TestSchedulerRemoveTorrent(t *testing.T) { namespace := core.TagFixture() mocks.metaInfoClient.EXPECT().Download( - namespace, blob.Digest).Return(blob.MetaInfo, nil) + gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil) errc := make(chan error) go func() { errc <- p.scheduler.Download(namespace, blob.Digest) }() diff --git a/lib/torrent/storage/agentstorage/torrent_archive.go b/lib/torrent/storage/agentstorage/torrent_archive.go index 1591fe10a..af17cd40f 100644 --- a/lib/torrent/storage/agentstorage/torrent_archive.go +++ b/lib/torrent/storage/agentstorage/torrent_archive.go @@ -14,6 +14,7 @@ package agentstorage import ( + "context" "fmt" "os" @@ -25,6 +26,11 @@ import ( "github.com/uber/kraken/lib/store/metadata" "github.com/uber/kraken/lib/torrent/storage" "github.com/uber/kraken/tracker/metainfoclient" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) // TorrentArchive is capable of initializing torrents in the download directory @@ -33,19 +39,25 @@ type TorrentArchive struct { stats tally.Scope cads *store.CADownloadStore metaInfoClient metainfoclient.Client + tracer trace.Tracer } // NewTorrentArchive creates a new TorrentArchive. func NewTorrentArchive( stats tally.Scope, cads *store.CADownloadStore, - mic metainfoclient.Client) *TorrentArchive { - + mic metainfoclient.Client, +) *TorrentArchive { stats = stats.Tagged(map[string]string{ "module": "agenttorrentarchive", }) - return &TorrentArchive{stats, cads, mic} + return &TorrentArchive{ + stats: stats, + cads: cads, + metaInfoClient: mic, + tracer: otel.Tracer("kraken-agent-storage"), + } } // Stat returns TorrentInfo for the given digest. Returns os.ErrNotExist if the @@ -74,15 +86,31 @@ func (a *TorrentArchive) Stat(namespace string, d core.Digest) (*storage.Torrent func (a *TorrentArchive) CreateTorrent(namespace string, d core.Digest) (storage.Torrent, error) { var tm metadata.TorrentMeta if err := a.cads.Any().GetMetadata(d.Hex(), &tm); os.IsNotExist(err) { + ctx, span := a.tracer.Start(context.Background(), "agent.download_metainfo", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("component", "agent-storage"), + attribute.String("operation", "download_metainfo"), + attribute.String("namespace", namespace), + attribute.String("blob.digest", d.Hex()), + ), + ) + defer span.End() + downloadTimer := a.stats.Timer("metainfo_download").Start() - mi, err := a.metaInfoClient.Download(namespace, d) + mi, err := a.metaInfoClient.Download(ctx, namespace, d) if err != nil { if err == metainfoclient.ErrNotFound { + span.RecordError(err) + span.SetStatus(codes.Error, "metainfo not found") return nil, storage.ErrNotFound } + span.RecordError(err) + span.SetStatus(codes.Error, "download metainfo failed") return nil, fmt.Errorf("download metainfo: %s", err) } downloadTimer.Stop() + span.SetStatus(codes.Ok, "metainfo downloaded") // There's a race condition here, but it's "okay"... Basically, we could // initialize a download file with metainfo that is rejected by file store, diff --git a/lib/torrent/storage/agentstorage/torrent_archive_test.go b/lib/torrent/storage/agentstorage/torrent_archive_test.go index 454153d5b..4189960b0 100644 --- a/lib/torrent/storage/agentstorage/torrent_archive_test.go +++ b/lib/torrent/storage/agentstorage/torrent_archive_test.go @@ -68,7 +68,7 @@ func TestTorrentArchiveStatBitfield(t *testing.T) { blob := core.SizedBlobFixture(4, 1) mi := blob.MetaInfo - mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil).Times(1) + mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil).Times(1) tor, err := archive.CreateTorrent(namespace, mi.Digest()) require.NoError(err) @@ -107,7 +107,7 @@ func TestTorrentArchiveCreateTorrent(t *testing.T) { mi := core.MetaInfoFixture() namespace := core.TagFixture() - mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil) + mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil) tor, err := archive.CreateTorrent(namespace, mi.Digest()) require.NoError(err) @@ -135,7 +135,7 @@ func TestTorrentArchiveCreateTorrentNotFound(t *testing.T) { mi := core.MetaInfoFixture() namespace := core.TagFixture() - mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(nil, metainfoclient.ErrNotFound) + mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(nil, metainfoclient.ErrNotFound) _, err := archive.CreateTorrent(namespace, mi.Digest()) require.Equal(storage.ErrNotFound, err) @@ -152,7 +152,7 @@ func TestTorrentArchiveDeleteTorrent(t *testing.T) { mi := core.MetaInfoFixture() namespace := core.TagFixture() - mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil) + mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil) tor, err := archive.CreateTorrent(namespace, mi.Digest()) require.NoError(err) @@ -176,7 +176,7 @@ func TestTorrentArchiveConcurrentGet(t *testing.T) { namespace := core.TagFixture() // Allow any times for concurrency below. - mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil).AnyTimes() + mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil).AnyTimes() var wg sync.WaitGroup for i := 0; i < 50; i++ { @@ -206,7 +206,7 @@ func TestTorrentArchiveGetTorrent(t *testing.T) { _, err := archive.GetTorrent(namespace, mi.Digest()) require.Error(err) - mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil) + mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil) _, err = archive.CreateTorrent(namespace, mi.Digest()) require.NoError(err) diff --git a/mocks/origin/blobclient/client.go b/mocks/origin/blobclient/client.go index bc72b2c71..e9bab2d70 100644 --- a/mocks/origin/blobclient/client.go +++ b/mocks/origin/blobclient/client.go @@ -128,18 +128,18 @@ func (mr *MockClientMockRecorder) ForceCleanup(ttl any) *gomock.Call { } // GetMetaInfo mocks base method. -func (m *MockClient) GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) { +func (m *MockClient) GetMetaInfo(arg0 context.Context, arg1 string, arg2 core.Digest) (*core.MetaInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetMetaInfo", namespace, d) + ret := m.ctrl.Call(m, "GetMetaInfo", arg0, arg1, arg2) ret0, _ := ret[0].(*core.MetaInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // GetMetaInfo indicates an expected call of GetMetaInfo. -func (mr *MockClientMockRecorder) GetMetaInfo(namespace, d any) *gomock.Call { +func (mr *MockClientMockRecorder) GetMetaInfo(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetaInfo", reflect.TypeOf((*MockClient)(nil).GetMetaInfo), namespace, d) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetaInfo", reflect.TypeOf((*MockClient)(nil).GetMetaInfo), arg0, arg1, arg2) } // GetPeerContext mocks base method. diff --git a/mocks/origin/blobclient/clusterclient.go b/mocks/origin/blobclient/clusterclient.go index f44ecd38c..bc8ad0581 100644 --- a/mocks/origin/blobclient/clusterclient.go +++ b/mocks/origin/blobclient/clusterclient.go @@ -71,18 +71,18 @@ func (mr *MockClusterClientMockRecorder) DownloadBlob(arg0, arg1, arg2, arg3 any } // GetMetaInfo mocks base method. -func (m *MockClusterClient) GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) { +func (m *MockClusterClient) GetMetaInfo(arg0 context.Context, arg1 string, arg2 core.Digest) (*core.MetaInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetMetaInfo", namespace, d) + ret := m.ctrl.Call(m, "GetMetaInfo", arg0, arg1, arg2) ret0, _ := ret[0].(*core.MetaInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // GetMetaInfo indicates an expected call of GetMetaInfo. -func (mr *MockClusterClientMockRecorder) GetMetaInfo(namespace, d any) *gomock.Call { +func (mr *MockClusterClientMockRecorder) GetMetaInfo(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetaInfo", reflect.TypeOf((*MockClusterClient)(nil).GetMetaInfo), namespace, d) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetaInfo", reflect.TypeOf((*MockClusterClient)(nil).GetMetaInfo), arg0, arg1, arg2) } // OverwriteMetaInfo mocks base method. diff --git a/mocks/tracker/metainfoclient/client.go b/mocks/tracker/metainfoclient/client.go index 88be3eb6f..1c46ee7c4 100644 --- a/mocks/tracker/metainfoclient/client.go +++ b/mocks/tracker/metainfoclient/client.go @@ -5,6 +5,7 @@ package mockmetainfoclient import ( + "context" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -35,16 +36,16 @@ func (m *MockClient) EXPECT() *MockClientMockRecorder { } // Download mocks base method -func (m *MockClient) Download(arg0 string, arg1 core.Digest) (*core.MetaInfo, error) { +func (m *MockClient) Download(ctx context.Context, arg0 string, arg1 core.Digest) (*core.MetaInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Download", arg0, arg1) + ret := m.ctrl.Call(m, "Download", ctx, arg0, arg1) ret0, _ := ret[0].(*core.MetaInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // Download indicates an expected call of Download -func (mr *MockClientMockRecorder) Download(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) Download(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockClient)(nil).Download), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockClient)(nil).Download), arg0, arg1, arg2) } diff --git a/nginx/config/tracker.go b/nginx/config/tracker.go index 5bd2eede6..bd8c75507 100644 --- a/nginx/config/tracker.go +++ b/nginx/config/tracker.go @@ -29,6 +29,10 @@ server { access_log {{.access_log_path}}; error_log {{.error_log_path}}; + proxy_set_header traceparent $http_traceparent; + proxy_set_header tracestate $http_tracestate; + proxy_set_header jaeger-debug-id $http_jaeger_debug_id; + {{healthEndpoint "tracker"}} location / { diff --git a/origin/blobclient/client.go b/origin/blobclient/client.go index 7e05cc024..117268573 100644 --- a/origin/blobclient/client.go +++ b/origin/blobclient/client.go @@ -53,7 +53,7 @@ type Client interface { Stat(namespace string, d core.Digest) (*core.BlobInfo, error) StatLocal(namespace string, d core.Digest) (*core.BlobInfo, error) - GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) + GetMetaInfo(ctx context.Context, namespace string, d core.Digest) (*core.MetaInfo, error) OverwriteMetaInfo(d core.Digest, pieceLength int64) error UploadBlob(ctx context.Context, namespace string, d core.Digest, blob io.Reader) error @@ -312,24 +312,43 @@ func (c *HTTPClient) ReplicateToRemote(namespace string, d core.Digest, remoteDN // (i.e. still downloading), returns a 202 httputil.StatusError, indicating that // the request should be retried later. If no blob exists for d, returns a 404 // httputil.StatusError. -func (c *HTTPClient) GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) { +func (c *HTTPClient) GetMetaInfo(ctx context.Context, namespace string, d core.Digest) (*core.MetaInfo, error) { + ctx, span := c.tracer.Start(ctx, "blobclient.get_metainfo", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("component", "origin-client"), + attribute.String("operation", "get_metainfo"), + attribute.String("namespace", namespace), + attribute.String("blob.digest", d.Hex()), + ), + ) + defer span.End() + r, err := httputil.Get( fmt.Sprintf("http://%s/internal/namespace/%s/blobs/%s/metainfo", c.addr, url.PathEscape(namespace), d), httputil.SendTimeout(15*time.Second), - httputil.SendTLS(c.tls)) + httputil.SendTLS(c.tls), + httputil.SendTracingContext(ctx)) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "get metainfo failed") return nil, err } defer closers.Close(r.Body) raw, err := io.ReadAll(r.Body) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "read body failed") return nil, fmt.Errorf("read body: %s", err) } mi, err := core.DeserializeMetaInfo(raw) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "deserialize metainfo failed") return nil, fmt.Errorf("deserialize metainfo: %s", err) } + span.SetStatus(codes.Ok, "metainfo retrieved") return mi, nil } diff --git a/origin/blobclient/cluster_client.go b/origin/blobclient/cluster_client.go index 50230ef8c..939017f43 100644 --- a/origin/blobclient/cluster_client.go +++ b/origin/blobclient/cluster_client.go @@ -91,7 +91,7 @@ type ClusterClient interface { UploadBlob(ctx context.Context, namespace string, d core.Digest, blob io.ReadSeeker) error DownloadBlob(ctx context.Context, namespace string, d core.Digest, dst io.Writer) error PrefetchBlob(ctx context.Context, namespace string, d core.Digest) error - GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) + GetMetaInfo(ctx context.Context, namespace string, d core.Digest) (*core.MetaInfo, error) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) OverwriteMetaInfo(d core.Digest, pieceLength int64) error Owners(d core.Digest) ([]core.PeerContext, error) @@ -192,19 +192,40 @@ func (c *clusterClient) UploadBlob(ctx context.Context, namespace string, d core } // GetMetaInfo returns the metainfo for d. Does not handle polling. -func (c *clusterClient) GetMetaInfo(namespace string, d core.Digest) (mi *core.MetaInfo, err error) { +func (c *clusterClient) GetMetaInfo(ctx context.Context, namespace string, d core.Digest) (mi *core.MetaInfo, err error) { + ctx, span := otel.Tracer("kraken-origin-cluster").Start(ctx, "cluster.get_metainfo", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("component", "origin-cluster-client"), + attribute.String("namespace", namespace), + attribute.String("blob.digest", d.Hex()), + ), + ) + defer span.End() + + logger := log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex()) + logger.Debug("Getting metainfo from origin cluster") + clients, err := c.resolver.Resolve(d) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "resolve clients failed") return nil, fmt.Errorf("resolve clients: %s", err) } for _, client := range clients { - mi, err = client.GetMetaInfo(namespace, d) + mi, err = client.GetMetaInfo(ctx, namespace, d) // Do not try the next replica on 202 errors. if err != nil && !httputil.IsAccepted(err) { continue } break } + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "get metainfo failed") + } else { + span.SetStatus(codes.Ok, "metainfo retrieved") + } return mi, err } diff --git a/origin/blobserver/cluster_client_test.go b/origin/blobserver/cluster_client_test.go index c4e427a40..d2e18eb9d 100644 --- a/origin/blobserver/cluster_client_test.go +++ b/origin/blobserver/cluster_client_test.go @@ -64,7 +64,7 @@ func TestClusterClientResilientToUnavailableMasters(t *testing.T) { require.NotNil(bi) require.Equal(int64(256), bi.Size) - mi, err := cc.GetMetaInfo(backend.NoopNamespace, blob.Digest) + mi, err := cc.GetMetaInfo(context.Background(), backend.NoopNamespace, blob.Digest) require.NoError(err) require.NotNil(mi) @@ -97,7 +97,7 @@ func TestClusterClientReturnsErrorOnNoAvailability(t *testing.T) { _, err := cc.Stat(backend.NoopNamespace, blob.Digest) require.Error(err) - _, err = cc.GetMetaInfo(backend.NoopNamespace, blob.Digest) + _, err = cc.GetMetaInfo(context.Background(), backend.NoopNamespace, blob.Digest) require.Error(err) require.Error(cc.DownloadBlob(context.Background(), backend.NoopNamespace, blob.Digest, io.Discard)) @@ -205,10 +205,10 @@ func TestClusterClientReturnsErrorOnNoAvailableOrigins(t *testing.T) { mockClient2 := mockblobclient.NewMockClient(ctrl) mockResolver.EXPECT().Resolve(blob.Digest).Return([]blobclient.Client{mockClient1, mockClient2}, nil) - mockClient1.EXPECT().GetMetaInfo(namespace, blob.Digest).Return(nil, httputil.NetworkError{}) - mockClient2.EXPECT().GetMetaInfo(namespace, blob.Digest).Return(nil, httputil.NetworkError{}) + mockClient1.EXPECT().GetMetaInfo(gomock.Any(), namespace, blob.Digest).Return(nil, httputil.NetworkError{}) + mockClient2.EXPECT().GetMetaInfo(gomock.Any(), namespace, blob.Digest).Return(nil, httputil.NetworkError{}) - _, err := cc.GetMetaInfo(namespace, blob.Digest) + _, err := cc.GetMetaInfo(context.Background(), namespace, blob.Digest) require.Error(err) } diff --git a/origin/blobserver/server_test.go b/origin/blobserver/server_test.go index af937ef04..566095552 100644 --- a/origin/blobserver/server_test.go +++ b/origin/blobserver/server_test.go @@ -351,16 +351,16 @@ func TestGetMetaInfoDownloadsBlobAndReplicates(t *testing.T) { blob.Digest.Hex()).Return(core.NewBlobInfo(int64(len(blob.Content))), nil).AnyTimes() backendClient.EXPECT().Download(namespace, blob.Digest.Hex(), mockutil.MatchWriter(blob.Content)).Return(nil) - mi, err := cp.Provide(master1).GetMetaInfo(namespace, blob.Digest) + mi, err := cp.Provide(master1).GetMetaInfo(context.Background(), namespace, blob.Digest) require.True(httputil.IsAccepted(err)) require.Nil(mi) require.NoError(testutil.PollUntilTrue(5*time.Second, func() bool { - _, err := cp.Provide(master1).GetMetaInfo(namespace, blob.Digest) + _, err := cp.Provide(master1).GetMetaInfo(context.Background(), namespace, blob.Digest) return !httputil.IsAccepted(err) })) - mi, err = cp.Provide(master1).GetMetaInfo(namespace, blob.Digest) + mi, err = cp.Provide(master1).GetMetaInfo(context.Background(), namespace, blob.Digest) require.NoError(err) require.NotNil(mi) require.Equal(len(blob.Content), int(mi.Length())) @@ -386,7 +386,7 @@ func TestGetMetaInfoBlobNotFound(t *testing.T) { backendClient := s.backendClient(namespace, false) backendClient.EXPECT().Stat(namespace, d.Hex()).Return(nil, backenderrors.ErrBlobNotFound) - mi, err := cp.Provide(master1).GetMetaInfo(namespace, d) + mi, err := cp.Provide(master1).GetMetaInfo(context.Background(), namespace, d) require.True(httputil.IsNotFound(err)) require.Nil(mi) } @@ -569,14 +569,14 @@ func TestOverwriteMetainfo(t *testing.T) { err := cp.Provide(master1).TransferBlob(blob.Digest, bytes.NewReader(blob.Content)) require.NoError(err) - mi, err := cp.Provide(master1).GetMetaInfo(namespace, blob.Digest) + mi, err := cp.Provide(master1).GetMetaInfo(context.Background(), namespace, blob.Digest) require.NoError(err) require.Equal(int64(4), mi.PieceLength()) err = cp.Provide(master1).OverwriteMetaInfo(blob.Digest, 16) require.NoError(err) - mi, err = cp.Provide(master1).GetMetaInfo(namespace, blob.Digest) + mi, err = cp.Provide(master1).GetMetaInfo(context.Background(), namespace, blob.Digest) require.NoError(err) require.Equal(int64(16), mi.PieceLength()) } diff --git a/proxy/proxyserver/preheat.go b/proxy/proxyserver/preheat.go index fa22672c4..1930f06a3 100644 --- a/proxy/proxyserver/preheat.go +++ b/proxy/proxyserver/preheat.go @@ -79,7 +79,7 @@ func (ph *PreheatHandler) process(repo, digest string) error { } f := func() { log.With("repo", repo).Debugf("trigger origin cache: %+v", d) - _, err = ph.clusterClient.GetMetaInfo(repo, d) + _, err = ph.clusterClient.GetMetaInfo(context.Background(), repo, d) if err != nil && !httputil.IsAccepted(err) { log.With("repo", repo, "digest", digest).Errorf("notify origin cache: %s", err) } diff --git a/proxy/proxyserver/server_test.go b/proxy/proxyserver/server_test.go index 8c1cae131..8932f84f6 100644 --- a/proxy/proxyserver/server_test.go +++ b/proxy/proxyserver/server_test.go @@ -139,9 +139,9 @@ func TestPreheat(t *testing.T) { b, _ := json.Marshal(notification) mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), repo, manifest, mockutil.MatchWriter(bs)).Return(nil) - mocks.originClient.EXPECT().GetMetaInfo(repo, layers[0]).Return(nil, nil) - mocks.originClient.EXPECT().GetMetaInfo(repo, layers[1]).Return(nil, nil) - mocks.originClient.EXPECT().GetMetaInfo(repo, layers[2]).Return(nil, nil) + mocks.originClient.EXPECT().GetMetaInfo(gomock.Any(), repo, layers[0]).Return(nil, nil) + mocks.originClient.EXPECT().GetMetaInfo(gomock.Any(), repo, layers[1]).Return(nil, nil) + mocks.originClient.EXPECT().GetMetaInfo(gomock.Any(), repo, layers[2]).Return(nil, nil) _, err := httputil.Post( fmt.Sprintf("http://%s/registry/notifications", addr), httputil.SendBody(bytes.NewReader(b))) diff --git a/tracker/metainfoclient/client.go b/tracker/metainfoclient/client.go index 7cced3e43..e630d480c 100644 --- a/tracker/metainfoclient/client.go +++ b/tracker/metainfoclient/client.go @@ -14,6 +14,7 @@ package metainfoclient import ( + "context" "crypto/tls" "errors" "fmt" @@ -36,7 +37,7 @@ var ( // Client defines operations on torrent metainfo. type Client interface { - Download(namespace string, d core.Digest) (*core.MetaInfo, error) + Download(ctx context.Context, namespace string, d core.Digest) (*core.MetaInfo, error) } type client struct { @@ -51,7 +52,7 @@ func New(ring hashring.PassiveRing, tls *tls.Config) Client { // Download returns the MetaInfo associated with name. Returns ErrNotFound if // no torrent exists under name. -func (c *client) Download(namespace string, d core.Digest) (*core.MetaInfo, error) { +func (c *client) Download(ctx context.Context, namespace string, d core.Digest) (*core.MetaInfo, error) { var resp *http.Response var err error for _, addr := range c.ring.Locations(d) { @@ -68,7 +69,8 @@ func (c *client) Download(namespace string, d core.Digest) (*core.MetaInfo, erro Clock: backoff.SystemClock, }, httputil.SendTimeout(10*time.Second), - httputil.SendTLS(c.tls)) + httputil.SendTLS(c.tls), + httputil.SendTracingContext(ctx)) if err != nil { if httputil.IsNetworkError(err) { c.ring.Failed(addr) diff --git a/tracker/metainfoclient/testing.go b/tracker/metainfoclient/testing.go index 3c4d86c22..cb9d962e4 100644 --- a/tracker/metainfoclient/testing.go +++ b/tracker/metainfoclient/testing.go @@ -14,6 +14,7 @@ package metainfoclient import ( + "context" "errors" "sync" @@ -44,7 +45,7 @@ func (c *TestClient) Upload(mi *core.MetaInfo) error { } // Download returns the metainfo for digest. Ignores namespace. -func (c *TestClient) Download(namespace string, d core.Digest) (*core.MetaInfo, error) { +func (c *TestClient) Download(ctx context.Context, namespace string, d core.Digest) (*core.MetaInfo, error) { c.Lock() defer c.Unlock() mi, ok := c.m[d] diff --git a/tracker/trackerserver/metainfo.go b/tracker/trackerserver/metainfo.go index f91352d0d..0d375ea59 100644 --- a/tracker/trackerserver/metainfo.go +++ b/tracker/trackerserver/metainfo.go @@ -32,7 +32,7 @@ func (s *Server) getMetaInfoHandler(w http.ResponseWriter, r *http.Request) erro } timer := s.stats.Timer("get_metainfo").Start() - mi, err := s.originCluster.GetMetaInfo(namespace, d) + mi, err := s.originCluster.GetMetaInfo(r.Context(), namespace, d) if err != nil { if serr, ok := err.(httputil.StatusError); ok { // Propagate errors received from origin. diff --git a/tracker/trackerserver/metainfo_test.go b/tracker/trackerserver/metainfo_test.go index 0ebcb32cf..b634ab791 100644 --- a/tracker/trackerserver/metainfo_test.go +++ b/tracker/trackerserver/metainfo_test.go @@ -14,8 +14,11 @@ package trackerserver import ( + "context" "testing" + "github.com/golang/mock/gomock" + "github.com/uber/kraken/core" "github.com/uber/kraken/lib/hashring" "github.com/uber/kraken/lib/hostlist" @@ -42,11 +45,11 @@ func TestGetMetaInfoHandlerFetchesFromOrigin(t *testing.T) { namespace := core.TagFixture() mi := core.MetaInfoFixture() - mocks.originCluster.EXPECT().GetMetaInfo(namespace, mi.Digest()).Return(mi, nil) + mocks.originCluster.EXPECT().GetMetaInfo(gomock.Any(), namespace, mi.Digest()).Return(mi, nil) client := newMetaInfoClient(addr) - result, err := client.Download(namespace, mi.Digest()) + result, err := client.Download(context.Background(), namespace, mi.Digest()) require.NoError(err) require.Equal(mi, result) } @@ -64,11 +67,11 @@ func TestGetMetaInfoHandlerPropagatesOriginError(t *testing.T) { mi := core.MetaInfoFixture() mocks.originCluster.EXPECT().GetMetaInfo( - namespace, mi.Digest()).Return(nil, httputil.StatusError{Status: 599}).MinTimes(1) + gomock.Any(), namespace, mi.Digest()).Return(nil, httputil.StatusError{Status: 599}).MinTimes(1) client := newMetaInfoClient(addr) - _, err := client.Download(namespace, mi.Digest()) + _, err := client.Download(context.Background(), namespace, mi.Digest()) require.Error(err) require.True(httputil.IsStatus(err, 599)) } diff --git a/tracker/trackerserver/server.go b/tracker/trackerserver/server.go index f3ed3744e..739558263 100644 --- a/tracker/trackerserver/server.go +++ b/tracker/trackerserver/server.go @@ -51,8 +51,8 @@ func New( policy *peerhandoutpolicy.PriorityPolicy, peerStore peerstore.Store, originStore originstore.Store, - originCluster blobclient.ClusterClient) *Server { - + originCluster blobclient.ClusterClient, +) *Server { config = config.applyDefaults() stats = stats.Tagged(map[string]string{