Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 155 additions & 57 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ type flagConfig struct {
WalCommitInterval model.Duration
WalMaxSamplesPerSegment uint32
HeadRetentionTimeout model.Duration
UseBlockManagerStorage bool

featureList []string
memlimitRatio float64
Expand Down Expand Up @@ -314,7 +315,8 @@ func main() {
Registerer: prometheus.DefaultRegisterer,
Gatherer: prometheus.DefaultGatherer,
},
promlogConfig: promlog.Config{},
promlogConfig: promlog.Config{},
UseBlockManagerStorage: false,
}

a := kingpin.New(filepath.Base(os.Args[0]), "The Prom++ monitoring server").UsageWriter(os.Stdout)
Expand Down Expand Up @@ -562,7 +564,7 @@ func main() {

logger := promlog.New(&cfg.promlogConfig)

readPromPPFeatures(logger)
readPromPPFeatures(logger, &cfg)

if err := cfg.setFeatureListOptions(logger); err != nil {
fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err))
Expand Down Expand Up @@ -822,56 +824,70 @@ func main() {
scraper := &readyScrapeManager{}

// PP_CHANGES.md: rebuild on cpp start
// In server mode the persisted blocks are managed by block.Manager (read +
// retention) and block.Compactor (compaction) instead of a tsdb.DB. The
// block.Manager is wired into the fanout via a querier-only storage.Storage
// adapter; localStorage stays an empty stub. In agent mode the secondary is
// still localStorage (agent.DB set later).
// Server mode supports two historical-block storage schemes, selected by the
// "enable_block_manager" entry of the PROMPP_FEATURES environment variable
// (which sets cfg.UseBlockManagerStorage):

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using PROMPP_FEATURE is enough, this flag is unneeded

// 1) true: block.Manager + block.Compactor for persisted blocks.
// 2) false (default): pre-PR-377 mode with tsdb.DB serving persisted blocks.
// In both modes, PP head manager + adapter remain the write path.
var (
blockManager *block.Manager
blockCompactor *block.Compactor
tsdbHistorical *tsdbHistoricalStorage
compactCancel context.CancelFunc
persistedStorage storage.Storage = localStorage
startTimeFn func() (int64, error) = localStorage.StartTime
)
if !agentMode {
retentionMs := int64(time.Duration(cfg.tsdb.RetentionDuration) / time.Millisecond)
blocksToDelete := pp_pkg_tsdb.NewBlocksToDelete(
retentionMs,
int64(cfg.tsdb.MaxBytes),
pp_pkg_tsdb.CatalogHeadsExtraSize(dataDir, headCatalog),
prometheus.DefaultRegisterer,
)
blockManager, err = block.NewManager(localStoragePath, &block.Options{
RetentionDuration: retentionMs,
CorruptedRetentionDuration: time.Duration(cfg.tsdb.CorruptedRetentionDuration),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blocksToDelete, log.With(logger, "component", "blockmanager"), prometheus.DefaultRegisterer)
if err != nil {
level.Error(logger).Log("msg", "failed to initialize block manager", "err", err)
os.Exit(1)
}
if cfg.UseBlockManagerStorage {
level.Info(logger).Log("msg", "Using block-manager storage scheme")
retentionMs := int64(time.Duration(cfg.tsdb.RetentionDuration) / time.Millisecond)
blocksToDelete := pp_pkg_tsdb.NewBlocksToDelete(
retentionMs,
int64(cfg.tsdb.MaxBytes),
pp_pkg_tsdb.CatalogHeadsExtraSize(dataDir, headCatalog),
prometheus.DefaultRegisterer,
)
blockManager, err = block.NewManager(localStoragePath, &block.Options{
RetentionDuration: retentionMs,
CorruptedRetentionDuration: time.Duration(cfg.tsdb.CorruptedRetentionDuration),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blocksToDelete, log.With(logger, "component", "blockmanager"), prometheus.DefaultRegisterer)
if err != nil {
level.Error(logger).Log("msg", "failed to initialize block manager", "err", err)
os.Exit(1)
}

var compactCtx context.Context
compactCtx, compactCancel = context.WithCancel(context.Background())
blockCompactor, err = block.NewCompactor(compactCtx, localStoragePath, &block.CompactorOptions{
MinBlockDuration: int64(time.Duration(cfg.tsdb.MinBlockDuration) / time.Millisecond),
MaxBlockChunkSegmentSize: int64(cfg.tsdb.MaxBlockChunkSegmentSize),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blockManager, log.With(logger, "component", "blockcompactor"), prometheus.DefaultRegisterer)
if err != nil {
level.Error(logger).Log("msg", "failed to create block compactor", "err", err)
os.Exit(1)
var compactCtx context.Context
compactCtx, compactCancel = context.WithCancel(context.Background())
blockCompactor, err = block.NewCompactor(compactCtx, localStoragePath, &block.CompactorOptions{
MinBlockDuration: int64(time.Duration(cfg.tsdb.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(cfg.tsdb.MaxBlockDuration) / time.Millisecond),
MaxBlockChunkSegmentSize: int64(cfg.tsdb.MaxBlockChunkSegmentSize),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blockManager, log.With(logger, "component", "blockcompactor"), prometheus.DefaultRegisterer)
if err != nil {
level.Error(logger).Log("msg", "failed to create block compactor", "err", err)
os.Exit(1)
}
// Drive compaction from the manager's single reload loop so compact
// and block deletion never run concurrently.
blockManager.SetCompactor(blockCompactor)

bs := &blockStorage{m: blockManager, onClose: func() error {
// Cancel any in-flight leveled compaction first so the manager
// loop can return promptly, then stop the loop and close blocks.
compactCancel()
blockManager.Close()
return nil
}}
persistedStorage = bs
startTimeFn = bs.StartTime
} else {
level.Info(logger).Log("msg", "Using pre-PR-377 historical TSDB storage scheme")
tsdbHistorical = &tsdbHistoricalStorage{}
persistedStorage = tsdbHistorical
startTimeFn = tsdbHistorical.StartTime
}

bs := &blockStorage{m: blockManager, onClose: func() error {
blockCompactor.Close()
blockManager.Close()
compactCancel()
return nil
}}
persistedStorage = bs
startTimeFn = bs.StartTime
}

remoteRead := pp_pkg_remote.NewRemoteRead(
Expand Down Expand Up @@ -1422,15 +1438,16 @@ func main() {
)
}
if !agentMode {
// Persisted block storage (block.Manager + block.Compactor). The tsdb.DB
// is disabled in server mode; persisted blocks are read via block.Manager
// and compacted by block.Compactor, both started above. This actor only
// signals readiness and tears the storage down on shutdown.
// Server storage startup depends on selected storage scheme.
// PP_CHANGES.md: rebuild on cpp
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "Starting persisted block storage ...")
if cfg.UseBlockManagerStorage {
level.Info(logger).Log("msg", "Starting persisted block storage ...")
} else {
level.Info(logger).Log("msg", "Starting TSDB storage ...")
}
if cfg.tsdb.WALSegmentSize != 0 {
if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 {
return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
Expand All @@ -1449,21 +1466,38 @@ func main() {
level.Info(logger).Log("fs_type", fsType)
}

level.Info(logger).Log("msg", "Persisted block storage started")
level.Debug(logger).Log("msg", "Block storage options",
"MinBlockDuration", cfg.tsdb.MinBlockDuration,
"MaxBytes", cfg.tsdb.MaxBytes,
"RetentionDuration", cfg.tsdb.RetentionDuration,
"CorruptedRetentionDuration", cfg.tsdb.CorruptedRetentionDuration,
"EnableOverlappingCompaction", cfg.tsdb.EnableOverlappingCompaction,
)
if cfg.UseBlockManagerStorage {
level.Info(logger).Log("msg", "Persisted block storage started")
level.Debug(logger).Log("msg", "Block storage options",
"MinBlockDuration", cfg.tsdb.MinBlockDuration,
"MaxBytes", cfg.tsdb.MaxBytes,
"RetentionDuration", cfg.tsdb.RetentionDuration,
"CorruptedRetentionDuration", cfg.tsdb.CorruptedRetentionDuration,
"EnableOverlappingCompaction", cfg.tsdb.EnableOverlappingCompaction,
)
} else {
opts := cfg.tsdb.ToTSDBOptions()
db, err := tsdb.Open(localStoragePath, logger, prometheus.DefaultRegisterer, &opts, localStorage.stats)
if err != nil {
return fmt.Errorf("opening storage failed: %w", err)
}
tsdbHistorical.Set(db)
level.Info(logger).Log("msg", "TSDB storage started")
level.Debug(logger).Log("msg", "TSDB options",
"MinBlockDuration", cfg.tsdb.MinBlockDuration,
"MaxBlockDuration", cfg.tsdb.MaxBlockDuration,
"MaxBytes", cfg.tsdb.MaxBytes,
"RetentionDuration", cfg.tsdb.RetentionDuration,
"WALCompression", cfg.tsdb.WALCompression,
)
}

close(dbOpen)
<-cancel
return nil
},
func(err error) {
// Closes adapter (head) + blockStorage (block.Manager + block.Compactor) + remoteRead.
// Closes adapter (head) + historical storage backend + remoteRead.
if err := fanoutStorage.Close(); err != nil {
level.Error(logger).Log("msg", "Error stopping storage", "err", err)
}
Expand Down Expand Up @@ -1793,6 +1827,64 @@ func (b *blockStorage) StartTime() (int64, error) {
return math.MaxInt64, nil
}

// tsdbHistoricalStorage adapts a tsdb.DB to serve persisted blocks as a
// fanout secondary, while dropping appends so writes stay on the PP head path.
//
// The zero value is usable: until Set installs a database, the query methods
// and StartTime report tsdb.ErrNotReady and Close is a no-op.
type tsdbHistoricalStorage struct {
// mtx guards db; Set takes the write lock, get takes the read lock.
mtx sync.RWMutex
// db is the backing TSDB; nil until Set is called.
db *tsdb.DB
}

// Set installs db as the backing database. It takes the write lock, so it is
// safe to call while other goroutines are using the read-side methods.
func (s *tsdbHistoricalStorage) Set(db *tsdb.DB) {
	s.mtx.Lock()
	s.db = db
	s.mtx.Unlock()
}

// get returns the currently installed database under the read lock, or nil
// when Set has not been called yet.
func (s *tsdbHistoricalStorage) get() *tsdb.DB {
	s.mtx.RLock()
	current := s.db
	s.mtx.RUnlock()
	return current
}

// Querier delegates to the backing database's Querier for [mint, maxt].
// It returns tsdb.ErrNotReady while no database has been installed.
func (s *tsdbHistoricalStorage) Querier(mint, maxt int64) (storage.Querier, error) {
	if backing := s.get(); backing != nil {
		return backing.Querier(mint, maxt)
	}
	return nil, tsdb.ErrNotReady
}

// ChunkQuerier delegates to the backing database's ChunkQuerier for
// [mint, maxt]. It returns tsdb.ErrNotReady while no database has been
// installed.
func (s *tsdbHistoricalStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
	if backing := s.get(); backing != nil {
		return backing.ChunkQuerier(mint, maxt)
	}
	return nil, tsdb.ErrNotReady
}

// Appender always hands out a noopAppender and never touches the backing
// database; the context argument is ignored.
func (s *tsdbHistoricalStorage) Appender(_ context.Context) storage.Appender {
	return noopAppender{}
}

// Close closes the backing database and returns its error. When no database
// has been installed there is nothing to release and nil is returned.
//
// NOTE(review): the db field is left set after closing, so later calls still
// reach the closed database — confirm callers invoke Close only once, at
// shutdown.
func (s *tsdbHistoricalStorage) Close() error {
	if backing := s.get(); backing != nil {
		return backing.Close()
	}
	return nil
}

// StartTime reports the MinTime of the first block returned by the backing
// database (presumably the oldest one — confirm the ordering of db.Blocks()).
//
// It returns math.MaxInt64 with tsdb.ErrNotReady while no database has been
// installed, and math.MaxInt64 with a nil error when the database holds no
// blocks.
func (s *tsdbHistoricalStorage) StartTime() (int64, error) {
	backing := s.get()
	if backing == nil {
		return math.MaxInt64, tsdb.ErrNotReady
	}

	persisted := backing.Blocks()
	if len(persisted) == 0 {
		return math.MaxInt64, nil
	}
	return persisted[0].Meta().MinTime, nil
}

// noopAppender silently drops samples and reports success, so that the fanout
// appender (which appends to every secondary) does not fail on the read-only
// secondaries (blockStorage and tsdbHistoricalStorage).
Expand Down Expand Up @@ -2185,7 +2277,7 @@ func (p *rwProtoMsgFlagParser) Set(opt string) error {
return nil
}

func readPromPPFeatures(logger log.Logger) {
func readPromPPFeatures(logger log.Logger, cfg *flagConfig) {
features := os.Getenv("PROMPP_FEATURES")
if features == "" {
return
Expand Down Expand Up @@ -2313,6 +2405,12 @@ func readPromPPFeatures(logger log.Logger) {
case "shrink_shard_copier":
pp_storage.ShrinkShardCopier = true
_ = level.Info(logger).Log("msg", "[FEATURE] Shrink shard copier is enabled.")

case "enable_block_manager":
if cfg != nil {
cfg.UseBlockManagerStorage = true
}
_ = level.Info(logger).Log("msg", "[FEATURE] Block-manager historical storage is enabled.")
}
}
}
Loading
Loading