diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp
index c97a5387c08..5c5ec7d8275 100644
--- a/fdbclient/SystemData.cpp
+++ b/fdbclient/SystemData.cpp
@@ -1082,6 +1082,8 @@ const KeyRef backupRangePartitionedMapUploadedPrefix = "\xff\x02/backupRangePart
 const KeyRangeRef backupRangePartitionedProgressKeys("\xff\x02/backupRangePartitionedProgress/"_sr,
                                                      "\xff\x02/backupRangePartitionedProgress0"_sr);
 const KeyRef backupRangePartitionedProgressPrefix = backupRangePartitionedProgressKeys.begin;
+const KeyRef backupRangePartitionedStartedKey = "\xff\x02/backupRangePartitionedStarted"_sr;
+const KeyRef backupRangePartitionedPausedKey = "\xff\x02/backupRangePartitionedPaused"_sr;
 
 Key backupRangePartitionedMapUploadedKeyFor(Version v) {
     return backupRangePartitionedMapUploadedPrefix.withSuffix(format("%lld", v));
@@ -1114,6 +1116,20 @@ WorkerBackupStatus decodeBackupRangePartitionedProgressValue(const ValueRef& val
     return status;
 }
 
+Value encodeBackupRangePartitionedStartedValue(const std::vector<std::pair<UID, Version>>& ids) {
+    BinaryWriter wr(IncludeVersion(ProtocolVersion::withBackupStartValue()));
+    wr << ids;
+    return wr.toValue();
+}
+
+std::vector<std::pair<UID, Version>> decodeBackupRangePartitionedStartedValue(const ValueRef& value) {
+    std::vector<std::pair<UID, Version>> ids;
+    BinaryReader reader(value, IncludeVersion());
+    if (!value.empty())
+        reader >> ids;
+    return ids;
+}
+
 const KeyRef previousCoordinatorsKey = "\xff/previousCoordinators"_sr;
 const KeyRef coordinatorsKey = "\xff/coordinators"_sr;
 const KeyRef logsKey = "\xff/logs"_sr;
diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h
index c7357baa1cd..78ddc9050a1 100644
--- a/fdbclient/include/fdbclient/SystemData.h
+++ b/fdbclient/include/fdbclient/SystemData.h
@@ -419,6 +419,9 @@ Value backupProgressValue(const WorkerBackupStatus& status);
 UID decodeBackupProgressKey(const KeyRef& key);
 WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value);
 
+//============ System Keys used by Range Partitioned Backup ============//
+
+// Records each range partitioned backup worker's progress (WorkerBackupStatus).
 // "\xff\x02/backupRangePartitionedProgress/[[workerID]]" := "[[WorkerBackupStatus]]"
 extern const KeyRangeRef backupRangePartitionedProgressKeys;
 extern const KeyRef backupRangePartitionedProgressPrefix;
@@ -432,6 +435,20 @@ WorkerBackupStatus decodeBackupRangePartitionedProgressValue(const ValueRef& val
 extern const KeyRef backupRangePartitionedMapUploadedPrefix;
 Key backupRangePartitionedMapUploadedKeyFor(Version v);
 
+// The key to signal backup workers a new backup job is submitted.
+// "\xff\x02/backupRangePartitionedStarted" := "[[vector<pair<UID, Version>>]]"
+extern const KeyRef backupRangePartitionedStartedKey;
+Value encodeBackupRangePartitionedStartedValue(const std::vector<std::pair<UID, Version>>& ids);
+std::vector<std::pair<UID, Version>> decodeBackupRangePartitionedStartedValue(const ValueRef& value);
+
+// The key to signal backup workers that they should resume or pause.
+// "\xff\x02/backupRangePartitionedPaused" := "[[0|1]]"
+// 0 = Send a signal to resume/already resumed.
+// 1 = Send a signal to pause/already paused.
+extern const KeyRef backupRangePartitionedPausedKey;
+
+//============ End of System Keys used by Range Partitioned Backup ============//
+
 // The key to signal backup workers a new backup job is submitted.
 // "\xff\x02/backupStarted" := "[[vector<pair<UID, Version>>]]"
 extern const KeyRef backupStartedKey;
diff --git a/fdbserver/backupworker/BackupRangePartitionedProgress.cpp b/fdbserver/backupworker/BackupRangePartitionedProgress.cpp
index 120a9aba644..0429a63788a 100644
--- a/fdbserver/backupworker/BackupRangePartitionedProgress.cpp
+++ b/fdbserver/backupworker/BackupRangePartitionedProgress.cpp
@@ -71,3 +71,9 @@ Future<Void> getBackupRangePartitionedProgress(Database cx,
         co_await tr.onError(err);
     }
 }
