Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions fdbclient/SystemData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,8 @@ const KeyRef backupRangePartitionedMapUploadedPrefix = "\xff\x02/backupRangePart
const KeyRangeRef backupRangePartitionedProgressKeys("\xff\x02/backupRangePartitionedProgress/"_sr,
"\xff\x02/backupRangePartitionedProgress0"_sr);
const KeyRef backupRangePartitionedProgressPrefix = backupRangePartitionedProgressKeys.begin;
// Signals backup workers that a new range-partitioned backup job has been submitted.
const KeyRef backupRangePartitionedStartedKey = "\xff\x02/backupRangePartitionedStarted"_sr;
// Signals backup workers to pause ("1") or resume (any other/absent value).
// Note: no "extern" on the definition, matching the other key definitions in this file.
const KeyRef backupRangePartitionedPausedKey = "\xff\x02/backupRangePartitionedPaused"_sr;

Key backupRangePartitionedMapUploadedKeyFor(Version v) {
return backupRangePartitionedMapUploadedPrefix.withSuffix(format("%lld", v));
Expand Down Expand Up @@ -1114,6 +1116,20 @@ WorkerBackupStatus decodeBackupRangePartitionedProgressValue(const ValueRef& val
return status;
}

// Serializes the list of (backup UID, start version) pairs that is stored under
// backupRangePartitionedStartedKey, tagged with the backup-start protocol version.
Value encodeBackupRangePartitionedStartedValue(const std::vector<std::pair<UID, Version>>& ids) {
    BinaryWriter writer(IncludeVersion(ProtocolVersion::withBackupStartValue()));
    writer << ids;
    return writer.toValue();
}

// Deserializes the (backup UID, start version) pairs stored under
// backupRangePartitionedStartedKey. An empty value decodes to an empty list.
std::vector<std::pair<UID, Version>> decodeBackupRangePartitionedStartedValue(const ValueRef& value) {
    std::vector<std::pair<UID, Version>> ids;
    if (!value.empty()) {
        // Construct the reader only when there are bytes to read: a BinaryReader created
        // with IncludeVersion consumes the serialized protocol version up front, which
        // an empty value does not contain.
        BinaryReader reader(value, IncludeVersion());
        reader >> ids;
    }
    return ids;
}

const KeyRef previousCoordinatorsKey = "\xff/previousCoordinators"_sr;
const KeyRef coordinatorsKey = "\xff/coordinators"_sr;
const KeyRef logsKey = "\xff/logs"_sr;
Expand Down
17 changes: 17 additions & 0 deletions fdbclient/include/fdbclient/SystemData.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ Value backupProgressValue(const WorkerBackupStatus& status);
UID decodeBackupProgressKey(const KeyRef& key);
WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value);

//============ System Keys used by Range Partitioned Backup ============//

// The keys used by backup workers to report their per-worker backup progress.
// "\xff\x02/backupRangePartitionedProgress/[[workerID]]" := "[[WorkerBackupStatus]]"
extern const KeyRangeRef backupRangePartitionedProgressKeys;
extern const KeyRef backupRangePartitionedProgressPrefix;
Expand All @@ -432,6 +435,20 @@ WorkerBackupStatus decodeBackupRangePartitionedProgressValue(const ValueRef& val
extern const KeyRef backupRangePartitionedMapUploadedPrefix;
Key backupRangePartitionedMapUploadedKeyFor(Version v);

// The key to signal backup workers a new backup job is submitted.
// "\xff\x02/backupRangePartitionedStarted" := "[[vector<UID,Version1>]]"
extern const KeyRef backupRangePartitionedStartedKey;
Value encodeBackupRangePartitionedStartedValue(const std::vector<std::pair<UID, Version>>& ids);
std::vector<std::pair<UID, Version>> decodeBackupRangePartitionedStartedValue(const ValueRef& value);

// The key to signal backup workers that they should resume or pause.
// "\xff\x02/backupRangePartitionedPaused" := "[[0|1]]"
// 0 = Send a signal to resume/already resumed.
// 1 = Send a signal to pause/already paused.
extern const KeyRef backupRangePartitionedPausedKey;

//============ End of System Keys used by Range Partitioned Backup ============//

// The key to signal backup workers a new backup job is submitted.
// "\xff\x02/backupStarted" := "[[vector<UID,Version1>]]"
extern const KeyRef backupStartedKey;
Expand Down
6 changes: 6 additions & 0 deletions fdbserver/backupworker/BackupRangePartitionedProgress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,9 @@ Future<Void> getBackupRangePartitionedProgress(Database cx,
co_await tr.onError(err);
}
}

// TODO akanksha: Placeholder -> Implement when recovery code is implemented.
// NOTE(review): the key tuple is presumably (epoch, end version, log-router tag count)
// as in the non-partitioned BackupProgress — confirm when this is implemented.
std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>>
BackupRangePartitionedProgress::getUnfinishedBackup() {
    // No recovery support yet, so there is never unfinished work to report.
    std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> unfinished;
    return unfinished;
}
2 changes: 2 additions & 0 deletions fdbserver/backupworker/BackupRangePartitionedProgress.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class BackupRangePartitionedProgress : NonCopyable, ReferenceCounted<BackupRange

void delref() { ReferenceCounted<BackupRangePartitionedProgress>::delref(); }

std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> getUnfinishedBackup();

private:
// Used for logging and debugging purpose to identify which backup progress it is.
const UID dbgid;
Expand Down
198 changes: 181 additions & 17 deletions fdbserver/backupworker/BackupWorkerRangePartitioned.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ struct BackupRangePartitionedData {
NotifiedVersion pulledVersion;
Version logFolderBaseVersion;
AsyncVar<Reference<LogSystem>> logSystem;
AsyncVar<bool> paused; // Track if "backupPausedKey" is set.
AsyncVar<bool> paused; // Track if "backupRangePartitionedPausedKey" is set.
Reference<FlowLock> lock;
AsyncTrigger doneTrigger;
AsyncTrigger changedTrigger;
Expand Down Expand Up @@ -632,6 +632,29 @@ Future<Void> addMutation(Reference<IBackupFile> logFile,
co_await logFile->append(mutation.begin(), mutation.size());
}

// Adds, for each backup job, the number of log bytes this worker just wrote to that
// job's BackupConfig logBytesWritten counter, retrying until the commit succeeds.
//   bytesPerBackup: backup UID -> bytes appended to that backup's log files.
static Future<Void> updateLogBytesWritten(BackupRangePartitionedData* self, std::map<UID, int64_t> bytesPerBackup) {
    Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));

    while (true) {
        Error err;
        try {
            // Options must be re-applied on every attempt: onError() resets the transaction.
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
            tr->setOption(FDBTransactionOptions::LOCK_AWARE);

            for (const auto& [uid, bytes] : bytesPerBackup) {
                BackupConfig config(uid);
                // AddValue: atomic add, so the counter accumulates across commits/workers.
                config.logBytesWritten().atomicOp(tr, bytes, MutationRef::AddValue);
            }
            co_await tr->commit();
            co_return;
        } catch (Error& e) {
            err = e;
        }
        // Waits out retryable errors (rethrows fatal ones), then loop retries the commit.
        co_await tr->onError(err);
    }
}

