diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 109b7f1c9..9e8341184 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -173,6 +173,7 @@ type flagConfig struct { WalCommitInterval model.Duration WalMaxSamplesPerSegment uint32 HeadRetentionTimeout model.Duration + UseBlockManagerStorage bool featureList []string memlimitRatio float64 @@ -314,7 +315,8 @@ func main() { Registerer: prometheus.DefaultRegisterer, Gatherer: prometheus.DefaultGatherer, }, - promlogConfig: promlog.Config{}, + promlogConfig: promlog.Config{}, + UseBlockManagerStorage: false, } a := kingpin.New(filepath.Base(os.Args[0]), "The Prom++ monitoring server").UsageWriter(os.Stdout) @@ -562,7 +564,7 @@ func main() { logger := promlog.New(&cfg.promlogConfig) - readPromPPFeatures(logger) + readPromPPFeatures(logger, &cfg) if err := cfg.setFeatureListOptions(logger); err != nil { fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err)) @@ -822,56 +824,110 @@ func main() { scraper := &readyScrapeManager{} // PP_CHANGES.md: rebuild on cpp start - // In server mode the persisted blocks are managed by block.Manager (read + - // retention) and block.Compactor (compaction) instead of a tsdb.DB. The - // block.Manager is wired into the fanout via a querier-only storage.Storage - // adapter; localStorage stays an empty stub. In agent mode the secondary is - // still localStorage (agent.DB set later). + // Server mode supports two historical-block storage schemes selected by the + // PROMPP_FEATURES=enable_block_manager feature flag: + // 1) enabled: block.Manager + block.Compactor for persisted blocks. + // 2) disabled (default): pre-PR-377 mode with tsdb.DB serving persisted blocks. + // In both modes, PP head manager + adapter remain the write path. 
var ( blockManager *block.Manager blockCompactor *block.Compactor + tsdbHistorical *tsdbHistoricalStorage compactCancel context.CancelFunc persistedStorage storage.Storage = localStorage startTimeFn func() (int64, error) = localStorage.StartTime ) if !agentMode { - retentionMs := int64(time.Duration(cfg.tsdb.RetentionDuration) / time.Millisecond) - blocksToDelete := pp_pkg_tsdb.NewBlocksToDelete( - retentionMs, - int64(cfg.tsdb.MaxBytes), - pp_pkg_tsdb.CatalogHeadsExtraSize(dataDir, headCatalog), - prometheus.DefaultRegisterer, - ) - blockManager, err = block.NewManager(localStoragePath, &block.Options{ - RetentionDuration: retentionMs, - CorruptedRetentionDuration: time.Duration(cfg.tsdb.CorruptedRetentionDuration), - EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction, - }, blocksToDelete, log.With(logger, "component", "blockmanager"), prometheus.DefaultRegisterer) - if err != nil { - level.Error(logger).Log("msg", "failed to initialize block manager", "err", err) + // Storage is constructed eagerly here for both schemes. The historical + // path does no WAL replay (the PP head + adapter is the only write path), + // so opening it is as cheap as the block manager's initial reload; there is + // no need to defer the open into the run group or gate startup on it. 
+ if cfg.tsdb.WALSegmentSize != 0 && (cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024) { + level.Error(logger).Log("msg", "flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB") os.Exit(1) } - - var compactCtx context.Context - compactCtx, compactCancel = context.WithCancel(context.Background()) - blockCompactor, err = block.NewCompactor(compactCtx, localStoragePath, &block.CompactorOptions{ - MinBlockDuration: int64(time.Duration(cfg.tsdb.MinBlockDuration) / time.Millisecond), - MaxBlockChunkSegmentSize: int64(cfg.tsdb.MaxBlockChunkSegmentSize), - EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction, - }, blockManager, log.With(logger, "component", "blockcompactor"), prometheus.DefaultRegisterer) - if err != nil { - level.Error(logger).Log("msg", "failed to create block compactor", "err", err) + if cfg.tsdb.MaxBlockChunkSegmentSize != 0 && cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 { + level.Error(logger).Log("msg", "flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB") os.Exit(1) } + switch fsType := prom_runtime.Statfs(localStoragePath); fsType { + case "NFS_SUPER_MAGIC": + level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. 
Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.") + default: + level.Info(logger).Log("fs_type", fsType) + } - bs := &blockStorage{m: blockManager, onClose: func() error { - blockCompactor.Close() - blockManager.Close() - compactCancel() - return nil - }} - persistedStorage = bs - startTimeFn = bs.StartTime + if cfg.UseBlockManagerStorage { + level.Info(logger).Log("msg", "Using block-manager storage scheme") + level.Debug(logger).Log("msg", "Block storage options", + "MinBlockDuration", cfg.tsdb.MinBlockDuration, + "MaxBytes", cfg.tsdb.MaxBytes, + "RetentionDuration", cfg.tsdb.RetentionDuration, + "CorruptedRetentionDuration", cfg.tsdb.CorruptedRetentionDuration, + "EnableOverlappingCompaction", cfg.tsdb.EnableOverlappingCompaction, + ) + retentionMs := int64(time.Duration(cfg.tsdb.RetentionDuration) / time.Millisecond) + blocksToDelete := pp_pkg_tsdb.NewBlocksToDelete( + retentionMs, + int64(cfg.tsdb.MaxBytes), + pp_pkg_tsdb.CatalogHeadsExtraSize(dataDir, headCatalog), + prometheus.DefaultRegisterer, + ) + blockManager, err = block.NewManager(localStoragePath, &block.Options{ + RetentionDuration: retentionMs, + CorruptedRetentionDuration: time.Duration(cfg.tsdb.CorruptedRetentionDuration), + EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction, + }, blocksToDelete, log.With(logger, "component", "blockmanager"), prometheus.DefaultRegisterer) + if err != nil { + level.Error(logger).Log("msg", "failed to initialize block manager", "err", err) + os.Exit(1) + } + + var compactCtx context.Context + compactCtx, compactCancel = context.WithCancel(context.Background()) + blockCompactor, err = block.NewCompactor(compactCtx, localStoragePath, &block.CompactorOptions{ + MinBlockDuration: int64(time.Duration(cfg.tsdb.MinBlockDuration) / time.Millisecond), + MaxBlockDuration: int64(time.Duration(cfg.tsdb.MaxBlockDuration) / time.Millisecond), + MaxBlockChunkSegmentSize: 
int64(cfg.tsdb.MaxBlockChunkSegmentSize), + EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction, + }, blockManager, log.With(logger, "component", "blockcompactor"), prometheus.DefaultRegisterer) + if err != nil { + level.Error(logger).Log("msg", "failed to create block compactor", "err", err) + os.Exit(1) + } + // Drive compaction from the manager's single reload loop so compact + // and block deletion never run concurrently. + blockManager.SetCompactor(blockCompactor) + + bs := &blockStorage{m: blockManager, onClose: func() error { + // Cancel any in-flight leveled compaction first so the manager + // loop can return promptly, then stop the loop and close blocks. + compactCancel() + blockManager.Close() + return nil + }} + persistedStorage = bs + startTimeFn = bs.StartTime + } else { + level.Info(logger).Log("msg", "Using pre-PR-377 historical TSDB storage scheme") + opts := cfg.tsdb.ToTSDBOptions() + db, err := tsdb.Open(localStoragePath, logger, prometheus.DefaultRegisterer, &opts, localStorage.stats) + if err != nil { + level.Error(logger).Log("msg", "opening storage failed", "err", err) + os.Exit(1) + } + tsdbHistorical = &tsdbHistoricalStorage{db: db} + persistedStorage = tsdbHistorical + startTimeFn = tsdbHistorical.StartTime + level.Info(logger).Log("msg", "TSDB storage started") + level.Debug(logger).Log("msg", "TSDB options", + "MinBlockDuration", cfg.tsdb.MinBlockDuration, + "MaxBlockDuration", cfg.tsdb.MaxBlockDuration, + "MaxBytes", cfg.tsdb.MaxBytes, + "RetentionDuration", cfg.tsdb.RetentionDuration, + "WALCompression", cfg.tsdb.WALCompression, + ) + } } remoteRead := pp_pkg_remote.NewRemoteRead( @@ -1197,9 +1253,8 @@ func main() { prometheus.MustRegister(configSuccess) prometheus.MustRegister(configSuccessTime) - // Start all components while we wait for TSDB to open but only load - // initial config and mark ourselves as ready after it completed. 
- dbOpen := make(chan struct{}) + // Server-mode storage is opened eagerly during setup, so components can start + // and load the initial config without waiting on a storage-open signal. // sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded). type closeOnce struct { @@ -1397,14 +1452,6 @@ func main() { cancel := make(chan struct{}) g.Add( func() error { - select { - case <-dbOpen: - // In case a shutdown is initiated before the dbOpen is released - case <-cancel: - reloadReady.Close() - return nil - } - if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { return fmt.Errorf("error loading config from %q: %w", cfg.configFile, err) } @@ -1422,48 +1469,18 @@ func main() { ) } if !agentMode { - // Persisted block storage (block.Manager + block.Compactor). The tsdb.DB - // is disabled in server mode; persisted blocks are read via block.Manager - // and compacted by block.Compactor, both started above. This actor only - // signals readiness and tears the storage down on shutdown. + // Storage is opened eagerly during setup (see above), so this actor only + // waits for shutdown and then closes the fanout, which closes the + // historical backend. 
// PP_CHANGES.md: rebuild on cpp cancel := make(chan struct{}) g.Add( func() error { - level.Info(logger).Log("msg", "Starting persisted block storage ...") - if cfg.tsdb.WALSegmentSize != 0 { - if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 { - return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB") - } - } - if cfg.tsdb.MaxBlockChunkSegmentSize != 0 { - if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 { - return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB") - } - } - - switch fsType := prom_runtime.Statfs(localStoragePath); fsType { - case "NFS_SUPER_MAGIC": - level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.") - default: - level.Info(logger).Log("fs_type", fsType) - } - - level.Info(logger).Log("msg", "Persisted block storage started") - level.Debug(logger).Log("msg", "Block storage options", - "MinBlockDuration", cfg.tsdb.MinBlockDuration, - "MaxBytes", cfg.tsdb.MaxBytes, - "RetentionDuration", cfg.tsdb.RetentionDuration, - "CorruptedRetentionDuration", cfg.tsdb.CorruptedRetentionDuration, - "EnableOverlappingCompaction", cfg.tsdb.EnableOverlappingCompaction, - ) - - close(dbOpen) <-cancel return nil }, func(err error) { - // Closes adapter (head) + blockStorage (block.Manager + block.Compactor) + remoteRead. + // Closes adapter (head) + historical storage backend + remoteRead. 
if err := fanoutStorage.Close(); err != nil { level.Error(logger).Log("msg", "Error stopping storage", "err", err) } @@ -1514,7 +1531,6 @@ func main() { localStorage.Set(db, 0) // db.SetWriteNotified(remoteStorage) // PP_CHANGES.md: rebuild on cpp - close(dbOpen) <-cancel return nil }, @@ -1545,13 +1561,6 @@ func main() { cancel := make(chan struct{}) g.Add( func() error { - select { - case <-dbOpen: - // In case a shutdown is initiated before the dbOpen is released - case <-cancel: - return nil - } - return hManager.Run() }, func(err error) { @@ -1793,6 +1802,35 @@ func (b *blockStorage) StartTime() (int64, error) { return math.MaxInt64, nil } +// tsdbHistoricalStorage adapts a tsdb.DB to serve persisted blocks as a +// fanout secondary, while dropping appends so writes stay on the PP head path. +type tsdbHistoricalStorage struct { + db *tsdb.DB +} + +func (s *tsdbHistoricalStorage) Querier(mint, maxt int64) (storage.Querier, error) { + return s.db.Querier(mint, maxt) +} + +func (s *tsdbHistoricalStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { + return s.db.ChunkQuerier(mint, maxt) +} + +func (s *tsdbHistoricalStorage) Appender(context.Context) storage.Appender { + return noopAppender{} +} + +func (s *tsdbHistoricalStorage) Close() error { + return s.db.Close() +} + +func (s *tsdbHistoricalStorage) StartTime() (int64, error) { + if blocks := s.db.Blocks(); len(blocks) > 0 { + return blocks[0].Meta().MinTime, nil + } + return math.MaxInt64, nil +} + // noopAppender silently drops samples and reports success, so that the fanout // appender (which appends to every secondary) does not fail on the read-only // blockStorage secondary. 
@@ -2185,7 +2223,7 @@ func (p *rwProtoMsgFlagParser) Set(opt string) error { return nil } -func readPromPPFeatures(logger log.Logger) { +func readPromPPFeatures(logger log.Logger, cfg *flagConfig) { features := os.Getenv("PROMPP_FEATURES") if features == "" { return @@ -2313,6 +2351,12 @@ func readPromPPFeatures(logger log.Logger) { case "shrink_shard_copier": pp_storage.ShrinkShardCopier = true _ = level.Info(logger).Log("msg", "[FEATURE] Shrink shard copier is enabled.") + + case "enable_block_manager": + if cfg != nil { + cfg.UseBlockManagerStorage = true + } + _ = level.Info(logger).Log("msg", "[FEATURE] Block-manager historical storage is enabled.") } } } diff --git a/pp/go/storage/block/compactor.go b/pp/go/storage/block/compactor.go index fae30ee01..d2262d725 100644 --- a/pp/go/storage/block/compactor.go +++ b/pp/go/storage/block/compactor.go @@ -3,59 +3,57 @@ package block import ( "context" "fmt" - "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" ) -const compactionInterval = time.Minute - // CompactorOptions configures the persisted-blocks compactor. type CompactorOptions struct { // MinBlockDuration is the smallest block range, used to derive the // exponential compaction ranges. If zero, tsdb.DefaultBlockDuration is used. MinBlockDuration int64 + // MaxBlockDuration limits the largest compaction range. If zero, no limit is + // applied and all exponential ranges are used. + MaxBlockDuration int64 // MaxBlockChunkSegmentSize is the max block chunk segment size. MaxBlockChunkSegmentSize int64 // EnableOverlappingCompaction enables compaction of overlapping blocks. EnableOverlappingCompaction bool - // CompactionInterval is the period of the background compaction loop. - // If zero, compactionInterval is used. 
- CompactionInterval time.Duration } -// BlockSource provides the compactor with the currently loaded blocks. -// It is implemented by Manager. +// BlockSource provides the compactor with the currently loaded blocks. It is +// implemented by Manager. type BlockSource interface { // Blocks returns a snapshot of the currently loaded blocks (the open // argument for Compact). Blocks() []*tsdb.Block } -// Compactor periodically compacts persisted on-disk blocks. It does not reload -// blocks itself: the new block is loaded and the compacted parents are deleted -// by the periodic reload loop of the block source (e.g. Manager). +// Compactor compacts persisted on-disk blocks. It does not run its own loop and +// does not reload or delete blocks: a single driver goroutine (the block +// Manager) calls Compact after each reload. Running compact and reload in +// that one goroutine means a compaction never races with the deletion of its +// inputs: the block created by a compaction pass is loaded, and its obsolete +// parents are deleted, by the following reload before the next plan is computed +// (mirroring tsdb's single-goroutine compact/reload loop). type Compactor struct { dir string compactor tsdb.Compactor source BlockSource - interval time.Duration logger log.Logger metrics *compactorMetrics - - stopc chan struct{} - stoppedc chan struct{} - stopOnce sync.Once } -// NewCompactor builds a LeveledCompactor from opts and starts the background -// compaction loop. +// NewCompactor builds a LeveledCompactor from opts. It does not start any +// background goroutine; the caller drives compaction via Compact (typically the +// block Manager's reload loop after calling Manager.SetCompactor). 
func NewCompactor( ctx context.Context, dir string, @@ -75,12 +73,8 @@ func NewCompactor( if minBlockDuration <= 0 { minBlockDuration = tsdb.DefaultBlockDuration } - interval := opts.CompactionInterval - if interval <= 0 { - interval = compactionInterval - } - rngs := tsdb.ExponentialBlockRanges(minBlockDuration, 10, 3) + rngs := compactionRanges(minBlockDuration, opts.MaxBlockDuration) leveled, err := tsdb.NewLeveledCompactorWithOptions(ctx, r, logger, rngs, chunkenc.NewPool(), tsdb.LeveledCompactorOptions{ MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, @@ -89,73 +83,84 @@ func NewCompactor( return nil, fmt.Errorf("create compactor: %w", err) } - c := &Compactor{ + return &Compactor{ dir: dir, compactor: leveled, source: source, - interval: interval, logger: logger, metrics: newCompactorMetrics(r), - stopc: make(chan struct{}), - stoppedc: make(chan struct{}), - } - go c.loop() - return c, nil -} - -func (c *Compactor) loop() { - defer func() { - close(c.stoppedc) - }() - - ticker := time.NewTicker(c.interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - c.metrics.compactionsTriggered.Inc() - if err := c.compactBlocks(); err != nil { - c.metrics.compactionsFailed.Inc() - level.Error(c.logger).Log("msg", "compaction failed", "err", err) - } - - case <-c.stopc: - return - } - } + }, nil } -// Close stops the compaction loop and waits for it to finish. -func (c *Compactor) Close() { - c.stopOnce.Do(func() { - close(c.stopc) - }) - <-c.stoppedc -} +// Compact runs a single compaction pass: it plans one group of eligible on-disk +// blocks and compacts them. It reports whether a compaction was performed (so the +// driver can immediately reload and compact again until nothing is left) and the +// ULIDs of the blocks it created (so the driver can remove them if the following +// reload fails). 
It does NOT reload or delete blocks; the driver reloads between +// passes, which loads the new block and deletes the now-obsolete parents before +// the next plan. Compact must be driven by a single goroutine so it never races +// with block deletion. +func (c *Compactor) Compact() (uids []ulid.ULID, compacted bool, err error) { + logger := c.loggerOrNop() + c.metrics.compactionsTriggered.Inc() -// compactBlocks compacts at most one planned group of eligible on-disk blocks. -// It does not reload blocks: the periodic reload loop of the block source loads -// the new block and deletes the compacted parents. -func (c *Compactor) compactBlocks() error { plan, err := c.compactor.Plan(c.dir) if err != nil { - return fmt.Errorf("plan compaction: %w", err) + c.metrics.compactionsFailed.Inc() + return nil, false, fmt.Errorf("plan compaction: %w", err) } if len(plan) == 0 { - return nil + return nil, false, nil } - select { - case <-c.stopc: - return nil - default: + openBlocks := c.source.Blocks() + start := time.Now() + level.Info(logger).Log( + "msg", "starting on-disk block compaction", + "plan_len", len(plan), + "plan", fmt.Sprintf("%v", plan), + "open_blocks", len(openBlocks), + ) + + uids, err = c.compactor.Compact(c.dir, plan, openBlocks) + if err != nil { + c.metrics.compactionsFailed.Inc() + return nil, false, fmt.Errorf("compact %v: %w", plan, err) + } + level.Info(logger).Log( + "msg", "finished on-disk block compaction", + "plan_len", len(plan), + "plan", fmt.Sprintf("%v", plan), + "open_blocks", len(openBlocks), + "result_blocks", len(uids), + "duration", time.Since(start), + ) + return uids, true, nil +} + +func (c *Compactor) loggerOrNop() log.Logger { + if c.logger == nil { + return log.NewNopLogger() } + return c.logger +} - if _, err := c.compactor.Compact(c.dir, plan, c.source.Blocks()); err != nil { - return fmt.Errorf("compact %s: %w", plan, err) +func compactionRanges(minBlockDuration, maxBlockDuration int64) []int64 { + if maxBlockDuration > 0 && 
maxBlockDuration < minBlockDuration { + maxBlockDuration = minBlockDuration + } + + rngs := tsdb.ExponentialBlockRanges(minBlockDuration, 10, 3) + if maxBlockDuration <= 0 { + return rngs + } + + for i, v := range rngs { + if v > maxBlockDuration { + return rngs[:i] + } } - return nil + return rngs } // diff --git a/pp/go/storage/block/compactor_test.go b/pp/go/storage/block/compactor_test.go index abd939c9a..33f51b36e 100644 --- a/pp/go/storage/block/compactor_test.go +++ b/pp/go/storage/block/compactor_test.go @@ -4,20 +4,21 @@ import ( "fmt" "sync" "testing" - "time" "github.com/oklog/ulid" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/require" ) -func TestCompactorCompactBlocksUsesPlanAndSource(t *testing.T) { +func TestCompactorCompactUsesPlanAndSource(t *testing.T) { t.Parallel() + wantUID := ulid.MustNew(1, nil) fake := &fakeCompactor{ - plan: []string{"01AAA", "01BBB"}, + plan: []string{"01AAA", "01BBB"}, + result: []ulid.ULID{wantUID}, } - source := fakeBlockSource{ + source := &fakeBlockSource{ blocks: []*tsdb.Block{nil, nil}, } @@ -25,57 +26,91 @@ func TestCompactorCompactBlocksUsesPlanAndSource(t *testing.T) { dir: "/tmp/data", compactor: fake, source: source, - stopc: make(chan struct{}), - stoppedc: make(chan struct{}), + metrics: newCompactorMetrics(nil), } - err := c.compactBlocks() + uids, compacted, err := c.Compact() require.NoError(t, err) + require.True(t, compacted) + require.Equal(t, []ulid.ULID{wantUID}, uids) require.True(t, fake.compactCalled) require.Equal(t, "/tmp/data", fake.compactDest) require.Equal(t, []string{"01AAA", "01BBB"}, fake.compactDirs) require.Len(t, fake.compactOpen, 2) } -func TestCompactorLoopTriggersCompactions(t *testing.T) { +func TestCompactorCompactNoPlanIsNoop(t *testing.T) { t.Parallel() - fake := &fakeCompactor{ - plan: []string{"01AAA"}, - } - + fake := &fakeCompactor{plan: nil} c := &Compactor{ dir: "/tmp/data", compactor: fake, - source: fakeBlockSource{}, - interval: 10 * 
time.Millisecond, + source: &fakeBlockSource{}, metrics: newCompactorMetrics(nil), - stopc: make(chan struct{}), - stoppedc: make(chan struct{}), } - go c.loop() - t.Cleanup(c.Close) + uids, compacted, err := c.Compact() + require.NoError(t, err) + require.False(t, compacted) + require.Nil(t, uids) + require.Equal(t, 1, fake.planCalls) + require.False(t, fake.compactCalled) +} + +func TestCompactionRanges(t *testing.T) { + t.Parallel() - require.Eventually(t, func() bool { - fake.mu.Lock() - defer fake.mu.Unlock() - return fake.planCalls > 0 && fake.compactCalls > 0 - }, time.Second, 10*time.Millisecond) + t.Run("without max duration", func(t *testing.T) { + t.Parallel() + ranges := compactionRanges(2*60*60*1000, 0) + require.Equal(t, []int64{ + 2 * 60 * 60 * 1000, + 6 * 60 * 60 * 1000, + 18 * 60 * 60 * 1000, + 54 * 60 * 60 * 1000, + 162 * 60 * 60 * 1000, + 486 * 60 * 60 * 1000, + 1458 * 60 * 60 * 1000, + 4374 * 60 * 60 * 1000, + 13122 * 60 * 60 * 1000, + 39366 * 60 * 60 * 1000, + }, ranges) + }) + + t.Run("with max duration", func(t *testing.T) { + t.Parallel() + ranges := compactionRanges(2*60*60*1000, 31*24*60*60*1000) + require.Equal(t, []int64{ + 2 * 60 * 60 * 1000, + 6 * 60 * 60 * 1000, + 18 * 60 * 60 * 1000, + 54 * 60 * 60 * 1000, + 162 * 60 * 60 * 1000, + 486 * 60 * 60 * 1000, + }, ranges) + }) + + t.Run("max lower than min is normalized", func(t *testing.T) { + t.Parallel() + ranges := compactionRanges(2*60*60*1000, 60*60*1000) + require.Equal(t, []int64{2 * 60 * 60 * 1000}, ranges) + }) } type fakeBlockSource struct { blocks []*tsdb.Block } -func (f fakeBlockSource) Blocks() []*tsdb.Block { +func (f *fakeBlockSource) Blocks() []*tsdb.Block { return f.blocks } type fakeCompactor struct { mu sync.Mutex - plan []string + plan []string + result []ulid.ULID planCalls int @@ -107,5 +142,5 @@ func (f *fakeCompactor) Compact(dest string, dirs []string, open []*tsdb.Block) f.compactDest = dest f.compactDirs = append([]string(nil), dirs...) 
f.compactOpen = append([]*tsdb.Block(nil), open...) - return nil, nil + return append([]ulid.ULID(nil), f.result...), nil } diff --git a/pp/go/storage/block/manager.go b/pp/go/storage/block/manager.go index 3a26f0b91..d48733f0d 100644 --- a/pp/go/storage/block/manager.go +++ b/pp/go/storage/block/manager.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "slices" + "strconv" "sync" "time" @@ -28,6 +29,7 @@ var ( const ( tmpForDeletionBlockDirSuffix = ".tmp-for-deletion" reloadBlocksInterval = time.Minute + blockDurationMinuteMS = int64(time.Minute / time.Millisecond) ) // Options configures block reload, mirroring the relevant tsdb.Options fields. @@ -51,12 +53,23 @@ type Manager struct { mtx sync.RWMutex blocks []*tsdb.Block + // compactor, when set via SetCompactor, runs a single compaction pass after + // each reload in the loop goroutine. Guarded by mtx. + compactor compactionRunner stopc chan struct{} stoppedc chan struct{} stopOnce sync.Once } +// compactionRunner runs a single compaction pass over the on-disk blocks, +// reporting whether a compaction was performed and the ULIDs of the blocks it +// created (so the driver can remove them if the following reload fails). +// Implemented by *Compactor. +type compactionRunner interface { + Compact() (uids []ulid.ULID, compacted bool, err error) +} + // NewManager init new [Manager] and starts its periodic reload loop. // // blocksToDelete is the retention filter (e.g. 
built via pp-pkg/tsdb.NewBlocksToDelete); @@ -81,14 +94,15 @@ func NewManager( blocksToDelete: blocksToDelete, logger: logger, chunkPool: chunkenc.NewPool(), - metrics: newMetrics(r), stopc: make(chan struct{}), stoppedc: make(chan struct{}), } + m.metrics = newMetrics(m, r) if err := m.reloadBlocks(); err != nil { return nil, fmt.Errorf("initial reload blocks: %w", err) } + m.logLoadedBlocks() level.Info(logger).Log("msg", "Block manager started", "dir", dir) go m.loop() @@ -106,9 +120,7 @@ func (m *Manager) loop() { for { select { case <-ticker.C: - if err := m.reloadBlocks(); err != nil { - level.Error(m.logger).Log("msg", "periodic reload blocks failed", "err", err) - } + m.reloadAndCompact() case <-m.stopc: return @@ -116,6 +128,67 @@ func (m *Manager) loop() { } } +// SetCompactor sets the compactor driven by the reload loop. Passing nil +// disables compaction. It is typically called right after construction, before +// the first tick. +func (m *Manager) SetCompactor(c compactionRunner) { + m.mtx.Lock() + m.compactor = c + m.mtx.Unlock() +} + +// reloadAndCompact reloads blocks and then compacts repeatedly until there is +// nothing left to compact, reloading between passes. Everything runs in this one +// goroutine, so a compaction never races with the deletion of its inputs: after +// each compaction the reload loads the new block and deletes the now-obsolete +// parents before the next plan is computed (mirroring tsdb's single-goroutine +// compact/reload loop). Compacting until exhaustion within a single tick avoids +// waiting a full ticker interval per compaction step. 
+func (m *Manager) reloadAndCompact() { + if err := m.reloadBlocks(); err != nil { + level.Error(m.logger).Log("msg", "periodic reload blocks failed", "err", err) + } + + m.mtx.RLock() + c := m.compactor + m.mtx.RUnlock() + if c == nil { + return + } + + for { + uids, compacted, err := c.Compact() + if err != nil { + level.Error(m.logger).Log("msg", "compaction failed", "err", err) + return + } + if !compacted { + return + } + // Reload to load the freshly created block and delete the obsolete + // parents before planning the next compaction. If the reload fails, + // remove the freshly compacted block(s) so a half-applied compaction + // does not leave orphaned blocks on disk (mirroring tsdb). + if err := m.reloadBlocks(); err != nil { + level.Error(m.logger).Log("msg", "reload blocks after compaction failed", "err", err) + m.deleteCompactedBlocks(uids) + return + } + } +} + +// deleteCompactedBlocks removes the given block directories from disk. It is used +// to clean up freshly compacted blocks when the reload that would have loaded +// them fails, so a half-applied compaction does not leave orphaned blocks behind +// (mirroring tsdb). +func (m *Manager) deleteCompactedBlocks(uids []ulid.ULID) { + for _, uid := range uids { + if err := os.RemoveAll(filepath.Join(m.dir, uid.String())); err != nil { + level.Error(m.logger).Log("msg", "delete compacted block after failed reload", "block", uid, "err", err) + } + } +} + // Close stops the reload loop and waits for it to finish. func (m *Manager) Close() { m.stopOnce.Do(func() { @@ -205,6 +278,25 @@ func (m *Manager) Blocks() []*tsdb.Block { return slices.Clone(m.blocks) } +// logLoadedBlocks logs the set of currently loaded blocks, mirroring the +// "Found healthy block" output of legacy tsdb so operators can see the on-disk +// block layout at startup. 
+func (m *Manager) logLoadedBlocks() { + m.mtx.RLock() + defer m.mtx.RUnlock() + + for _, b := range m.blocks { + meta := b.Meta() + level.Info(m.logger).Log( + "msg", "Found healthy block", + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "duration_minutes", normalizeBlockDurationMinutes(meta.MaxTime-meta.MinTime), + ) + } +} + // reloadBlocks reloads blocks from disk and deletes the ones past retention. // //revive:disable-next-line:cyclomatic // ported from tsdb.DB.reloadBlocks. @@ -269,8 +361,9 @@ func (m *Manager) reloadBlocks() (err error) { } var ( - toLoad []*tsdb.Block - blocksSize int64 + toLoad []*tsdb.Block + blocksSize int64 + blocksByDurationMins = map[int64]int{} ) // All deletable blocks should be unloaded. // NOTE: We need to loop through loadable one more time as there might be loadable ready to be removed (replaced by compacted block). @@ -282,8 +375,14 @@ func (m *Manager) reloadBlocks() (err error) { toLoad = append(toLoad, block) blocksSize += block.Size() + durationMinutes := normalizeBlockDurationMinutes(block.Meta().MaxTime - block.Meta().MinTime) + blocksByDurationMins[durationMinutes]++ } m.metrics.blocksBytes.Set(float64(blocksSize)) + m.metrics.loadedBlocksByDuration.Reset() + for durationMinutes, count := range blocksByDurationMins { + m.metrics.loadedBlocksByDuration.WithLabelValues(strconv.FormatInt(durationMinutes, 10)).Set(float64(count)) + } slices.SortFunc(toLoad, func(a, b *tsdb.Block) int { switch { @@ -358,19 +457,53 @@ func (m *Manager) isOutdatedBlock(id ulid.ULID, retentionDuration time.Duration) return id.Time() < uint64(time.Now().Add(-retentionDuration).UnixMilli()) } +func normalizeBlockDurationMinutes(durationMS int64) int64 { + if durationMS <= 0 { + return 0 + } + return (durationMS + blockDurationMinuteMS/2) / blockDurationMinuteMS +} + // // metrics // type metrics struct { - reloads prometheus.Counter - reloadsFailed prometheus.Counter - corruptedBlocks prometheus.Gauge - blocksBytes 
prometheus.Gauge + loadedBlocks prometheus.GaugeFunc + loadedBlocksByDuration *prometheus.GaugeVec + symbolTableSize prometheus.GaugeFunc + reloads prometheus.Counter + reloadsFailed prometheus.Counter + corruptedBlocks prometheus.Gauge + blocksBytes prometheus.Gauge } -func newMetrics(r prometheus.Registerer) *metrics { +func newMetrics(manager *Manager, r prometheus.Registerer) *metrics { m := &metrics{ + loadedBlocks: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_blocks_loaded", + Help: "Number of currently loaded data blocks.", + }, func() float64 { + manager.mtx.RLock() + defer manager.mtx.RUnlock() + return float64(len(manager.blocks)) + }), + loadedBlocksByDuration: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_blocks_loaded_by_duration", + Help: "Number of currently loaded blocks grouped by block duration in minutes.", + }, []string{"duration_minutes"}), + symbolTableSize: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_symbol_table_size_bytes", + Help: "Size of symbol table in memory for loaded blocks.", + }, func() float64 { + manager.mtx.RLock() + defer manager.mtx.RUnlock() + var symTblSize uint64 + for _, b := range manager.blocks { + symTblSize += b.GetSymbolTableSize() + } + return float64(symTblSize) + }), reloads: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_reloads_total", Help: "Number of times the database reloaded block data from disk.", @@ -391,6 +524,9 @@ func newMetrics(r prometheus.Registerer) *metrics { if r != nil { r.MustRegister( + m.loadedBlocks, + m.loadedBlocksByDuration, + m.symbolTableSize, m.reloads, m.reloadsFailed, m.corruptedBlocks, diff --git a/pp/go/storage/block/manager_test.go b/pp/go/storage/block/manager_test.go index e329f25a4..62f9418cb 100644 --- a/pp/go/storage/block/manager_test.go +++ b/pp/go/storage/block/manager_test.go @@ -3,10 +3,13 @@ package block import ( "os" "path/filepath" + "strconv" "testing" 
"github.com/go-kit/log" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -72,6 +75,35 @@ func TestManagerReturnsErrorOnInitialReloadFailure(t *testing.T) { require.Nil(t, m) } +func TestManagerExportsLoadedBlocksMetrics(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + createTestBlock(t, dir, 1000, "metric_a") + createTestBlock(t, dir, 5000, "metric_b") + + reg := prometheus.NewRegistry() + m, err := NewManager(dir, nil, nil, log.NewNopLogger(), reg) + require.NoError(t, err) + t.Cleanup(m.Close) + + require.Equal(t, float64(2), testutil.ToFloat64(m.metrics.loadedBlocks)) + require.Greater(t, testutil.ToFloat64(m.metrics.symbolTableSize), 0.0) + + durationCounts := map[int64]int{} + for _, b := range m.Blocks() { + duration := normalizeBlockDurationMinutes(b.Meta().MaxTime - b.Meta().MinTime) + durationCounts[duration]++ + } + for duration, count := range durationCounts { + require.Equal( + t, + float64(count), + testutil.ToFloat64(m.metrics.loadedBlocksByDuration.WithLabelValues(strconv.FormatInt(duration, 10))), + ) + } +} + func createTestBlock(t *testing.T, dir string, startTS int, metric string) { t.Helper() diff --git a/tsdb/db.go b/tsdb/db.go index f1f5e30a9..83d2c178a 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -25,6 +25,7 @@ import ( "os" "path/filepath" "slices" + "strconv" "strings" "sync" "time" @@ -60,7 +61,8 @@ const ( tmpForDeletionBlockDirSuffix = ".tmp-for-deletion" tmpForCreationBlockDirSuffix = ".tmp-for-creation" // Pre-2.21 tmp dir suffix, used in clean-up functions. - tmpLegacy = ".tmp" + tmpLegacy = ".tmp" + blockDurationMinuteMS = int64(time.Minute / time.Millisecond) ) // ErrNotReady is returned if the underlying storage is not ready yet. 
@@ -280,21 +282,22 @@ type DB struct { } type dbMetrics struct { - loadedBlocks prometheus.GaugeFunc - symbolTableSize prometheus.GaugeFunc - reloads prometheus.Counter - reloadsFailed prometheus.Counter - compactionsFailed prometheus.Counter - compactionsTriggered prometheus.Counter - compactionsSkipped prometheus.Counter - sizeRetentionCount prometheus.Counter - timeRetentionCount prometheus.Counter - startTime prometheus.GaugeFunc - tombCleanTimer prometheus.Histogram - blocksBytes prometheus.Gauge - maxBytes prometheus.Gauge - retentionDuration prometheus.Gauge - corruptedBlocks prometheus.Gauge + loadedBlocks prometheus.GaugeFunc + loadedBlocksByDuration *prometheus.GaugeVec + symbolTableSize prometheus.GaugeFunc + reloads prometheus.Counter + reloadsFailed prometheus.Counter + compactionsFailed prometheus.Counter + compactionsTriggered prometheus.Counter + compactionsSkipped prometheus.Counter + sizeRetentionCount prometheus.Counter + timeRetentionCount prometheus.Counter + startTime prometheus.GaugeFunc + tombCleanTimer prometheus.Histogram + blocksBytes prometheus.Gauge + maxBytes prometheus.Gauge + retentionDuration prometheus.Gauge + corruptedBlocks prometheus.Gauge } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { @@ -308,6 +311,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { defer db.mtx.RUnlock() return float64(len(db.blocks)) }) + m.loadedBlocksByDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_blocks_loaded_by_duration", + Help: "Number of currently loaded blocks grouped by block duration in minutes.", + }, []string{"duration_minutes"}) m.symbolTableSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_symbol_table_size_bytes", Help: "Size of symbol table in memory for loaded blocks", @@ -387,6 +394,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { if r != nil { r.MustRegister( m.loadedBlocks, + m.loadedBlocksByDuration, m.symbolTableSize, 
m.reloads, m.reloadsFailed, @@ -1634,8 +1642,9 @@ func (db *DB) reloadBlocks() (err error) { // PP_CHANGES.md: rebuild on cpp end var ( - toLoad []*Block - blocksSize int64 + toLoad []*Block + blocksSize int64 + blocksByDurationMin = map[int64]int{} ) // All deletable blocks should be unloaded. // NOTE: We need to loop through loadable one more time as there might be loadable ready to be removed (replaced by compacted block). @@ -1647,8 +1656,14 @@ func (db *DB) reloadBlocks() (err error) { toLoad = append(toLoad, block) blocksSize += block.Size() + durationMinutes := normalizeBlockDurationMinutes(block.Meta().MaxTime - block.Meta().MinTime) + blocksByDurationMin[durationMinutes]++ } db.metrics.blocksBytes.Set(float64(blocksSize)) + db.metrics.loadedBlocksByDuration.Reset() + for durationMinutes, count := range blocksByDurationMin { + db.metrics.loadedBlocksByDuration.WithLabelValues(strconv.FormatInt(durationMinutes, 10)).Set(float64(count)) + } slices.SortFunc(toLoad, func(a, b *Block) int { switch { @@ -1731,6 +1746,13 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po return blocks, corrupted, nil } +func normalizeBlockDurationMinutes(durationMS int64) int64 { + if durationMS <= 0 { + return 0 + } + return (durationMS + blockDurationMinuteMS/2) / blockDurationMinuteMS +} + // DefaultBlocksToDelete returns a filter which decides time based and size based // retention from the options of the db. func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc {