Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions fdbserver/core/BulkLoadUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

#include "fdbclient/BulkLoading.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/S3Client.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/core/BulkLoadUtil.h"
#include "fdbserver/core/Knobs.h"
#include "fdbserver/core/RocksDBCheckpointUtils.h"
Expand Down Expand Up @@ -181,6 +183,83 @@ Future<BulkLoadTaskState> getBulkLoadTaskStateFromDataMove(Database cx,
}
}

// Look up BulkLoadTaskState directly from bulkLoadTaskKeys by range, without going through DataMoveMetaData.
// This allows bulk load to work without SHARD_ENCODE_LOCATION_METADATA / dataMoveId.
// The function retries until the read version >= atLeastVersion and a valid task is found.
// If no task exists for the range, blocks forever (caller is expected to cancel via fetchKeys cancellation).
//
// Parameters:
//   cx             — database to read from.
//   range          — the shard range the caller (storage server) is fetching; used as the krm lookup range.
//   atLeastVersion — minimum acceptable read version; reads at older versions could miss a task that was
//                    written concurrently with the data move that triggered this lookup.
//   logId          — UID used to tag trace events (typically the storage server's id).
Future<BulkLoadTaskState> getBulkLoadTaskStateByRange(Database cx, KeyRange range, Version atLeastVersion, UID logId) {
    Transaction tr(cx);
    int retryCount = 0;         // iterations spent waiting for the read version to reach atLeastVersion
    int metadataRetryCount = 0; // iterations spent waiting for task metadata to appear for this range
    double startTime = now();
    // System-key read that must work even while the database is locked (bulk load locks ranges),
    // hence READ_SYSTEM_KEYS + LOCK_AWARE. These options are re-applied after every tr.reset().
    tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
    tr.setOption(FDBTransactionOptions::LOCK_AWARE);
    while (true) {
        Error err;
        try {
            RangeResult result = co_await krmGetRanges(&tr, bulkLoadTaskPrefix, range);
            // krmGetRanges performed a read on tr, so the transaction's read version must be resolved.
            ASSERT(tr.getReadVersion().isReady());
            if (tr.getReadVersion().get() < atLeastVersion) {
                // Too-old snapshot: the task record may not be visible yet. Back off and retry with a
                // fresh transaction (a new read version) rather than trusting a possibly-stale read.
                retryCount++;
                if (retryCount % 100 == 0) {
                    // Throttled warning: emitted roughly every 10s given the 0.1s delay below.
                    TraceEvent(SevWarn, "SSBulkLoadTaskByRangeWaitingForVersion", logId)
                        .detail("Range", range)
                        .detail("ReadVersion", tr.getReadVersion().get())
                        .detail("AtLeastVersion", atLeastVersion)
                        .detail("RetryCount", retryCount)
                        .detail("ElapsedSec", now() - startTime);
                }
                co_await delay(0.1);
                tr.reset();
                // tr.reset() clears options; restore them before the next read.
                tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
                continue;
            }
            // krm results always include boundary entries; result[0] covers range.begin. An empty value
            // means no bulk load task has been written for the start of this range yet.
            if (result.size() >= 2 && !result[0].value.empty()) {
                BulkLoadTaskState bulkLoadTaskState = decodeBulkLoadTaskState(result[0].value);
                if (bulkLoadTaskState.isValid()) {
                    if (metadataRetryCount > 0 || retryCount > 0) {
                        // Only log success when we actually had to wait, to keep the common path quiet.
                        TraceEvent(SevInfo, "SSBulkLoadTaskByRangeGotMetadata", logId)
                            .detail("Range", range)
                            .detail("TaskID", bulkLoadTaskState.getTaskId())
                            .detail("TaskRange", bulkLoadTaskState.getRange())
                            .detail("MetadataRetryCount", metadataRetryCount)
                            .detail("VersionRetryCount", retryCount)
                            .detail("ElapsedSec", now() - startTime);
                    }
                    // The task range may not exactly match the requested range if the shard was
                    // split within the task's range. This is expected — the SS will only load
                    // keys within its own assigned range from the task's data.
                    if (bulkLoadTaskState.getRange() != range) {
                        TraceEvent(SevInfo, "SSBulkLoadTaskByRangeRangeMismatch", logId)
                            .detail("RequestedRange", range)
                            .detail("TaskRange", bulkLoadTaskState.getRange())
                            .detail("TaskID", bulkLoadTaskState.getTaskId());
                    }
                    co_return bulkLoadTaskState;
                }
            }

            // Version is new enough but no (valid) task metadata exists yet for this range; keep polling.
            metadataRetryCount++;
            if (metadataRetryCount % 100 == 0) {
                TraceEvent(SevWarn, "SSBulkLoadTaskByRangeWaitingForMetadata", logId)
                    .detail("Range", range)
                    .detail("MetadataRetryCount", metadataRetryCount)
                    .detail("ElapsedSec", now() - startTime)
                    .detail("Message", "BulkLoadTaskState not yet written for this range");
            }
            co_await delay(0.1);
            tr.reset();
            // Restore options cleared by tr.reset() (see above).
            tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::LOCK_AWARE);
            continue;
        } catch (Error& e) {
            // co_await is not permitted inside a catch handler in C++ coroutines, so capture the
            // error here and run the standard FDB retry handler (tr.onError) after the try/catch.
            err = e;
        }
        co_await tr.onError(err);
    }
}