Future<Void> saveMutationsToFile(BackupRangePartitionedData* self, Version lastVersionInFile, int numMsg) {
// Make sure all backups are ready, otherwise mutations will be lost.
while (!self->isAllInfoReady()) {
Expand Down Expand Up @@ -760,35 +783,172 @@ Future<Void> saveMutationsToFile(BackupRangePartitionedData* self, Version lastV
}
co_await waitForAll(finished);

std::map<UID, int64_t> bytesPerBackup;
for (auto& lf : activeFiles) {
self->backups[lf.backupUid].nextFileBeginVersion = lastVersionInFile + 1;
bytesPerBackup[lf.backupUid] += lf.file->size();
}
}

static Future<Void> updateLogBytesWritten(BackupRangePartitionedData* self,
std::vector<UID> backupUids,
std::vector<Reference<IBackupFile>> logFiles) {
// TODO akanksha: Implement in next PR.
co_return;
co_await updateLogBytesWritten(self, std::move(bytesPerBackup));
}

// It closes the race between getMinBackupVersion's snapshot at master-recruit time and the actual state of
// backupRangePartitionedStartedKey when the old epoch backup worker comes up — specifically the case where
// the backup configuration changed during that window so the backup worker is no longer needed.
// Returns true when this (old-epoch) worker can exit early: it has an end version and
// every submitted backup starts at or after that end version, so none of them needs
// mutations from this worker. Blocks (via a watch) until the started key is written.
static Future<bool> shouldBackupWorkerExitEarly(BackupRangePartitionedData* self) {
    // NOTE: the former placeholder ("co_return true;") at the top of this body made the
    // rest unreachable and unconditionally reported "exit early"; it has been removed.
    while (true) {
        ReadYourWritesTransaction tr(self->cx);
        while (true) {
            Error err;
            try {
                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
                Optional<Value> value = co_await tr.get(backupRangePartitionedStartedKey);
                std::vector<std::pair<UID, Version>> uidVersions;
                if (value.present()) {
                    // Start optimistic: exit only if we have an end version and no backup
                    // below disproves it.
                    bool shouldExit = self->endVersion.present();
                    uidVersions = decodeBackupRangePartitionedStartedValue(value.get());
                    TraceEvent e("BWRangePartitionedGotStartKey", self->myId);
                    int i = 1;
                    for (const auto& [uid, version] : uidVersions) {
                        e.detail(format("BackupID%d", i), uid).detail(format("Version%d", i), version);
                        i++;
                        if (shouldExit && version < self->endVersion.get()) {
                            // This backup starts before our end version, so it needs us.
                            shouldExit = false;
                        }
                    }
                    co_await onBackupChanges(self, uidVersions);
                    co_return shouldExit;
                }

                // No backup submitted yet; install a watch and block until the key appears.
                TraceEvent("BWRangePartitionedEmptyStartKey", self->myId);
                Future<Void> watchFuture = tr.watch(backupRangePartitionedStartedKey);
                co_await tr.commit();
                co_await watchFuture;
                break; // Re-create the transaction and re-read the key.
            } catch (Error& e) {
                err = e;
            }
            co_await tr.onError(err);
        }
    }
}

// Runs forever: watches backupRangePartitionedStartedKey and, whenever it changes,
// decodes the submitted (backup UID, start version) pairs and feeds them (possibly an
// empty list, when the key is cleared) to onBackupChanges so the worker can pick up
// new backup jobs or drop finished ones.
static Future<Void> monitorBackupStartedKeyChanges(BackupRangePartitionedData* self) {
    // NOTE: the former placeholder ("co_return;") at the top of this body made the rest
    // unreachable; it has been removed.
    while (true) {
        ReadYourWritesTransaction tr(self->cx);
        while (true) {
            Error err;
            try {
                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
                Optional<Value> value = co_await tr.get(backupRangePartitionedStartedKey);
                std::vector<std::pair<UID, Version>> uidVersions;
                if (value.present()) {
                    uidVersions = decodeBackupRangePartitionedStartedValue(value.get());
                    TraceEvent e("BWRangePartitionedGotStartKey", self->myId);
                    int i = 1;
                    for (const auto& [uid, version] : uidVersions) {
                        e.detail(format("BackupID%d", i), uid).detail(format("Version%d", i), version);
                        i++;
                    }
                }

                // Await the update instead of dropping the Future (it is co_await'ed in
                // shouldBackupWorkerExitEarly too); otherwise its errors would be lost.
                co_await onBackupChanges(self, uidVersions);
                Future<Void> watchFuture = tr.watch(backupRangePartitionedStartedKey);
                co_await tr.commit();
                co_await watchFuture;
                break; // Key changed: re-create the transaction and re-read.
            } catch (Error& e) {
                err = e;
            }
            co_await tr.onError(err);
        }
    }
}

// Advances each backup's latestBackupWorkerSavedVersion in its BackupConfig to the
// version this worker has durably saved, for backups whose workers have all started.
// Only the oldest backup epoch writes, and the saved version only moves forward.
//   savedLogVersions: backup UID -> latest log version saved by this worker.
Future<Void> setBackupKeys(BackupRangePartitionedData* self, std::map<UID, Version> savedLogVersions) {
    // NOTE: the former placeholder ("co_return;") at the top of this body made the rest
    // unreachable; it has been removed.
    Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));

    while (true) {
        Error err;
        try {
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr->setOption(FDBTransactionOptions::LOCK_AWARE);
            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

            // Issue all reads first, then wait for them together.
            std::vector<Future<Optional<Version>>> prevBackupWorkerSavedVersions;
            std::vector<BackupConfig> versionConfigs;
            std::vector<Future<Optional<bool>>> allWorkersReady;
            for (const auto& [uid, version] : savedLogVersions) {
                BackupConfig config(uid);
                versionConfigs.emplace_back(config);
                prevBackupWorkerSavedVersions.push_back(config.latestBackupWorkerSavedVersion().get(tr));
                allWorkersReady.push_back(config.allWorkerStarted().get(tr));
            }
            co_await (waitForAll(prevBackupWorkerSavedVersions) && waitForAll(allWorkersReady));

            for (size_t i = 0; i < prevBackupWorkerSavedVersions.size(); i++) {
                // Skip backups whose workers have not all started yet.
                if (!allWorkersReady[i].get().present() || !allWorkersReady[i].get().get()) {
                    continue;
                }

                const Version current = savedLogVersions[versionConfigs[i].getUid()];
                if (prevBackupWorkerSavedVersions[i].get().present()) {
                    const Version prev = prevBackupWorkerSavedVersions[i].get().get();
                    if (prev > current) {
                        // The saved version should never regress; log it if it would.
                        TraceEvent(SevWarn, "BWRangePartitionedVersionInverse", self->myId)
                            .detail("Prev", prev)
                            .detail("Current", current);
                    }
                }
                if (self->backupEpoch == self->oldestBackupEpoch &&
                    (!prevBackupWorkerSavedVersions[i].get().present() ||
                     prevBackupWorkerSavedVersions[i].get().get() < current)) {
                    TraceEvent("BWRangePartitionedSetVersion", self->myId)
                        .detail("BackupID", versionConfigs[i].getUid())
                        .detail("Version", current);
                    versionConfigs[i].latestBackupWorkerSavedVersion().set(tr, current);
                }
            }
            co_await tr->commit();
            co_return;
        } catch (Error& e) {
            err = e;
        }
        co_await tr->onError(err);
    }
}

