Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions eo-runtime/src/main/java/org/eolang/AtWithRho.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@

package org.eolang;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

/**
* The attribute that tries to copy object and set \rho to it if it has not already set.
* This attribute is NOT thread safe!
*
* @since 0.36.0
* @todo #4673:30min The {@link AtWithRho#get()} is not thread safe. If multiple threads
* call get() concurrently when the underlying object lacks RHO, each thread will:
* 1. Pass the !ret.hasRho() check
* 2. Create its own copy via ret.copy()
* 3. Attempt to set RHO on its copy
* This results in different threads receiving different copies, violating the expectation
* that get() returns a consistent view of the attribute's value.
*/
final class AtWithRho implements Attr {
/**
Expand All @@ -29,6 +24,16 @@ final class AtWithRho implements Attr {
*/
private final Phi rho;

/**
* Cached result of {@link #get()} to guarantee a consistent view across threads.
*/
private final AtomicReference<Phi> cached;
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
* Lock guarding the first initialization of {@link #cached}.
*/
private final ReentrantLock lock;

/**
* Ctor.
* @param attr Attribute
Expand All @@ -37,6 +42,8 @@ final class AtWithRho implements Attr {
AtWithRho(final Attr attr, final Phi rho) {
this.original = attr;
this.rho = rho;
this.cached = new AtomicReference<>();
this.lock = new ReentrantLock();
}

@Override
Expand All @@ -49,16 +56,29 @@ public Attr copy(final Phi self) {

@Override
public Phi get() {
Phi ret = this.original.get();
if (!ret.hasRho()) {
ret = ret.copy();
ret.put(Phi.RHO, this.rho);
Phi ret = this.cached.get();
if (ret == null) {
this.lock.lock();
try {
ret = this.cached.get();
if (ret == null) {
ret = this.original.get();
if (!ret.hasRho()) {
ret = ret.copy();
ret.put(Phi.RHO, this.rho);
}
this.cached.set(ret);
}
} finally {
this.lock.unlock();
}
}
return ret;
}

@Override
public void put(final Phi phi) {
this.cached.set(null);
this.original.put(phi);
Comment on lines 68 to 70
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Synchronize cache invalidation with initialization.

put() clears cached without the get() lock. A racing get() can compute an old value, put() can clear/update, then get() can write the stale value back into cached. Use the same lock around invalidation and delegation.

🔒 Proposed fix: serialize `put()` with `get()` initialization
     `@Override`
     public void put(final Phi phi) {
-        this.cached.set(null);
-        this.original.put(phi);
+        this.lock.lock();
+        try {
+            this.cached.set(null);
+            this.original.put(phi);
+        } finally {
+            this.lock.unlock();
+        }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@eo-runtime/src/main/java/org/eolang/AtWithRho.java` around lines 80 - 82,
put() clears cached without holding the same lock used by get(), allowing a race
where get() recomputes and rewrites a stale value; fix by wrapping the
invalidation and delegation inside the same synchronization used by get() (i.e.,
use the same lock object/monitor that guards cached initialization in get()) —
for example, place cached.set(null) and original.put(phi) inside a
synchronized(...) block using the lock object referenced in get() so put() is
serialized with get()'s initialization.

}
}
46 changes: 46 additions & 0 deletions eo-runtime/src/test/java/org/eolang/AtWithRhoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
*/
package org.eolang;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Surface worker failures and assert identity directly.

submit() captures task exceptions in Futures, but this test never reads them. If only one worker succeeds and the rest fail, results can still have size 1. Also, Set uniqueness uses equals/hashCode, not strict instance identity.

🧪 Proposed fix: collect futures and assert `sameInstance` for every result
+import java.util.ArrayList;
+import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@
     `@Test`
-    void returnsSameInstanceToConcurrentCallers() throws InterruptedException {
+    void returnsSameInstanceToConcurrentCallers() throws Exception {
         final int threads = 16;
         final Attr attr = new AtWithRho(
             new AtComposite(new PhDefault(), phi -> phi),
             new PhDefault()
         );
-        final Set<Phi> results = ConcurrentHashMap.newKeySet();
         final CountDownLatch start = new CountDownLatch(1);
-        final CountDownLatch done = new CountDownLatch(threads);
         final ExecutorService pool = Executors.newFixedThreadPool(threads);
+        final List<Future<Phi>> futures = new ArrayList<>(threads);
         try {
             for (int idx = 0; idx < threads; ++idx) {
-                pool.submit(() -> {
-                    try {
-                        start.await();
-                        results.add(attr.get());
-                    } catch (final InterruptedException ex) {
-                        Thread.currentThread().interrupt();
-                    } finally {
-                        done.countDown();
-                    }
+                futures.add(pool.submit(() -> {
+                    start.await();
+                    return attr.get();
                 });
             }
             start.countDown();
+            final Phi first = futures.get(0).get(10L, TimeUnit.SECONDS);
+            for (final Future<Phi> future : futures) {
+                MatcherAssert.assertThat(
+                    "AtWithRho.get() must return the same instance for all concurrent callers",
+                    future.get(10L, TimeUnit.SECONDS),
+                    Matchers.sameInstance(first)
+                );
+            }
-            MatcherAssert.assertThat(
-                "all threads must finish",
-                done.await(10L, TimeUnit.SECONDS),
-                Matchers.is(true)
-            );
         } finally {
             pool.shutdownNow();
         }
-        MatcherAssert.assertThat(
-            "AtWithRho.get() must return the same instance for all concurrent callers",
-            results,
-            Matchers.hasSize(1)
-        );
     }

Also applies to: 107-145

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@eo-runtime/src/test/java/org/eolang/AtWithRhoTest.java` around lines 7 - 12,
The test currently submits tasks with ExecutorService.submit but never inspects
the returned Futures and uses a Set (results) which relies on equals/hashCode;
change the test in AtWithRhoTest to collect the Future<?> objects returned by
submit, after awaiting the CountDownLatch iterate over each Future and call get
(with a timeout) so any worker exceptions are thrown and fail the test, and
replace the Set-based uniqueness/assertion with explicit identity checks (use
assertSame or Hamcrest sameInstance) against the expected object instances for
every result entry instead of relying on Set membership; refer to the
variables/methods ExecutorService.submit, Future.get, CountDownLatch.await, and
the results collection to locate where to change code.

import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -97,4 +103,44 @@ void copiesWithNewRhoMustCallCopyOnOriginal() {
Matchers.not(Matchers.is(obj))
);
}

@Test
void returnsSameInstanceToConcurrentCallers() throws InterruptedException {
final int threads = 16;
final Attr attr = new AtWithRho(
new AtComposite(new PhDefault(), phi -> phi),
new PhDefault()
);
final Set<Phi> results = ConcurrentHashMap.newKeySet();
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(threads);
final ExecutorService pool = Executors.newFixedThreadPool(threads);
try {
for (int idx = 0; idx < threads; ++idx) {
pool.submit(() -> {
try {
start.await();
results.add(attr.get());
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
} finally {
done.countDown();
}
});
}
start.countDown();
MatcherAssert.assertThat(
"all threads must finish",
done.await(10L, TimeUnit.SECONDS),
Matchers.is(true)
);
} finally {
pool.shutdownNow();
}
MatcherAssert.assertThat(
"AtWithRho.get() must return the same instance for all concurrent callers",
results,
Matchers.hasSize(1)
);
}
}
Loading