diff --git a/src/readpool.h b/src/readpool.h
index 2b1bf733..616637e2 100644
--- a/src/readpool.h
+++ b/src/readpool.h
@@ -7,6 +7,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string>
+#include <atomic>
 #include "read.h"
 #include "options.h"
 #include "singleproducersingleconsumerlist.h"
@@ -29,10 +30,10 @@ class ReadPool{
 private:
     Options* mOptions;
     SingleProducerSingleConsumerList<Read*>** mBufferLists;
-    size_t mProduced;
+    std::atomic<size_t> mProduced;
     size_t mConsumed;
     unsigned long mLimit;
-    bool mIsFull;
+    std::atomic<bool> mIsFull;
 };
 
 #endif
\ No newline at end of file
diff --git a/src/singleproducersingleconsumerlist.h b/src/singleproducersingleconsumerlist.h
index 66304dca..cb79b6d8 100644
--- a/src/singleproducersingleconsumerlist.h
+++ b/src/singleproducersingleconsumerlist.h
@@ -63,7 +63,7 @@ template<typename T>
 class SingleProducerSingleConsumerList {
 public:
     inline SingleProducerSingleConsumerList() {
-        head = NULL;
+        head.store(NULL, std::memory_order_relaxed);
         tail = NULL;
         producerFinished = false;
         consumerFinished = false;
@@ -87,33 +87,38 @@
         blocks = NULL;
     }
     inline size_t size() {
-        return produced - consumed;
+        return produced.load(std::memory_order_relaxed) - consumed.load(std::memory_order_relaxed);
     }
     inline bool canBeConsumed() {
-        if(head == NULL)
+        if(head.load(std::memory_order_acquire) == NULL)
             return false;
-        return head->nextItemReady || producerFinished;
+        return head.load(std::memory_order_relaxed)->nextItemReady || producerFinished;
     }
     inline void produce(T val) {
         LockFreeListItem<T>* item = makeItem(val);
-        if(head==NULL) {
-            head = item;
+        if(head.load(std::memory_order_relaxed) == NULL) {
             tail = item;
+            // Release store: publishing head to consumer thread.
+            // All writes to *item are ordered before this store.
+            head.store(item, std::memory_order_release);
             // Signal the first item is consumable (no predecessor to set this)
-            head->nextItemReady.store(true, std::memory_order_release);
+            item->nextItemReady.store(true, std::memory_order_release);
         } else {
             tail->nextItem = item;
-            tail->nextItemReady = true;
+            // Release store: ensures nextItem write visible before nextItemReady flag.
+            tail->nextItemReady.store(true, std::memory_order_release);
             tail = item;
         }
-        produced++;
+        produced.fetch_add(1, std::memory_order_relaxed);
    }
    inline T consume() {
-        assert(head != NULL);
-        T val = head->value;
-        head = head->nextItem;
-        consumed++;
-        if((consumed & 0xFFF) == 0)
+        LockFreeListItem<T>* h = head.load(std::memory_order_acquire);
+        assert(h != NULL);
+        T val = h->value;
+        // Advance head; release so next canBeConsumed() acquire sees updated state.
+        head.store(h->nextItem, std::memory_order_release);
+        unsigned long _c = consumed.fetch_add(1, std::memory_order_relaxed) + 1;
+        if((_c & 0xFFF) == 0)
             recycle();
         return val;
     }
@@ -132,8 +137,9 @@
 private:
     // blockized list
     inline LockFreeListItem<T>* makeItem(T val) {
-        unsigned long blk = produced >> 12;
-        unsigned long idx = produced & 0xFFF;
+        unsigned long _p = produced.load(std::memory_order_relaxed);
+        unsigned long blk = _p >> 12;
+        unsigned long idx = _p & 0xFFF;
         size_t size = 0x01<<12;
         if(blocksNum <= blk) {
             LockFreeListItem<T>* buffer = new LockFreeListItem<T>[size];
@@ -147,7 +153,7 @@
     }
 
     inline void recycle() {
-        unsigned long blk = consumed >> 12;
+        unsigned long blk = consumed.load(std::memory_order_relaxed) >> 12;
         while((recycled+1) < blk) {
             delete[] blocks[recycled & blocksRingBufferSizeMask];
             blocks[recycled & blocksRingBufferSizeMask] = NULL;
@@ -156,13 +162,13 @@
     }
 
 private:
-    LockFreeListItem<T>* head;
-    LockFreeListItem<T>* tail;
+    std::atomic<LockFreeListItem<T>*> head;
+    LockFreeListItem<T>* tail; // tail is producer-private, no atomic needed
     LockFreeListItem<T>** blocks;
     std::atomic_bool producerFinished;
     std::atomic_bool consumerFinished;
-    unsigned long produced;
-    unsigned long consumed;
+    std::atomic<unsigned long> produced;
+    std::atomic<unsigned long> consumed;
     unsigned long recycled;
     unsigned long blocksRingBufferSize;
     unsigned long blocksRingBufferSizeMask;
diff --git a/src/writerthread.cpp b/src/writerthread.cpp
index 5d21091a..0e5d0fc0 100644
--- a/src/writerthread.cpp
+++ b/src/writerthread.cpp
@@ -28,9 +28,9 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
 
     if (mFd < 0) error_exit("Failed to open for pwrite: " + mFilename);
     mOffsetRing = new OffsetSlot[OFFSET_RING_SIZE];
-    mNextSeq = new size_t[mOptions->thread];
+    mNextSeq = new std::atomic<size_t>[mOptions->thread];
     for (int t = 0; t < mOptions->thread; t++)
-        mNextSeq[t] = t;
+        mNextSeq[t].store(t, std::memory_order_relaxed);
     mCompressors = new libdeflate_compressor*[mOptions->thread];
     for (int t = 0; t < mOptions->thread; t++)
         mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression);
@@ -75,12 +75,15 @@ bool WriterThread::setInputCompleted() {
 }
 
 void WriterThread::setInputCompletedPwrite() {
+    // Acquire fence: synchronize with the release stores in inputPwrite()
+    // so that all mNextSeq[t] writes from worker threads are visible here.
+    std::atomic_thread_fence(std::memory_order_acquire);
     int W = mOptions->thread;
     size_t lastSeq = 0;
     bool anyProcessed = false;
     for (int t = 0; t < W; t++) {
-        if (mNextSeq[t] != (size_t)t) {
-            size_t workerLastSeq = mNextSeq[t] - W;
+        if (mNextSeq[t].load(std::memory_order_relaxed) != (size_t)t) {
+            size_t workerLastSeq = mNextSeq[t].load(std::memory_order_relaxed) - W;
             if (!anyProcessed || workerLastSeq > lastSeq) {
                 lastSeq = workerLastSeq;
                 anyProcessed = true;
@@ -131,7 +134,7 @@ void WriterThread::inputPwrite(int tid, string* data) {
     const char* writeData = mCompBufs[tid];
     size_t wsize = outsize;
 
-    size_t seq = mNextSeq[tid];
+    size_t seq = mNextSeq[tid].load(std::memory_order_relaxed);
 
     // Wait for previous batch's cumulative offset.
     // Sleep yields CPU to prevent livelock under contention.
@@ -164,7 +167,11 @@
         }
     }
 
-    mNextSeq[tid] += mOptions->thread;
+    // Release store: ensures the pwrite and cumulative_offset publication
+    // happen-before the acquire fence in setInputCompletedPwrite().
+    mNextSeq[tid].store(
+        mNextSeq[tid].load(std::memory_order_relaxed) + mOptions->thread,
+        std::memory_order_release);
 }
 
 void WriterThread::cleanup() {
diff --git a/src/writerthread.h b/src/writerthread.h
index 053d27f6..bd685875 100644
--- a/src/writerthread.h
+++ b/src/writerthread.h
@@ -59,7 +59,7 @@ class WriterThread{
     bool mPwriteMode;
     int mFd;
     OffsetSlot* mOffsetRing;
-    size_t* mNextSeq;
+    std::atomic<size_t>* mNextSeq;
     libdeflate_compressor** mCompressors;
     char** mCompBufs; // per-worker pre-allocated compress output buffers
     size_t* mCompBufSizes; // per-worker buffer sizes