// Return true if generated the byte sampling file. Otherwise, return false.
// TODO(BulkDump): directly read from special key space.
Future<bool> doBytesSamplingOnDataFile(std::string dataFileFullPath, // input file
Expand Down
77 changes: 71 additions & 6 deletions fdbserver/core/MoveKeys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,8 @@ static Future<Void> startMoveKeys(Database occ,
FlowLock* startMoveKeysLock,
UID relocationIntervalId,
std::map<UID, StorageServerInterface>* tssMapping,
const DDEnabledState* ddEnabledState) {
const DDEnabledState* ddEnabledState,
UID dataMoveId = UID()) {
TraceInterval interval("RelocateShard_StartMoveKeys");
Future<Void> warningLogger = logWarningAfter("StartMoveKeysTooLong", 600, servers);
// state TraceInterval waitInterval("");
Expand Down Expand Up @@ -1130,12 +1131,45 @@ static Future<Void> startMoveKeys(Database occ,
// Since we are setting this for the entire range, serverKeys and keyServers aren't guaranteed
// to have the same shard boundaries If that invariant was important, we would have to move this
// inside the loop above and also set it for the src servers
actors.push_back(krmSetRangeCoalescing(
tr, serverKeysPrefixFor(servers[i]), currentKeys, allKeys, serverKeysTrue));
Value skValue = (dataMoveId.isValid() && dataMoveId != anonymousShardId)
? serverKeysValue(dataMoveId)
: serverKeysTrue;
actors.push_back(
krmSetRangeCoalescing(tr, serverKeysPrefixFor(servers[i]), currentKeys, allKeys, skValue));
}

co_await waitForAll(actors);

// If this is the last batch of a bulk load move, update the task phase
// to Running in the same transaction for atomicity.
if (endKey >= keys.end && dataMoveId.isValid()) {
bool assigned, emptyRange;
DataMoveType dmType;
DataMovementReason dmReason;
decodeDataMoveId(dataMoveId, assigned, emptyRange, dmType, dmReason);
if (dmType == DataMoveType::LOGICAL_BULKLOAD || dmType == DataMoveType::PHYSICAL_BULKLOAD) {
RangeResult result = co_await krmGetRanges(tr, bulkLoadTaskPrefix, keys);
if (result.size() >= 2 && !result[0].value.empty()) {
BulkLoadTaskState taskState = decodeBulkLoadTaskState(result[0].value);
if (taskState.isValid() && (taskState.phase == BulkLoadPhase::Triggered ||
taskState.phase == BulkLoadPhase::Running)) {
taskState.phase = BulkLoadPhase::Running;
taskState.setDataMoveId(dataMoveId);
taskState.startTime = now();
co_await krmSetRange(tr,
bulkLoadTaskPrefix,
taskState.getRange(),
bulkLoadTaskStateValue(taskState));
TraceEvent(SevInfo, "StartMoveKeysBulkLoadSetRunning", relocationIntervalId)
.detail("DataMoveID", dataMoveId)
.detail("TaskID", taskState.getTaskId())
.detail("TaskRange", taskState.getRange())
.detail("MoveRange", keys);
}
}
}
}

co_await tr->commit();
counters->committed->increment(1);

Expand Down Expand Up @@ -1290,7 +1324,8 @@ static Future<Void> finishMoveKeys(Database occ,
bool hasRemote,
UID relocationIntervalId,
std::map<UID, StorageServerInterface> tssMapping,
const DDEnabledState* ddEnabledState) {
const DDEnabledState* ddEnabledState,
UID dataMoveId = UID()) {
TraceInterval interval("RelocateShard_FinishMoveKeys");
TraceInterval waitInterval("");
Future<Void> warningLogger = logWarningAfter("FinishMoveKeysTooLong", 600, destinationTeam);
Expand Down Expand Up @@ -1593,6 +1628,34 @@ static Future<Void> finishMoveKeys(Database occ,
}

co_await waitForAll(actors);

// If this is the last batch of a bulk load move, update the task phase
// to Complete in the same transaction for atomicity.
if (endKey >= keys.end && dataMoveId.isValid()) {
bool assigned, emptyRange;
DataMoveType dmType;
DataMovementReason dmReason;
decodeDataMoveId(dataMoveId, assigned, emptyRange, dmType, dmReason);
if (dmType == DataMoveType::LOGICAL_BULKLOAD || dmType == DataMoveType::PHYSICAL_BULKLOAD) {
RangeResult result = co_await krmGetRanges(&tr, bulkLoadTaskPrefix, keys);
if (result.size() >= 2 && !result[0].value.empty()) {
BulkLoadTaskState taskState = decodeBulkLoadTaskState(result[0].value);
if (taskState.isValid() && taskState.phase == BulkLoadPhase::Running) {
taskState.phase = BulkLoadPhase::Complete;
co_await krmSetRange(&tr,
bulkLoadTaskPrefix,
taskState.getRange(),
bulkLoadTaskStateValue(taskState));
TraceEvent(SevInfo, "FinishMoveKeysBulkLoadSetComplete", relocationIntervalId)
.detail("DataMoveID", dataMoveId)
.detail("TaskID", taskState.getTaskId())
.detail("TaskRange", taskState.getRange())
.detail("MoveRange", keys);
}
}
}
}

co_await tr.commit();
counters->committed->increment(1);

Expand Down Expand Up @@ -3324,7 +3387,8 @@ Future<Void> rawStartMovement(Database occ,
params.startMoveKeysParallelismLock,
params.relocationIntervalId,
&tssMapping,
params.ddEnabledState);
params.ddEnabledState,
params.dataMoveId);
}

Future<Void> rawCheckFetchingState(const Database& cx,
Expand Down Expand Up @@ -3376,7 +3440,8 @@ Future<Void> rawFinishMovement(Database occ,
params.hasRemote,
params.relocationIntervalId,
tssMapping,
params.ddEnabledState);
params.ddEnabledState,
params.dataMoveId);
}

