diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java index 1e6e270b01f..ccbb019e50f 100755 --- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java +++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java @@ -522,6 +522,9 @@ public static class Replicator { /** Replica is absent on the node and the node is not in assignments for this replica. */ public static final int REPLICA_ABSENT_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 11); + + /** Node is overloaded: in-flight partition operation byte limit reached. */ + public static final int REPLICA_OVERLOADED_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 12); } /** Storage error group. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/ReplicaOverloadedException.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/ReplicaOverloadedException.java new file mode 100644 index 00000000000..5f960d53c07 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/ReplicaOverloadedException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.lang; + +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_OVERLOADED_ERR; + +/** + * Thrown when the node has reached the in-flight partition operation byte limit + * ({@code replication.partitionOperationHeapUsagePercent}) and cannot accept new requests. + */ +public class ReplicaOverloadedException extends IgniteInternalException { + private static final long serialVersionUID = -6023736883539658779L; + + /** Constructor. */ + public ReplicaOverloadedException() { + super(REPLICA_OVERLOADED_ERR, "Node is overloaded: in-flight partition operation byte limit reached."); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionOperationInflightLimiter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionOperationInflightLimiter.java new file mode 100644 index 00000000000..f9bf0c4bde4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionOperationInflightLimiter.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.util; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.IntSupplier; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.IgniteThrottledLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.jetbrains.annotations.Nullable; + +/** + * Limits the total in-flight bytes of partition operations (queued or executing) across the replica manager and thin-client connector. + * + *

<p>The byte limit is computed as a percentage of the JVM heap ({@code Runtime.getRuntime().maxMemory()}). + * When the heap percentage is zero or less, all operations are permitted unconditionally. + * + *

{@link #tryAcquire(int)} returns {@code false} once adding {@code messageBytes} would exceed the limit. + * A permit must be released via {@link #release(int)} when the operation completes. + */ +public class PartitionOperationInflightLimiter { + + /** Byte limit computed from heap percentage; {@code 0} means unlimited. */ + private volatile long byteLimit; + + private final @Nullable IntSupplier heapPercentSupplier; + + private volatile boolean initialized; + + /** Running total of in-flight bytes. */ + private final AtomicLong inFlightBytes = new AtomicLong(); + + private final IgniteLogger log = Loggers.forClass(PartitionOperationInflightLimiter.class); + + private final IgniteThrottledLogger throttledLog = Loggers.toThrottledLogger(log); + + /** + * Constructor. + * + * @param heapPercent Percentage of max JVM heap to use as the in-flight byte limit. Zero or negative disables the limit. + */ + public PartitionOperationInflightLimiter(int heapPercent) { + this.byteLimit = computeByteLimit(heapPercent); + this.heapPercentSupplier = null; + this.initialized = true; + } + + /** + * Constructor with a lazy supplier of the heap percentage. + * + * @param heapPercentSupplier Supplier of heap percentage (0 or less disables the limit). Called at most once, on first use. + */ + public PartitionOperationInflightLimiter(@Nullable IntSupplier heapPercentSupplier) { + this.heapPercentSupplier = heapPercentSupplier; + this.initialized = false; + } + + /** + * Attempts to reserve {@code messageBytes} in-flight bytes. + * + * @param messageBytes Number of bytes to reserve. + * @return {@code true} if the reservation was made or the limit is disabled; {@code false} if adding the bytes would exceed the limit. 
+ */ + public boolean tryAcquire(int messageBytes) { + long limit = resolvedByteLimit(); + + if (limit <= 0) { + return true; + } + + while (true) { + long current = inFlightBytes.get(); + + if (current + messageBytes > limit) { + throttledLog.error("The node is overloaded, cannot permit partition operation requiring {} bytes", messageBytes); + return false; + } + + if (inFlightBytes.compareAndSet(current, current + messageBytes)) { + return true; + } + } + } + + /** + * Releases previously reserved in-flight bytes. + * Must only be called after a successful {@link #tryAcquire(int)}. + * + * @param messageBytes Number of bytes to release. + */ + public void release(int messageBytes) { + long limit = resolvedByteLimit(); + + if (limit > 0) { + inFlightBytes.addAndGet(-messageBytes); + } + } + + private long resolvedByteLimit() { + if (initialized) { + return byteLimit; + } + synchronized (this) { + if (initialized) { + return byteLimit; + } + if (heapPercentSupplier != null) { + byteLimit = computeByteLimit(heapPercentSupplier.getAsInt()); + } + initialized = true; + } + return byteLimit; + } + + private static long computeByteLimit(int heapPercent) { + if (heapPercent <= 0) { + return 0; + } + return (long) (heapPercent / 100.0 * Runtime.getRuntime().maxMemory()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/PartitionOperationInFlightLimiterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/PartitionOperationInFlightLimiterTest.java new file mode 100644 index 00000000000..83705ed4b31 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/PartitionOperationInFlightLimiterTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +class PartitionOperationInFlightLimiterTest { + private static final long MAX_MEMORY = Runtime.getRuntime().maxMemory(); + + @Test + void zeroHeapPercentAlwaysPermits() { + var limiter = new PartitionOperationInflightLimiter(0); + + for (int i = 0; i < 100; i++) { + assertTrue(limiter.tryAcquire(1000)); + } + } + + @Test + void negativeHeapPercentAlwaysPermits() { + var limiter = new PartitionOperationInflightLimiter(-1); + + for (int i = 0; i < 100; i++) { + assertTrue(limiter.tryAcquire(1000)); + } + } + + @Test + void acquireFailsWhenByteLimitExceeded() { + // Use 10% heap limit. + var limiter = new PartitionOperationInflightLimiter(10); + long limit = (long) (0.10 * MAX_MEMORY); + + // Fill the budget first: on large heaps the limit exceeds Integer.MAX_VALUE, so one int-sized chunk cannot overshoot it. + acquireExactly(limiter, limit); + + // Exactly at the limit, even one more byte must be rejected. + assertFalse(limiter.tryAcquire(1)); + } + + @Test + void acquireSucceedsUpToLimit() { + var limiter = new PartitionOperationInflightLimiter(10); + long limit = (long) (0.10 * MAX_MEMORY); + + // Chunk size that fits within the limit. 
+ int chunkBytes = (int) Math.min(limit / 2, Integer.MAX_VALUE / 2); + + assertTrue(limiter.tryAcquire(chunkBytes)); + assertTrue(limiter.tryAcquire(chunkBytes)); + } + + @Test + void releaseRestoresBudget() { + var limiter = new PartitionOperationInflightLimiter(10); + long limit = (long) (0.10 * MAX_MEMORY); + int chunkBytes = (int) Math.min(limit / 2, Integer.MAX_VALUE / 2); + + // Occupy everything except one chunk, then take that last chunk: the budget is exhausted regardless of heap size. + acquireExactly(limiter, limit - chunkBytes); + assertTrue(limiter.tryAcquire(chunkBytes)); + // Now exactly at the limit; another chunk should fail. + assertFalse(limiter.tryAcquire(chunkBytes)); + + limiter.release(chunkBytes); + + assertTrue(limiter.tryAcquire(chunkBytes)); + } + + @Test + void releaseOnZeroLimitIsNoOp() { + var limiter = new PartitionOperationInflightLimiter(0); + + // Should not throw. + limiter.release(1000); + + assertTrue(limiter.tryAcquire(1000)); + } + + @Test + void supplierConstructorInitializesLazily() { + int[] callCount = {0}; + + // 100% heap — effectively unlimited for this test. + var limiter = new PartitionOperationInflightLimiter(() -> { + callCount[0]++; + return 100; + }); + + assertEquals(0, callCount[0], "supplier should not be called at construction time"); + + assertTrue(limiter.tryAcquire(1)); + assertEquals(1, callCount[0], "supplier should be called exactly once"); + + assertTrue(limiter.tryAcquire(1)); + assertEquals(1, callCount[0], "supplier should not be called again"); + } + + @Test + void supplierConstructorWithZeroPercentAlwaysPermits() { + var limiter = new PartitionOperationInflightLimiter(() -> 0); + + for (int i = 0; i < 100; i++) { + assertTrue(limiter.tryAcquire(1000)); + } + } + + @Test + void multipleReleasesRestoreBudget() { + var limiter = new PartitionOperationInflightLimiter(10); + long limit = (long) (0.10 * MAX_MEMORY); + int chunkBytes = (int) Math.min(limit / 4, Integer.MAX_VALUE / 4); + + // Acquire 4 chunks. 
+ for (int i = 0; i < 4; i++) { + assertTrue(limiter.tryAcquire(chunkBytes), "acquire " + i + " should succeed"); + } + + // Release all. + for (int i = 0; i < 4; i++) { + limiter.release(chunkBytes); + } + + // Should be able to acquire again. + for (int i = 0; i < 4; i++) { + assertTrue(limiter.tryAcquire(chunkBytes), "re-acquire " + i + " should succeed after release"); + } + } + + /** Reserves exactly {@code bytes} in int-sized steps; needed because the limit may exceed {@code Integer.MAX_VALUE}. */ + private static void acquireExactly(PartitionOperationInflightLimiter limiter, long bytes) { + long left = bytes; + + while (left > 0) { + int step = (int) Math.min(left, Integer.MAX_VALUE); + + assertTrue(limiter.tryAcquire(step), "reserving " + bytes + " bytes should fit under the limit"); + left -= step; + } + } +} diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 8696887c30e..9d00163d55e 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -260,6 +260,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.network.NetworkAddress; @@ -1487,6 +1488,7 @@ private class Node { Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriver, 
threadPoolsManager.partitionOperationsExecutor(), + new PartitionOperationInflightLimiter(0), partitionIdleSafeTimePropagationPeriodMsSupplier, new NoOpFailureManager(), new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()), diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java 
b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java index 07e033f7ee4..037f938d41e 100644 --- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java +++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java @@ -283,6 +283,29 @@ public TypeSpec generateMessageImpl(MessageClass message, TypeSpec builderInterf messageImpl.addMethod(messageTypeMethod); + // messageSize field with getter/setter (overrides NetworkMessage.getMessageSize() and NetworkMessage.setMessageSize(int)). + // Declared transient so it does not affect serialVersionUID computation and is not included in Java serialization. + FieldSpec messageSizeField = FieldSpec.builder(int.class, "messageSize") + .addModifiers(Modifier.PRIVATE, Modifier.TRANSIENT) + .addAnnotation(IgniteToStringExclude.class) + .build(); + + messageImpl.addField(messageSizeField); + + messageImpl.addMethod(MethodSpec.methodBuilder("getMessageSize") + .addAnnotation(Override.class) + .addModifiers(Modifier.PUBLIC) + .returns(int.class) + .addStatement("return $N", messageSizeField) + .build()); + + messageImpl.addMethod(MethodSpec.methodBuilder("setMessageSize") + .addAnnotation(Override.class) + .addModifiers(Modifier.PUBLIC) + .addParameter(int.class, "messageSize") + .addStatement("this.messageSize = messageSize") + .build()); + // equals and hashCode generateEqualsAndHashCode(messageImpl, message); diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/NetworkMessage.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/NetworkMessage.java index 81ef928d3fd..42b898fd63d 100644 --- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/NetworkMessage.java +++ 
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/NetworkMessage.java @@ -80,4 +80,19 @@ default boolean needAck() { default 
String toStringForLightLogging() { return getClass().getName(); } + + /** + * Returns the wire size of this message in bytes (header + body). + * Returns {@code 0} if the message was not received from the network (e.g. created locally). + */ + default int getMessageSize() { + return 0; + } + + /** + * Sets the wire size of this message in bytes. Called by the inbound decoder after decoding. + */ + default void setMessageSize(int messageSize) { + // No-op for messages not received from the network. + } } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java index e4096e7662d..1de25fdf1ea 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java @@ -52,6 +52,9 @@ public class InboundDecoder extends ByteToMessageDecoder { /** Message group type, for partially read message headers. */ private static final AttributeKey GROUP_TYPE_KEY = AttributeKey.valueOf("GROUP_TYPE"); + /** Bytes consumed so far for the current message (header + body), accumulated across partial reads. */ + private static final AttributeKey MESSAGE_SIZE_KEY = AttributeKey.valueOf("MESSAGE_SIZE"); + private final MessageFormat messageFormat; /** Serialization service. 
*/ @@ -84,6 +87,8 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { Attribute groupTypeAttr = ctx.channel().attr(GROUP_TYPE_KEY); + Attribute messageSizeAttr = ctx.channel().attr(MESSAGE_SIZE_KEY); + reader.setBuffer(buffer); while (buffer.hasRemaining()) { @@ -100,7 +105,8 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { groupType = reader.readHeaderShort(); if (!reader.isLastRead()) { - fixNettyBufferReaderIndex(in, buffer, initialNioBufferPosition); + int readBytes = fixNettyBufferReaderIndex(in, buffer, initialNioBufferPosition); + messageSizeAttr.set(orZero(messageSizeAttr.get()) + readBytes); break; } @@ -111,7 +117,8 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { if (!reader.isLastRead()) { groupTypeAttr.set(groupType); - fixNettyBufferReaderIndex(in, buffer, initialNioBufferPosition); + int readBytes = fixNettyBufferReaderIndex(in, buffer, initialNioBufferPosition); + messageSizeAttr.set(readBytes + orZero(messageSizeAttr.get())); break; } @@ -133,18 +140,23 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { int readBytes = fixNettyBufferReaderIndex(in, buffer, initialNioBufferPosition); if (finished) { + int totalMessageSize = orZero(messageSizeAttr.get()) + readBytes; + reader.reset(); deserializerAttr.set(null); + messageSizeAttr.set(null); NetworkMessage message = deserializer.getMessage(); if (message instanceof ClassDescriptorListMessage) { onClassDescriptorMessage((ClassDescriptorListMessage) message); } else { + message.setMessageSize(totalMessageSize); out.add(message); } } else { deserializerAttr.set(deserializer); + messageSizeAttr.set(orZero(messageSizeAttr.get()) + readBytes); } if (readBytes == 0) { @@ -175,4 +187,8 @@ private static int fixNettyBufferReaderIndex(ByteBuf in, ByteBuffer buffer, int private void onClassDescriptorMessage(ClassDescriptorListMessage msg) { serializationService.mergeDescriptors(msg.messages()); } + + private static int 
orZero(Integer value) { + return value == null ? 0 : value; + } } diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java index 3f87efd70c8..422722ce4a4 100644 --- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java @@ -239,6 +239,83 @@ public void testPartialReadWithReuseBuffer() throws Exception { assertFalse(channel.finish()); } + /** + * Tests that the wire size reported by {@link NetworkMessage#getMessageSize()} matches the number of bytes + * actually written by the serializer for the same message. + */ + @Test + public void testMessageSizeIsTracked() { + var msg = new TestMessagesFactory().testMessage().msg("hello").build(); + + var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class)); + var perSessionSerializationService = new PerSessionSerializationService(serializationService); + var channel = new EmbeddedChannel(new InboundDecoder(messageFormat, perSessionSerializationService)); + + MessageWriter writer = messageFormat.writer(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION); + MessageSerializer serializer = registry.createSerializer(msg.groupType(), msg.messageType()); + + ByteBuffer buf = ByteBuffer.allocate(10_000); + writer.setBuffer(buf); + serializer.writeMessage(msg, writer); + buf.flip(); + int expectedSize = buf.limit(); + + ByteBuf buffer = allocator.buffer(expectedSize); + buffer.writeBytes(buf); + channel.writeInbound(buffer); + + NetworkMessage received = channel.readInbound(); + + assertFalse(channel.finish()); + + assertEquals(expectedSize, received.getMessageSize()); + } + + /** + * Tests that the wire size is tracked correctly even when the message arrives in multiple chunks. 
+ */ + @Test + public void testMessageSizeIsTrackedAcrossPartialReads() throws Exception { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + var channel = new EmbeddedChannel(); + Mockito.doReturn(channel).when(ctx).channel(); + + var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class)); + var perSessionSerializationService = new PerSessionSerializationService(serializationService); + var decoder = new InboundDecoder(messageFormat, perSessionSerializationService); + var list = new ArrayList<>(); + + var msg = new TestMessagesFactory().testMessage().msg("abcdefghijklmn").build(); + + MessageWriter writer = messageFormat.writer(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION); + MessageSerializer serializer = registry.createSerializer(msg.groupType(), msg.messageType()); + + ByteBuffer nioBuffer = ByteBuffer.allocate(10_000); + writer.setBuffer(nioBuffer); + serializer.writeMessage(msg, writer); + nioBuffer.flip(); + int expectedSize = nioBuffer.limit(); + + ByteBuf buffer = allocator.buffer(); + + // Feed the message in two halves. + int half = expectedSize / 2; + for (int i = 0; i < half; i++) { + buffer.writeByte(nioBuffer.get()); + } + decoder.decode(ctx, buffer, list); + assertEquals(0, list.size()); + + buffer.writeBytes(nioBuffer); + decoder.decode(ctx, buffer, list); + buffer.release(); + + assertEquals(1, list.size()); + + NetworkMessage received = (NetworkMessage) list.get(0); + assertEquals(expectedSize, received.getMessageSize()); + } + /** * Source of parameters for the {@link #testAllTypes(long)} method. Creates seeds for a {@link AllTypesMessage} generation. 
* diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index 51a7a9ffeef..52a405d5b82 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -197,6 +197,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; @@ -678,6 +679,7 @@ public CompletableFuture invoke(Condition condition, Operation success, Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriverManager.placementDriver(), threadPoolsManager.partitionOperationsExecutor(), + new PartitionOperationInflightLimiter(0), partitionIdleSafeTimePropagationPeriodMsSupplier, new NoOpFailureManager(), new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()), diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java index c2fed07bfc4..1214f3f4844 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java +++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java @@ -121,6 +121,7 @@ import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbPartitionStorage; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.SafeTimeValuesTracker; import org.apache.ignite.internal.worker.ThreadAssertions; @@ -243,6 +244,7 @@ void setUp( Set.of(), placementDriver, executorService, + new PartitionOperationInflightLimiter(0), () -> Long.MAX_VALUE, failureManager, null, diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs index 36db50e89bf..1f2915c19cf 100644 --- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs +++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs @@ -425,6 +425,9 @@ public static class Replicator /// ReplicaAbsent error. public const int ReplicaAbsent = (GroupCode << 16) | (11 & 0xFFFF); + + /// ReplicaOverloaded error. + public const int ReplicaOverloaded = (GroupCode << 16) | (12 & 0xFFFF); } /// Storage errors. 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java index e1ce8b96158..9cf56fe7de9 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java @@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.entity; +import org.apache.ignite.internal.network.annotations.MessageSerialVersionUid; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.raft.jraft.RaftMessageGroup; import org.apache.ignite.raft.jraft.rpc.Message; @@ -61,6 +62,7 @@ private FileSource(int value) { } @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_FILE_META) + @MessageSerialVersionUid(-9146300188186546998L) public interface LocalFileMeta extends Message { int sourceNumber(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalStorageOutter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalStorageOutter.java index efaff6b21ac..39519c64eac 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalStorageOutter.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalStorageOutter.java @@ -20,6 +20,7 @@ package org.apache.ignite.raft.jraft.entity; import java.util.List; +import org.apache.ignite.internal.network.annotations.MessageSerialVersionUid; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.raft.jraft.RaftMessageGroup; import org.apache.ignite.raft.jraft.rpc.Message; @@ -27,6 +28,7 @@ public final class LocalStorageOutter { @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.STABLE_PB_META) + @MessageSerialVersionUid(-5290387314865672247L) public interface StablePBMeta extends Message { long term(); @@ -34,6 +36,7 @@ 
public interface StablePBMeta extends Message { } @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_SNAPSHOT_PB_META) + @MessageSerialVersionUid(2413309240654251368L) public interface LocalSnapshotPbMeta extends Message { @Nullable RaftOutter.SnapshotMeta meta(); @@ -41,6 +44,7 @@ public interface LocalSnapshotPbMeta extends Message { List filesList(); @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_SNAPSHOT_META_FILE) + @MessageSerialVersionUid(-7940140526427081886L) interface File extends Message { String name(); diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java index cf8542f7aba..1074e27f9ff 100644 --- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java +++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java @@ -108,6 +108,7 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.topology.TestLogicalTopologyService; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator; @@ -236,6 +237,7 @@ public void beforeTest(TestInfo testInfo) { Set.of(ReplicaMessageTestGroup.class), new TestPlacementDriver(primaryReplicaSupplier), partitionOperationsExecutor, + new PartitionOperationInflightLimiter(0), () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, mock(FailureProcessor.class), // TODO: IGNITE-22222 can't pass ThreadLocalPartitionCommandsMarshaller there due to dependency loop diff 
--git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 8ff7610e762..f0022ca8cae 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -71,6 +71,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.ComponentStoppingException; import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.lang.ReplicaOverloadedException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.IgniteThrottledLogger; import org.apache.ignite.internal.logger.Loggers; @@ -124,11 +125,11 @@ import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest; import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.replicator.message.TimestampAware; -import org.apache.ignite.internal.thread.ExecutorChooser; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteStripedBusyLock; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.TrackerClosedException; import org.apache.ignite.lang.IgniteException; @@ -225,6 +226,9 @@ public class ReplicaManager extends AbstractEventProducer> messageGroupsToHandle, PlacementDriver placementDriver, Executor requestsExecutor, + PartitionOperationInflightLimiter partitionOperationInFlightLimiter, LongSupplier idleSafeTimePropagationPeriodMsSupplier, FailureProcessor failureProcessor, @Nullable 
Marshaller raftCommandsMarshaller, @@ -293,6 +299,7 @@ public ReplicaManager( this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived; this.placementDriver = placementDriver; this.requestsExecutor = requestsExecutor; + this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter; this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier; this.failureProcessor = failureProcessor; this.raftCommandsMarshaller = raftCommandsMarshaller; @@ -321,6 +328,22 @@ public ReplicaManager( } private void onReplicaMessageReceived(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { + if (!partitionOperationInFlightLimiter.tryAcquire(message.getMessageSize())) { + clusterNetSvc.messagingService().respond( + sender.name(), + prepareReplicaErrorResponse(false, new ReplicaOverloadedException()), + correlationId); + + return; + } + try { + handleReplicaMessage(message, sender, correlationId); + } finally { + partitionOperationInFlightLimiter.release(message.getMessageSize()); + } + } + + private void handleReplicaMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { if (!(message instanceof ReplicaRequest)) { return; } @@ -983,12 +1006,10 @@ private CompletableFuture stopReplicaInternal(ReplicationGroupId replic /** {@inheritDoc} */ @Override public CompletableFuture startAsync(ComponentContext componentContext) { - ExecutorChooser replicaMessagesExecutorChooser = message -> requestsExecutor; - - clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, replicaMessagesExecutorChooser, handler); + clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, handler); clusterNetSvc.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, placementDriverMessageHandler); messageGroupsToHandle.forEach( - mg -> clusterNetSvc.messagingService().addMessageHandler(mg, replicaMessagesExecutorChooser, handler) + mg 
-> clusterNetSvc.messagingService().addMessageHandler(mg, handler) ); scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate( this::idleSafeTimeSync, diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java index 9259d34beb4..af1aef43143 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java @@ -26,6 +26,7 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_ABSENT_ERR; import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR; import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_OVERLOADED_ERR; import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; @@ -63,6 +64,7 @@ public class ReplicaService { ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR, GROUP_OVERLOADED_ERR, + REPLICA_OVERLOADED_ERR, REPLICA_ABSENT_ERR ); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java index e29c0d8d64f..67641289bed 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java @@ -73,4 +73,14 @@ public class ReplicationConfigurationSchema { @Range(min = 1) @Value(hasDefault = true) public int batchSizeBytes = DEFAULT_BATCH_SIZE_BYTES; + + /** + * Percentage of JVM max heap memory to use as 
the limit for total in-flight partition operation bytes on this node. + * When the limit is reached, new partition operation requests are rejected with an overload error. + * Applies to the replica manager (inter-node) partition operations. + * Zero means no limit. + */ + @Range(min = 0, max = 100) + @Value(hasDefault = true) + public int partitionOperationHeapUsagePercent = 50; } diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java index 22e0b5704e4..6f90ab007ff 100644 --- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java +++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.thread.ExecutorChooser; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator; @@ -158,6 +159,7 @@ void startReplicaManager( Set.of(), placementDriver, requestsExecutor, + new PartitionOperationInflightLimiter(0), () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, new NoOpFailureManager(), marshaller, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index b35f59aaae1..28aede955c4 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -239,6 +239,7 @@ import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource; import org.apache.ignite.internal.worker.CriticalWorkerWatchdog; @@ -649,6 +650,7 @@ public CompletableFuture invoke(Condition condition, List su Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriverManager.placementDriver(), threadPoolsManager.partitionOperationsExecutor(), + new PartitionOperationInflightLimiter(0), partitionIdleSafeTimePropagationPeriodMsSupplier, failureProcessor, new ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()), diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index afe9e7d4cd0..327b5e58df2 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -302,6 +302,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource; @@ -993,6 +994,10 @@ public class IgniteImpl implements Ignite { var validationSchemasSource = new 
CatalogValidationSchemasSource(catalogManager, schemaManager, indexMetaStorage); + PartitionOperationInflightLimiter partitionOperationInFlightLimiter = new PartitionOperationInflightLimiter( + () -> replicationConfig.partitionOperationHeapUsagePercent().value() + ); + replicaMgr = new ReplicaManager( clusterSvc, cmgMgr, @@ -1001,6 +1006,7 @@ public class IgniteImpl implements Ignite { Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriverMgr.placementDriver(), threadPoolsManager.partitionOperationsExecutor(), + partitionOperationInFlightLimiter, partitionIdleSafeTimePropagationPeriodMsSupplier, failureManager, raftMarshaller, diff --git a/modules/runner/src/test/resources/compatibility/configuration/ignite-snapshot.bin b/modules/runner/src/test/resources/compatibility/configuration/ignite-snapshot.bin index 31dac9e2ca7..ca37b94e3ae 100644 Binary files a/modules/runner/src/test/resources/compatibility/configuration/ignite-snapshot.bin and b/modules/runner/src/test/resources/compatibility/configuration/ignite-snapshot.bin differ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java index 8ceef87b6c4..046b731b59e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java @@ -161,7 +161,8 @@ private static boolean replicaMiss(Throwable th) { } private static boolean groupOverloaded(Throwable th) { - return ExceptionUtils.extractCodeFrom(th) == Replicator.GROUP_OVERLOADED_ERR; + int code = ExceptionUtils.extractCodeFrom(th); + return code == Replicator.GROUP_OVERLOADED_ERR || code == Replicator.REPLICA_OVERLOADED_ERR; } private static boolean replicaAbsent(Throwable th) { diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java index dcef3028282..fafd2eaf120 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java @@ -109,6 +109,7 @@ import org.apache.ignite.internal.tx.test.TestTransactionIds; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator; @@ -216,6 +217,7 @@ public void setup() throws NodeStoppingException { Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), new TestPlacementDriver(clusterService.staticLocalNode()), requestsExecutor, + new PartitionOperationInflightLimiter(0), () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, new NoOpFailureManager(), mock(ThreadLocalPartitionCommandsMarshaller.class), diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java index 373e2c35d2e..58c50656b6f 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java @@ -163,6 +163,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource; import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator; @@ -456,6 +457,7 @@ private void startComponents() throws Exception { Set.of(), placementDriver, partitionOperationsExecutor, + new PartitionOperationInflightLimiter(0), () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, failureProcessor, null, diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index a19ca2b08eb..7f4bd909d4f 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -195,6 +195,7 @@ import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.Lazy; +import org.apache.ignite.internal.util.PartitionOperationInflightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.SafeTimeValuesTracker; import org.apache.ignite.network.NetworkAddress; @@ -521,6 +522,7 @@ public void prepareCluster() throws Exception { Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriver, partitionOperationsExecutor, + new PartitionOperationInflightLimiter(0), this::getSafeTimePropagationTimeout, new NoOpFailureManager(), commandMarshaller,