diff --git a/eo-runtime/src/main/java/org/eolang/AtWithRho.java b/eo-runtime/src/main/java/org/eolang/AtWithRho.java index 446977707d5..5abe822c287 100644 --- a/eo-runtime/src/main/java/org/eolang/AtWithRho.java +++ b/eo-runtime/src/main/java/org/eolang/AtWithRho.java @@ -5,17 +5,13 @@ package org.eolang; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + /** * The attribute that tries to copy object and set \rho to it if it has not already set. - * This attribute is NOT thread safe! + * * @since 0.36.0 - * @todo #4673:30min The {@link AtWithRho#get()} is not thread safe. If multiple threads - * call get() concurrently when the underlying object lacks RHO, each thread will: - * 1. Pass the !ret.hasRho() check - * 2. Create its own copy via ret.copy() - * 3. Attempt to set RHO on its copy - * This results in different threads receiving different copies, violating the expectation - * that get() returns a consistent view of the attribute's value. */ final class AtWithRho implements Attr { @@ -29,6 +25,16 @@ final class AtWithRho implements Attr { */ private final Phi rho; + /** + * Cached result of {@link #get()} to guarantee a consistent view across threads. + */ + private final AtomicReference cached; + + /** + * Lock guarding the first initialization of {@link #cached}. + */ + private final ReentrantLock lock; + /** * Ctor. * @param attr Attribute @@ -37,6 +43,8 @@ final class AtWithRho implements Attr { AtWithRho(final Attr attr, final Phi rho) { this.original = attr; this.rho = rho; + this.cached = new AtomicReference<>(); + this.lock = new ReentrantLock(); } @Override @@ -49,16 +57,48 @@ public Attr copy(final Phi self) { @Override public Phi get() { - Phi ret = this.original.get(); - if (!ret.hasRho()) { - ret = ret.copy(); - ret.put(Phi.RHO, this.rho); + Phi ret = this.cached.get(); + if (ret == null) { + ret = this.initialize(); } return ret; } @Override public void put(final Phi phi) { + this.cached.set(null); this.original.put(phi); } + + /** + * Initialize the cached value under the lock, using double-checked locking. + * @return Cached {@link Phi} + */ + private Phi initialize() { + this.lock.lock(); + try { + Phi ret = this.cached.get(); + if (ret == null) { + ret = this.withRho(this.original.get()); + this.cached.set(ret); + } + return ret; + } finally { + this.lock.unlock(); + } + } + + /** + * Return the given object with {@link Phi#RHO} attached, copying it if needed. + * @param phi Original object + * @return Object with {@code \rho} set + */ + private Phi withRho(final Phi phi) { + Phi ret = phi; + if (!ret.hasRho()) { + ret = ret.copy(); + ret.put(Phi.RHO, this.rho); + } + return ret; + } } diff --git a/eo-runtime/src/test/java/org/eolang/AtWithRhoTest.java b/eo-runtime/src/test/java/org/eolang/AtWithRhoTest.java index c2adb1cb55d..df143c365ea 100644 --- a/eo-runtime/src/test/java/org/eolang/AtWithRhoTest.java +++ b/eo-runtime/src/test/java/org/eolang/AtWithRhoTest.java @@ -4,6 +4,11 @@ */ package org.eolang; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; @@ -98,4 +103,61 @@ void copiesWithNewRhoMustCallCopyOnOriginal() { Matchers.not(Matchers.is(obj)) ); } + + @Test + void returnsSameInstanceToConcurrentCallers() throws InterruptedException { + MatcherAssert.assertThat( + "AtWithRho.get() must return the same instance for all concurrent callers", + AtWithRhoTest.distinctConcurrentGets( + new AtWithRho( + new AtComposite(new PhDefault(), phi -> phi), + new PhDefault() + ), + 16 + ), + Matchers.is(1) + ); + } + + /** + * Invoke {@link Attr#get()} concurrently from many threads released + * simultaneously and return the number of distinct instances observed. + * @param attr Attribute to query + * @param threads Number of concurrent callers + * @return Count of distinct {@link Phi} instances returned across threads + * @throws InterruptedException If interrupted while waiting + */ + private static int distinctConcurrentGets(final Attr attr, final int threads) + throws InterruptedException { + return AtWithRhoTest.distinctGets(attr, threads, ConcurrentHashMap.newKeySet()); + } + + /** + * Run concurrent {@link Attr#get()} calls collecting results into the given sink. + * @param attr Attribute to query + * @param threads Number of concurrent callers + * @param sink Destination set collecting distinct instances + * @return Size of {@code sink} after all threads finish + * @throws InterruptedException If interrupted while waiting + */ + private static int distinctGets(final Attr attr, final int threads, final Set sink) + throws InterruptedException { + final CountDownLatch start = new CountDownLatch(1); + try (ExecutorService pool = Executors.newFixedThreadPool(threads)) { + for (int idx = 0; idx < threads; ++idx) { + pool.submit( + () -> { + try { + start.await(); + sink.add(attr.get()); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + ); + } + start.countDown(); + } + return sink.size(); + } }