Future<Void> moveKeys(Database occ, MoveKeysParams params) {
Expand Down
4 changes: 4 additions & 0 deletions fdbserver/core/include/fdbserver/core/BulkLoadUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ Future<BulkLoadTaskState> getBulkLoadTaskStateFromDataMove(Database cx,
Version atLeastVersion,
UID logId);

// Look up BulkLoadTaskState directly from bulkLoadTaskKeys by range.
// Does not require DataMoveMetaData or a valid dataMoveId.
Future<BulkLoadTaskState> getBulkLoadTaskStateByRange(Database cx, KeyRange range, Version atLeastVersion, UID logId);

Future<BulkLoadFileSet> bulkLoadDownloadTaskFileSet(BulkLoadTransportMethod transportMethod,
BulkLoadFileSet fromRemoteFileSet,
std::string toLocalRoot,
Expand Down
70 changes: 68 additions & 2 deletions fdbserver/datadistributor/DDRelocationQueue.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1181,13 +1181,21 @@ void DDQueue::launchQueuedWork(std::set<RelocateData, std::greater<RelocateData>
.detail("DataMoveReason", static_cast<int>(rrs.dmReason));
}
} else {
rrs.dataMoveId = anonymousShardId;
if (rrs.bulkLoadTask.present()) {
// Bulk load needs a proper dataMoveId to encode LOGICAL_BULKLOAD type,
// even without SHARD_ENCODE_LOCATION_METADATA. Defer assignment until
// after prevCleanup in dataDistributionRelocator.
rrs.dataMoveId = UID();
} else {
rrs.dataMoveId = anonymousShardId;
}
TraceEvent(SevInfo, "NewDataMoveWithAnonymousDestID", this->distributorId)
.detail("DataMoveID", rrs.dataMoveId.toString())
.detail("TrackID", rrs.randomId)
.detail("Range", rrs.keys)
.detail("Reason", rrs.reason.toString())
.detail("DataMoveReason", static_cast<int>(rrs.dmReason));
.detail("DataMoveReason", static_cast<int>(rrs.dmReason))
.detail("BulkLoad", rrs.bulkLoadTask.present());
}
}

