Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class TopicSubscription extends AbstractSubscription {
private int maximumPendingMessages = -1;
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
private final AtomicInteger discarded = new AtomicInteger();
private boolean enableExpiry = true;
Comment thread
mattrpav marked this conversation as resolved.
Outdated
private final Object matchedListMutex = new Object();
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
Expand Down Expand Up @@ -166,7 +167,7 @@ public void add(MessageReference node) throws Exception {
if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
max = maximumPendingMessages;
}
if (!matched.isEmpty() && matched.size() > max) {
if (enableExpiry && !matched.isEmpty() && matched.size() > max) {
removeExpiredMessages();
}
// lets discard old messages as we are a slow consumer
Expand Down Expand Up @@ -530,6 +531,18 @@ public void setMaximumPendingMessages(int maximumPendingMessages) {
this.maximumPendingMessages = maximumPendingMessages;
}

public boolean isEnableExpiry() {
return enableExpiry;
}

/**
* When set to {@code false} the eager O(n) expired-message scan is skipped on every
* {@link #add(MessageReference)} call.
*/
public void setEnableExpiry(boolean enableExpiry) {
this.enableExpiry = enableExpiry;
}

Comment thread
mattrpav marked this conversation as resolved.
Outdated
public MessageEvictionStrategy getMessageEvictionStrategy() {
return messageEvictionStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class PolicyEntry extends DestinationMapEntry {

private MessageInterceptorStrategy messageInterceptorStrategy = null;

private boolean enableExpiry = true;
Comment thread
mattrpav marked this conversation as resolved.
Outdated

Comment thread
mattrpav marked this conversation as resolved.
Outdated
public void configure(Broker broker,Queue queue) {
baseConfiguration(broker,queue);
if (dispatchPolicy != null) {
Expand Down Expand Up @@ -356,6 +358,11 @@ public void configure(Broker broker, SystemUsage memoryManager, TopicSubscriptio
subscription.setMaximumPendingMessages(value);
}
}
if (!enableExpiry) {
LOG.debug("Disabling eager expiry scan for consumer: {} (enableExpiry=false on PolicyEntry)",
subscription.getInfo().getConsumerId());
subscription.setEnableExpiry(false);
}
if (messageEvictionStrategy != null) {
subscription.setMessageEvictionStrategy(messageEvictionStrategy);
}
Expand Down Expand Up @@ -704,6 +711,30 @@ public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}

/**
* Returns whether the broker performs an eager expired-message scan on
* {@link org.apache.activemq.broker.region.TopicSubscription#add}.
*
* @return {@code true} if the expiry scan is enabled (default), {@code false} if skipped
*/
public boolean isEnableExpiry() {
return enableExpiry;
}

/**
* Controls whether the broker performs an eager expired-message scan when the
* pending slow-consumer buffer exceeds the high-water mark.
* <p>
* Set to {@code false} when messages carry no TTL, or when the O(n) scan cost
* outweighs the benefit of eagerly evicting expired messages from slow-consumer
* buffers.
*
* @param enableExpiry {@code false} to skip the scan; {@code true} to enable it (default)
*/
public void setEnableExpiry(boolean enableExpiry) {
this.enableExpiry = enableExpiry;
}

Comment thread
mattrpav marked this conversation as resolved.
Outdated
public int getMaxQueueAuditDepth() {
return maxQueueAuditDepth;
}
Expand Down
Loading