Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 26 additions & 64 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const errors = require('arsenal').errors;
const { replicationBackends, emptyFileMd5 } = require('arsenal').constants;
const MongoClient = require('arsenal').storage
.metadata.mongoclient.MongoClientInterface;
const { ObjectMD } = require('arsenal').models;
const { ObjectMD, ReplicationConfiguration } = require('arsenal').models;
const { VersionID } = require('arsenal').versioning;
const { extractVersionId } = require('../../lib/util/versioning');

Expand Down Expand Up @@ -234,24 +234,6 @@ class MongoQueueProcessor {
});
}

/**
* get dataStoreVersionId, if exists
* @param {ObjectMDData} objMd - object md fetched from mongo
* @param {String} site - storage location name
* @return {String} dataStoreVersionId
*/
_getDataStoreVersionId(objMd, site) {
let dataStoreVersionId = '';
if (objMd.replicationInfo && objMd.replicationInfo.backends) {
const backend = objMd.replicationInfo.backends
.find(l => l.site === site);
if (backend && backend.dataStoreVersionId) {
dataStoreVersionId = backend.dataStoreVersionId;
}
}
return dataStoreVersionId;
}

/**
* Update ingested entry metadata fields: owner-id, owner-display-name
* @param {ObjectQueueEntry} entry - object queue entry object
Expand Down Expand Up @@ -340,53 +322,33 @@ class MongoQueueProcessor {
const objectMDModel = new ObjectMD();
entry.setReplicationInfo(objectMDModel.getReplicationInfo());

// TODO: refactor based off cloudserver getReplicationInfo
if (bucketRepInfo) {
const { role, destination, rules } = bucketRepInfo;
const rule = rules.find(r =>
(entry.getObjectKey().startsWith(r.prefix) && r.enabled));

if (rule) {
const replicationInfo = {};
const storageTypes = [];
const backends = [];
const storageClasses = rule.storageClass.split(',');

storageClasses.forEach(storageClass => {
const storageClassName =
storageClass.endsWith(':preferred_read') ?
storageClass.split(':')[0] : storageClass;
const location = this._bootstrapList.find(l =>
(l.site === storageClassName));
if (location && replicationBackends[location.type]) {
storageTypes.push(location.type);
}
let dataStoreVersionId = '';
if (zenkoObjMd) {
dataStoreVersionId = this._getDataStoreVersionId(
zenkoObjMd, storageClassName);
}
backends.push({
site: storageClassName,
status: 'PENDING',
dataStoreVersionId,
});
});
if (!bucketRepInfo) {
return;
}

// save updated replication info
replicationInfo.status = 'PENDING';
replicationInfo.backends = backends;
replicationInfo.content = content;
replicationInfo.destination = destination;
replicationInfo.storageClass = storageClasses.join(',');
replicationInfo.role = role;
replicationInfo.storageType = storageTypes.join(',');
replicationInfo.isNFS = bucketInfo.isNFS();

// apply changes
entry.setReplicationInfo(replicationInfo);
}
const isCloud = site => {
const location = this._bootstrapList.find(l => l.site === site);
return !!(location && replicationBackends[location.type]);
};

const backends = ReplicationConfiguration.resolveBackends(
bucketRepInfo,
entry.getObjectKey(),
isCloud,
zenkoObjMd?.replicationInfo?.backends,
);

if (backends.length === 0) {
return;
}

entry.setReplicationInfo({
status: 'PENDING',
backends,
content,
role: ReplicationConfiguration.resolveSourceRole(bucketRepInfo.role),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _updateReplicationInfo (line 349), the new entry.setReplicationInfo call sets role: ReplicationConfiguration.resolveSourceRole(bucketRepInfo.role) but no longer sets the top-level destination, storageClass, or storageType fields, all of which the old code set. If these values are now carried at the backend level by ReplicationConfiguration.resolveBackends, this is fine — but please confirm the downstream consumers (queue populator, status processor) don't rely on the top-level destination / storageClass / storageType fields being present.

— Claude Code

isNFS: bucketInfo.isNFS(),
});
}

/**
Expand Down
89 changes: 61 additions & 28 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const async = require('async');
const { callbackify } = require('util');
const { EventEmitter } = require('events');
const Redis = require('ioredis');
const schedule = require('node-schedule');
Expand All @@ -18,8 +19,6 @@ const QueueEntry = require('../../../lib/models/QueueEntry');
const TaskScheduler = require('../../../lib/tasks/TaskScheduler');
const { getTaskSchedulerQueueKey,
getTaskSchedulerDedupeKey } = require('./taskSchedulerHelpers');
const getLocationsFromStorageClass =
require('../utils/getLocationsFromStorageClass');
const ReplicateObject = require('../tasks/ReplicateObject');
const MultipleBackendTask = require('../tasks/MultipleBackendTask');
const CopyLocationTask = require('../tasks/CopyLocationTask');
Expand Down Expand Up @@ -741,7 +740,7 @@ class QueueProcessor extends EventEmitter {
}
this._consumer = this._createConsumer(
this.topic,
this.processReplicationEntry.bind(this), options);
callbackify(this.processReplicationEntry.bind(this)), options);
return this._consumer.once('canary', done);
},
done => {
Expand Down Expand Up @@ -864,41 +863,75 @@ class QueueProcessor extends EventEmitter {
* @param {function} done - callback function
* @return {undefined}
*/
processReplicationEntry(kafkaEntry, done) {
async processReplicationEntry(kafkaEntry) {
const sourceEntry = QueueEntry.createFromKafkaEntry(kafkaEntry);

if (sourceEntry.error) {
this.logger.error('error processing replication entry', { error: sourceEntry.error });
return process.nextTick(() => done(errors.InternalError));
throw errors.InternalError;
}

if (sourceEntry.skip) {
// skip message, noop
return process.nextTick(done);
return;
}
let task;

const logSkip = () => {
this.logger.debug('skip replication entry', { entry: sourceEntry.getLogInfo() });
};

// Route Bucket Queue Entries
if (sourceEntry instanceof BucketQueueEntry) {
if (this.echoMode) {
task = new EchoBucket(this);
}
// ignore bucket entry if echo mode disabled
} else if (sourceEntry instanceof ObjectQueueEntry) {
const replicationStorageClass =
sourceEntry.getReplicationStorageClass();
const sites = getLocationsFromStorageClass(replicationStorageClass);
if (sites.includes(this.site)) {
if (this.destConfig.replicationEndpoint &&
replicationBackends.includes(this.destConfig.replicationEndpoint.type)) {
task = new MultipleBackendTask(this);
} else {
task = new ReplicateObject(this);
}
if (!this.echoMode) {
logSkip();
return;
}
}
if (task) {

this.logger.debug('replication entry is being pushed', { entry: sourceEntry.getLogInfo() });
return this.taskScheduler.push({ task, entry: sourceEntry, kafkaEntry }, done);
await new Promise((resolve, reject) => {
this.taskScheduler.push({ task: new EchoBucket(this), entry: sourceEntry, kafkaEntry },
err => err ? reject(err) : resolve());
});
return;
}

// Route Object Queue Entries
if (!(sourceEntry instanceof ObjectQueueEntry)) {
logSkip();
return;
}

const pendingBackends = sourceEntry.getReplicationBackends()
.filter(b => b.status === 'PENDING' && b.site === this.site);

if (pendingBackends.length === 0) {
logSkip();
return;
}

const endpointType = this.destConfig.replicationEndpoint?.type;
const isCloud = endpointType && replicationBackends.includes(endpointType);

// Sequential loop over pending backends
for (const backend of pendingBackends) {
const task = new (isCloud ? MultipleBackendTask : ReplicateObject)(this);
const perBackendEntry = sourceEntry.clone()
.setSite(backend.site)
.setDestination(backend.destination)
.setRole(backend.role);

this.logger.debug('replication entry is being pushed', {
entry: perBackendEntry.getLogInfo(),
destination: backend.destination,
role: backend.role,
});

await new Promise((resolve, reject) => {
this.taskScheduler.push(
{ task, entry: perBackendEntry, kafkaEntry },
err => err ? reject(err) : resolve(),
);
});
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sequential for...of loop processes backends one at a time. If one backend's taskScheduler.push rejects, the await will throw and subsequent backends will be skipped entirely (the error propagates to callbackify which calls done(err)). This means a transient failure replicating to destination A will prevent replication to destinations B and C for this Kafka entry. Consider whether each backend should be attempted independently (catching errors per-backend) so one failure doesn't block the others.

— Claude Code

this.logger.debug('skip replication entry', { entry: sourceEntry.getLogInfo() });
return process.nextTick(done);
}

/**
Expand Down
19 changes: 10 additions & 9 deletions extensions/replication/tasks/MultipleBackendTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class MultipleBackendTask extends ReplicateObject {
.then(data => {
const replicationEnabled = data.ReplicationConfiguration.Rules
.some(rule => rule.Status === 'Enabled' &&
entry.getObjectKey().startsWith(rule.Prefix));
entry.getObjectKey().startsWith(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MultipleBackendTask._setupRolesOnce still calls entry.getReplicationRoles() without passing the backend key, while the parent ReplicateObject._setupRolesOnce was updated to entry.getReplicationRoles(entry.getBackendKey()). If arsenal's getReplicationRoles(key) resolves a per-backend role when a key is provided, this cloud-backend path will silently use the top-level (legacy) role string instead of the per-backend one, defeating the purpose of this PR for cloud destinations.

— Claude Code

rule.Filter?.Prefix ?? rule.Prefix ?? ''));
if (!replicationEnabled) {
errMessage = 'replication disabled for object';
log.debug(errMessage, {
Expand Down Expand Up @@ -317,7 +318,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendAbortMPUCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several calls in MultipleBackendTask.processQueueEntry still use this.site (a plain string) for per-backend lookups instead of the key object:
- Line 1220: refreshedEntry.getReplicationSiteStatus(this.site) should be refreshedEntry.getReplicationSiteStatus(sourceEntry.getBackendKey())
- Line 1240: sourceEntry.getReplicationSiteDataStoreVersionId(this.site) should also use the backend key

These were not updated to match the pattern applied in ReplicateObject and UpdateReplicationStatus. If arsenal's methods now support the key object, this will miss per-backend state.

— Claude Code

StorageClass: this.site,
UploadId: uploadId,
RequestUids: log.getSerializedUids(),
Expand Down Expand Up @@ -353,7 +354,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendCompleteMPUCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
VersionId: sourceEntry.getEncodedVersionId() || 'null',
UserMetaData: sourceEntry.getUserMetadata(),
Expand Down Expand Up @@ -445,7 +446,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendPutMPUPartCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
PartNumber: partNumber,
UploadId: uploadId,
Expand Down Expand Up @@ -506,7 +507,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendInitiateMPUCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
VersionId: sourceEntry.getEncodedVersionId() || 'null',
UserMetaData: sourceEntry.getUserMetadata(),
Expand Down Expand Up @@ -951,7 +952,7 @@ class MultipleBackendTask extends ReplicateObject {
Key: sourceEntry.getObjectKey(),
CanonicalID: sourceEntry.getOwnerId(),
ContentMD5: sourceEntry.getContentMd5(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
VersionId: sourceEntry.getEncodedVersionId() || 'null',
UserMetaData: sourceEntry.getUserMetadata(),
Expand Down Expand Up @@ -1008,7 +1009,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendPutObjectTaggingCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
DataStoreVersionId:
sourceEntry.getReplicationSiteDataStoreVersionId(this.site),
Expand Down Expand Up @@ -1065,7 +1066,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendDeleteObjectTaggingCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
DataStoreVersionId:
sourceEntry.getReplicationSiteDataStoreVersionId(this.site),
Expand Down Expand Up @@ -1146,7 +1147,7 @@ class MultipleBackendTask extends ReplicateObject {
const command = new MultipleBackendDeleteObjectCommand({
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
StorageType: sourceEntry.getReplicationStorageType(),
StorageType: this._getReplicationEndpointType(),
StorageClass: this.site,
RequestUids: log.getSerializedUids(),
});
Expand Down
Loading
Loading