+
+// TODO akanksha: Placeholder -> Implement when recovery code is implemented.
+std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>>
+BackupRangePartitionedProgress::getUnfinishedBackup() {
+    return {};
+}
diff --git a/fdbserver/backupworker/BackupRangePartitionedProgress.h b/fdbserver/backupworker/BackupRangePartitionedProgress.h
index a7fd9b75d2a..4987f7a4706 100644
--- a/fdbserver/backupworker/BackupRangePartitionedProgress.h
+++ b/fdbserver/backupworker/BackupRangePartitionedProgress.h
@@ -49,6 +49,8 @@ class BackupRangePartitionedProgress : NonCopyable, ReferenceCounted<BackupRange
     void delref() { ReferenceCounted<BackupRangePartitionedProgress>::delref(); }
 
+    std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> getUnfinishedBackup();
+
 private:
     // Used for logging and debugging purpose to identify which backup progress it is.
     const UID dbgid;
diff --git a/fdbserver/backupworker/BackupWorkerRangePartitioned.cpp b/fdbserver/backupworker/BackupWorkerRangePartitioned.cpp
index 91fb59fa23d..305c4828d17 100644
--- a/fdbserver/backupworker/BackupWorkerRangePartitioned.cpp
+++ b/fdbserver/backupworker/BackupWorkerRangePartitioned.cpp
@@ -80,7 +80,7 @@ struct BackupRangePartitionedData {
     NotifiedVersion pulledVersion;
     Version logFolderBaseVersion;
     AsyncVar<Reference<ILogSystem>> logSystem;
-    AsyncVar<bool> paused; // Track if "backupPausedKey" is set.
+    AsyncVar<bool> paused; // Track if "backupRangePartitionedPausedKey" is set.
     Reference<FlowLock> lock;
     AsyncTrigger doneTrigger;
     AsyncTrigger changedTrigger;
@@ -632,6 +632,29 @@ Future<Void> addMutation(Reference<IBackupFile> logFile,
     co_await logFile->append(mutation.begin(), mutation.size());
 }
 
+// Atomically adds the number of bytes written in this flush to each backup job's
+// logBytesWritten counter in its BackupConfig.
+static Future<Void> updateLogBytesWritten(BackupRangePartitionedData* self, std::map<UID, int64_t> bytesPerBackup) {
+    Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
+
+    while (true) {
+        Error err;
+        try {
+            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+            tr->setOption(FDBTransactionOptions::LOCK_AWARE);
+
+            for (const auto& [uid, bytes] : bytesPerBackup) {
+                BackupConfig config(uid);
+                config.logBytesWritten().atomicOp(tr, bytes, MutationRef::AddValue);
+            }
+            co_await tr->commit();
+            co_return;
+        } catch (Error& e) {
+            err = e;
+        }
+        co_await tr->onError(err);
+    }
+}
 
 Future<Void> saveMutationsToFile(BackupRangePartitionedData* self, Version lastVersionInFile, int numMsg) {
     // Make sure all backups are ready, otherwise mutations will be lost.
     while (!self->isAllInfoReady()) {
@@ -760,35 +783,172 @@ Future<Void> saveMutationsToFile(BackupRangePartitionedData* self, Version lastV
     }
     co_await waitForAll(finished);
 
+    std::map<UID, int64_t> bytesPerBackup;
     for (auto& lf : activeFiles) {
         self->backups[lf.backupUid].nextFileBeginVersion = lastVersionInFile + 1;
+        bytesPerBackup[lf.backupUid] += lf.file->size();
     }
-}
 
-static Future<Void> updateLogBytesWritten(BackupRangePartitionedData* self,
-                                          std::vector<UID> backupUids,
-                                          std::vector<Reference<IBackupFile>> logFiles) {
-    // TODO akanksha: Implement in next PR.
-    co_return;
+    co_await updateLogBytesWritten(self, std::move(bytesPerBackup));
 }
 
+// It closes the race between getMinBackupVersion's snapshot at master-recruit time and the actual state of
+// backupRangePartitionedStartedKey when the old epoch backup worker comes up — specifically the case where backup
+// configuration changed during that window so the backup worker is no longer needed.
 static Future<bool> shouldBackupWorkerExitEarly(BackupRangePartitionedData* self) {
-    // TODO akanksha: Implement in next PR.
-    co_return true;
+    while (true) {
+        ReadYourWritesTransaction tr(self->cx);
+        while (true) {
+            Error err;
+            try {
+                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+                Optional<Value> value = co_await tr.get(backupRangePartitionedStartedKey);
+                std::vector<std::pair<UID, Version>> uidVersions;
+                if (value.present()) {
+                    // Exit early only on an old epoch (endVersion present) where every
+                    // backup job starts at or after this worker's endVersion.
+                    bool shouldExit = self->endVersion.present();
+                    uidVersions = decodeBackupRangePartitionedStartedValue(value.get());
+                    TraceEvent e("BWRangePartitionedGotStartKey", self->myId);
+                    int i = 1;
+                    for (auto [uid, version] : uidVersions) {
+                        e.detail(format("BackupID%d", i), uid).detail(format("Version%d", i), version);
+                        i++;
+                        if (shouldExit && version < self->endVersion.get()) {
+                            shouldExit = false;
+                        }
+                    }
+                    co_await onBackupChanges(self, uidVersions);
+                    co_return shouldExit;
+                }
+
+                // No backup job submitted yet; wait for the key to be written.
+                TraceEvent("BWRangePartitionedEmptyStartKey", self->myId);
+                Future<Void> watchFuture = tr.watch(backupRangePartitionedStartedKey);
+                co_await tr.commit();
+                co_await watchFuture;
+                break;
+            } catch (Error& e) {
+                err = e;
+            }
+            co_await tr.onError(err);
+        }
+    }
 }
 
+// Watches backupRangePartitionedStartedKey and notifies this worker of backup job
+// additions/removals via onBackupChanges.
 static Future<Void> monitorBackupStartedKeyChanges(BackupRangePartitionedData* self) {
-    // TODO akanksha: Implement in next PR.
-    co_return;
+    while (true) {
+        ReadYourWritesTransaction tr(self->cx);
+        while (true) {
+            Error err;
+            try {
+                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+                Optional<Value> value = co_await tr.get(backupRangePartitionedStartedKey);
+                std::vector<std::pair<UID, Version>> uidVersions;
+                if (value.present()) {
+                    uidVersions = decodeBackupRangePartitionedStartedValue(value.get());
+                    TraceEvent e("BWRangePartitionedGotStartKey", self->myId);
+                    int i = 1;
+                    for (auto [uid, version] : uidVersions) {
+                        e.detail(format("BackupID%d", i), uid).detail(format("Version%d", i), version);
+                        i++;
+                    }
+                }
+
+                // Must be awaited: dropping the returned Future cancels the update.
+                co_await onBackupChanges(self, uidVersions);
+                Future<Void> watchFuture = tr.watch(backupRangePartitionedStartedKey);
+                co_await tr.commit();
+                co_await watchFuture;
+                break;
+            } catch (Error& e) {
+                err = e;
+            }
+            co_await tr.onError(err);
+        }
+    }
 }
 
+// This function is used to set backup worker's saved version latestBackupWorkerSavedVersion in BackupConfig.
 Future<Void> setBackupKeys(BackupRangePartitionedData* self, std::map<UID, Version> savedLogVersions) {
-    // TODO akanksha: Implement in next PR.
-    co_return;
+    Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
+
+    while (true) {
+        Error err;
+        try {
+            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+            tr->setOption(FDBTransactionOptions::LOCK_AWARE);
+            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+
+            std::vector<Future<Optional<Version>>> prevBackupWorkerSavedVersions;
+            std::vector<BackupConfig> versionConfigs;
+            std::vector<Future<Optional<bool>>> allWorkersReady;
+            for (const auto& [uid, version] : savedLogVersions) {
+                BackupConfig config(uid);
+                versionConfigs.emplace_back(config);
+                prevBackupWorkerSavedVersions.push_back(config.latestBackupWorkerSavedVersion().get(tr));
+                allWorkersReady.push_back(config.allWorkerStarted().get(tr));
+            }
+            co_await (waitForAll(prevBackupWorkerSavedVersions) && waitForAll(allWorkersReady));
+
+            for (size_t i = 0; i < prevBackupWorkerSavedVersions.size(); i++) {
+                // Only advance the saved version once every worker of the job has started.
+                if (!allWorkersReady[i].get().present() || !allWorkersReady[i].get().get()) {
+                    continue;
+                }
+
+                const Version current = savedLogVersions[versionConfigs[i].getUid()];
+                if (prevBackupWorkerSavedVersions[i].get().present()) {
+                    const Version prev = prevBackupWorkerSavedVersions[i].get().get();
+                    if (prev > current) {
+                        TraceEvent(SevWarn, "BWRangePartitionedVersionInverse", self->myId)
+                            .detail("Prev", prev)
+                            .detail("Current", current);
+                    }
+                }
+                // Only the oldest backup epoch may move the saved version forward.
+                if (self->backupEpoch == self->oldestBackupEpoch &&
+                    (!prevBackupWorkerSavedVersions[i].get().present() ||
+                     prevBackupWorkerSavedVersions[i].get().get() < current)) {
+                    TraceEvent("BWRangePartitionedSetVersion", self->myId)
+                        .detail("BackupID", versionConfigs[i].getUid())
+                        .detail("Version", current);
+                    versionConfigs[i].latestBackupWorkerSavedVersion().set(tr, current);
+                }
+            }
+            co_await tr->commit();
+            co_return;
+        } catch (Error& e) {
+            err = e;
+        }
+        co_await tr->onError(err);
+    }
 }
 
+// Watches backupRangePartitionedPausedKey and mirrors its "1"/other state into self->paused.
 static Future<Void> monitorWorkerPause(BackupRangePartitionedData* self) {
-    co_return;
+    Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
+    Future<Void> watch;
+
+    while (true) {
+        Error err;
+        try {
+            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+            tr->setOption(FDBTransactionOptions::LOCK_AWARE);
+            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+
+            Optional<Value> value = co_await tr->get(backupRangePartitionedPausedKey);
+            bool paused = value.present() && value.get() == "1"_sr;
+            if (self->paused.get() != paused) {
+                TraceEvent(paused ? "BWRangePartitionedPaused" : "BWRangePartitionedResumed", self->myId).log();
+                self->paused.set(paused);
+            }
+
+            watch = tr->watch(backupRangePartitionedPausedKey);
+            co_await tr->commit();
+            co_await watch;
+            tr->reset();
+            continue;
+        } catch (Error& e) {
+            err = e;
+        }
+        co_await tr->onError(err);
+    }
 }
 
 Future<Void> monitorBackupRangePartitionedProgress(BackupRangePartitionedData* self) {
@@ -800,8 +960,7 @@ Future<Void> monitorBackupRangePartitionedProgress(BackupRangePartitionedData* s
         co_await (self->changedTrigger.onTrigger() || self->logSystem.onChange());
     }
 
-    // check all workers have started by checking their progress is larger
-    // than the backup's start version.
+    // Check all workers have started by checking their progress is larger than the backup's start version.
     Reference<BackupRangePartitionedProgress> progress(new BackupRangePartitionedProgress(self->myId));
     co_await getBackupRangePartitionedProgress(self->cx, self->myId, progress, SevDebug);
@@ -828,7 +987,6 @@ Future<Void> monitorBackupRangePartitionedProgress(BackupRangePartitionedData* s
         }
     }
 
-    // TODO akanksha: Implement and explain what setBackupKeys does in next PR.
     Future<Void> setKeys = savedLogVersions.empty() ? Void() : setBackupKeys(self, savedLogVersions);
     co_await (interval && setKeys);
 }
@@ -968,6 +1126,12 @@ Future<Void> backupWorkerRangePartitioned(BackupInterface interf,
         addActor.send(monitorBackupRangePartitionedProgress(&self));
     }
 
+    addActor.send(monitorWorkerPause(&self));
+
+    // First need to call waitAndProcessPartitionMap before starting to pull data, because we need to know the
+    // partition assignment.
+    co_await waitAndProcessPartitionMap(&self);
+
     // If the worker is on an old epoch and all backups starts a version >= the endVersion
     bool exitEarly = co_await shouldBackupWorkerExitEarly(&self);
     TraceEvent("BWRangePartitionedExitEarly", self.myId).detail("ExitEarly", exitEarly);