// Watches backupRangePartitionedPausedKey and mirrors its state into self->paused so
// the worker can stop/resume saving mutations. "1" means paused; any other value, or
// an absent key, means running.
static Future<Void> monitorWorkerPause(BackupRangePartitionedData* self) {
    // NOTE: the former unconditional "co_return;" here made this whole monitor dead
    // code (pause/resume requests were never observed); it has been removed.
    Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
    Future<Void> watch;

    while (true) {
        Error err;
        try {
            // Options are re-applied each iteration: reset()/onError() clears them.
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr->setOption(FDBTransactionOptions::LOCK_AWARE);
            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

            Optional<Value> value = co_await tr->get(backupRangePartitionedPausedKey);
            bool paused = value.present() && value.get() == "1"_sr;
            if (self->paused.get() != paused) {
                TraceEvent(paused ? "BWRangePartitionedPaused" : "BWRangePartitionedResumed", self->myId).log();
                self->paused.set(paused);
            }

            // Install the watch, commit it, then block until the key changes.
            watch = tr->watch(backupRangePartitionedPausedKey);
            co_await tr->commit();
            co_await watch;
            tr->reset();
            continue;
        } catch (Error& e) {
            err = e;
        }
        co_await tr->onError(err);
    }
}

Future<Void> monitorBackupRangePartitionedProgress(BackupRangePartitionedData* self) {
Expand All @@ -800,8 +960,7 @@ Future<Void> monitorBackupRangePartitionedProgress(BackupRangePartitionedData* s
co_await (self->changedTrigger.onTrigger() || self->logSystem.onChange());
}

// check all workers have started by checking their progress is larger
// than the backup's start version.
// Check all workers have started by checking their progress is larger than the backup's start version.
Reference<BackupRangePartitionedProgress> progress(new BackupRangePartitionedProgress(self->myId));
co_await getBackupRangePartitionedProgress(self->cx, self->myId, progress, SevDebug);

Expand All @@ -828,7 +987,6 @@ Future<Void> monitorBackupRangePartitionedProgress(BackupRangePartitionedData* s
}
}

// TODO akanksha: Implement and explain what setBackupKeys does in next PR.
Future<Void> setKeys = savedLogVersions.empty() ? Void() : setBackupKeys(self, savedLogVersions);
co_await (interval && setKeys);
}
Expand Down Expand Up @@ -968,6 +1126,12 @@ Future<Void> backupWorkerRangePartitioned(BackupInterface interf,
addActor.send(monitorBackupRangePartitionedProgress(&self));
}

addActor.send(monitorWorkerPause(&self));

// First need to call waitAndProcessPartitionMap before starting to pull data, because we need to know the
// partition assignment.
co_await waitAndProcessPartitionMap(&self);

// If the worker is on an old epoch and all backups starts a version >= the endVersion
bool exitEarly = co_await shouldBackupWorkerExitEarly(&self);
TraceEvent("BWRangePartitionedExitEarly", self.myId).detail("ExitEarly", exitEarly);
Expand Down