Expand Down Expand Up @@ -1587,6 +1595,64 @@ Future<Void> dataDistributionRelocator(DDQueue* self,
// Currently, the relocator is triggered based on the inflightActor info.
// In future, we should assert at here that the intersecting DataMove in self->dataMoves
// are all invalid. i.e. the range of new relocators is match to the range of prevCleanup.
} else if (doBulkLoading) {
// Bulk load without SHARD_ENCODE_LOCATION_METADATA: validate task and assign dataMoveId.
// Set cancellable=false before co_await prevCleanup to prevent DDQueueValidateError13:
// the DD validation loop checks that any range marked cancellable in inFlight has a live
// relocator actor. During prevCleanup, there's a window where the old actor is cancelled
// but this actor hasn't progressed past the wait — without this, the validation would see
// a cancellable entry with no live actor and fire the error.
auto inFlightRange = self->inFlight.rangeContaining(rd.keys.begin);
ASSERT(inFlightRange.range() == rd.keys);
inFlightRange.value().cancellable = false;

co_await prevCleanup;

Transaction tr(self->txnProcessor->context());
while (true) {
Error innerErr;
try {
BulkLoadTaskState currentBulkLoadTaskState =
co_await getBulkLoadTask(&tr,
rd.bulkLoadTask.get().coreState.getRange(),
rd.bulkLoadTask.get().coreState.getTaskId(),
{ BulkLoadPhase::Triggered, BulkLoadPhase::Running });
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskDataMoveLaunched", self->distributorId)
.detail("TrackID", rd.randomId)
.detail("DataMovePriority", rd.priority)
.detail("JobID", rd.bulkLoadTask.get().coreState.getJobId())
.detail("TaskID", rd.bulkLoadTask.get().coreState.getTaskId())
.detail("TaskRange", rd.bulkLoadTask.get().coreState.getRange());
break;
} catch (Error& e) {
innerErr = e;
}
if (innerErr.code() == error_code_bulkload_task_outdated) {
if (rd.bulkLoadTask.get().completeAck.canBeSet()) {
rd.bulkLoadTask.get().completeAck.sendError(bulkload_task_outdated());
}
doBulkLoading = false;
TraceEvent(SevWarn, "DDBulkLoadTaskFallbackToNormalDataMove", self->distributorId)
.detail("TrackID", rd.randomId)
.detail("DataMovePriority", rd.priority)
.detail("JobID", rd.bulkLoadTask.get().coreState.getJobId())
.detail("TaskID", rd.bulkLoadTask.get().coreState.getTaskId())
.detail("TaskRange", rd.bulkLoadTask.get().coreState.getRange());
break;
}
co_await tr.onError(innerErr);
}
DataMoveType dataMoveType = newDataMoveType(doBulkLoading);
rd.dataMoveId = newDataMoveId(
deterministicRandom()->randomUInt64(), AssignEmptyRange::False, dataMoveType, rd.dmReason);
TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskNewDataMoveID", self->distributorId)
.detail("DataMoveID", rd.dataMoveId.toString())
.detail("TrackID", rd.randomId)
.detail("Range", rd.keys)
.detail("Priority", rd.priority)
.detail("DataMoveType", dataMoveType)
.detail("DoBulkLoading", doBulkLoading)
.detail("DataMoveReason", static_cast<int>(rd.dmReason));
}

Optional<StorageMetrics> parentMetrics;
Expand Down
13 changes: 5 additions & 8 deletions fdbserver/datadistributor/DataDistribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,9 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
}

std::vector<Key> customBoundaries;
if (bulkLoadIsEnabled(self->initData->bulkLoadMode)) {
// Bulk load does not allow boundary change
if (bulkLoadIsEnabled(self->initData->bulkLoadMode) && SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
// When SHARD_ENCODE_LOCATION_METADATA is enabled, bulk load uses the startMoveShards path
// which manages boundaries differently. Skip custom boundary enforcement in that case.
TraceEvent(SevInfo, "DDInitCustomRangeConfigDisabledByBulkLoadMode", self->ddId);
} else {
for (auto it : self->initData->userRangeConfig->ranges()) {
Expand Down Expand Up @@ -2219,12 +2220,8 @@ Future<Void> monitorBulkLoadModeAndSpawnActors(Reference<DataDistributor> self,
co_return;
}

// Only monitor if SHARD_ENCODE_LOCATION_METADATA is enabled (required for bulkload)
if (!SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
TraceEvent(SevInfo, "DDBulkLoadModeMonitorSkipped", self->ddId)
.detail("Reason", "SHARD_ENCODE_LOCATION_METADATA is disabled");
co_return;
}
// Bulk load no longer requires SHARD_ENCODE_LOCATION_METADATA.
// SS looks up bulk load tasks directly by range from bulkLoadTaskKeys.

Database cx = self->txnProcessor->context();
Transaction tr(cx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ struct DDBulkLoadEngineTask {
};

inline bool bulkLoadIsEnabled(int bulkLoadModeValue) {
return SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && bulkLoadModeValue == 1;
return bulkLoadModeValue == 1;
}

inline bool bulkDumpIsEnabled(int bulkDumpModeValue) {
Expand Down
Loading