Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fdbserver/core/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( AUDIT_RESTORE_BATCH_KEY_LIMIT, 100000 ); // 100K keys per batch (was hardcoded 10K)
init( AUDIT_PROGRESS_PERSIST_BYTES_INTERVAL, 100000000 ); // 100MB - only persist progress after this many bytes
init( ENABLE_AUDIT_VERBOSE_TRACE, false );
// Disabled in simulation: audit_storage locationmetadata already runs at controlled times in sim,
// and periodic triggering interferes with DD quiescence checks and the test harness audit checks.
init( AUDIT_LOCATION_METADATA_INTERVAL, 3600.0 ); if ( isSimulated ) AUDIT_LOCATION_METADATA_INTERVAL = 0;
init( LOGGING_STORAGE_COMMIT_WHEN_IO_TIMEOUT, true );
init( LOGGING_RECENT_STORAGE_COMMIT_SIZE, 20 );
init( LOGGING_COMPLETE_STORAGE_COMMIT_PROBABILITY, 0.001 );
Expand Down
1 change: 1 addition & 0 deletions fdbserver/core/include/fdbserver/core/Knobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,7 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
bool ENABLE_AUDIT_VERBOSE_TRACE;
int AUDIT_RESTORE_BATCH_KEY_LIMIT;
int64_t AUDIT_PROGRESS_PERSIST_BYTES_INTERVAL;
double AUDIT_LOCATION_METADATA_INTERVAL;
bool LOGGING_STORAGE_COMMIT_WHEN_IO_TIMEOUT;
double LOGGING_COMPLETE_STORAGE_COMMIT_PROBABILITY;
int LOGGING_RECENT_STORAGE_COMMIT_SIZE;
Expand Down
27 changes: 27 additions & 0 deletions fdbserver/datadistributor/DataDistribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ Future<UID> launchAudit(Reference<DataDistributor> self,
AuditType auditType,
KeyValueStoreType auditStorageEngineType);
Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
// Background loop that launches a ValidateLocationMetadata audit over allKeys once per
// AUDIT_LOCATION_METADATA_INTERVAL; returns immediately when that knob is <= 0.
Future<Void> periodicAuditLocationMetadata(Reference<DataDistributor> self);
void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
Future<Void> dispatchAuditStorageServerShard(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor> self,
Expand Down Expand Up @@ -2968,6 +2969,8 @@ Future<Void> dataDistribution(Reference<DataDistributor> self,
.detail("InitialMode", self->initData->bulkDumpMode);
actors.push_back(bulkDumpCore(self, self->initialized.getFuture()));

actors.push_back(periodicAuditLocationMetadata(self));

co_await waitForAll(actors);
ASSERT_WE_THINK(false);
co_return;
Expand Down Expand Up @@ -3993,6 +3996,30 @@ Future<Void> cancelAuditStorage(Reference<DataDistributor> self, TriggerAuditReq
}
}

// Background loop run by the data distributor: launches a ValidateLocationMetadata audit over
// allKeys once every SERVER_KNOBS->AUDIT_LOCATION_METADATA_INTERVAL seconds.
//
//  - Returns immediately if the interval knob is <= 0 (periodic auditing disabled).
//  - Waits for self->auditStorageInitialized before the first sleep.
//  - Holds self->auditStorageLocationMetadataLaunchingLock for the duration of each launch.
//  - actor_cancelled is rethrown; every other error is traced at SevWarn and the loop moves on
//    to the next interval.
Future<Void> periodicAuditLocationMetadata(Reference<DataDistributor> self) {
	if (SERVER_KNOBS->AUDIT_LOCATION_METADATA_INTERVAL <= 0) {
		// Feature is switched off; finish right away.
		co_return;
	}

	co_await self->auditStorageInitialized.getFuture();

	TraceEvent("PeriodicAuditLocationMetadataEnabled", self->ddId)
	    .detail("IntervalSeconds", SERVER_KNOBS->AUDIT_LOCATION_METADATA_INTERVAL);

	for (;;) {
		// The knob is re-read on every pass rather than cached.
		co_await delay(SERVER_KNOBS->AUDIT_LOCATION_METADATA_INTERVAL);

		try {
			auto& launchLock = self->auditStorageLocationMetadataLaunchingLock;
			co_await launchLock.take(TaskPriority::DefaultYield);
			// Gives the lock back when this try scope exits, whether normally or by exception.
			FlowLock::Releaser releaseOnExit(launchLock);

			UID launchedAuditId =
			    co_await launchAudit(self, allKeys, AuditType::ValidateLocationMetadata, KeyValueStoreType::END);
			TraceEvent("PeriodicAuditLocationMetadataLaunched", self->ddId).detail("AuditID", launchedAuditId);
		} catch (Error& e) {
			if (e.code() != error_code_actor_cancelled) {
				// Best effort: record the failure and try again after the next interval.
				TraceEvent(SevWarn, "PeriodicAuditLocationMetadataError", self->ddId).error(e);
				continue;
			}
			// Cancellation must propagate so the actor actually stops.
			throw;
		}
	}
}

// Handling audit requests
// For each request, launch audit storage and reply to CC with following three replies:
// (1) auditID: reply auditID when the audit is successfully launched
Expand Down