Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/readpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <atomic>
#include "read.h"
#include "options.h"
#include "singleproducersingleconsumerlist.h"
Expand All @@ -29,10 +30,10 @@ class ReadPool{
private:
Options* mOptions;
SingleProducerSingleConsumerList<Read*>** mBufferLists;
size_t mProduced;
std::atomic<size_t> mProduced;
size_t mConsumed;
unsigned long mLimit;
bool mIsFull;
std::atomic<bool> mIsFull;
};

#endif
48 changes: 27 additions & 21 deletions src/singleproducersingleconsumerlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ template<typename T>
class SingleProducerSingleConsumerList {
public:
inline SingleProducerSingleConsumerList() {
head = NULL;
head.store(NULL, std::memory_order_relaxed);
tail = NULL;
producerFinished = false;
consumerFinished = false;
Expand All @@ -87,33 +87,38 @@ class SingleProducerSingleConsumerList {
blocks = NULL;
}
inline size_t size() {
return produced - consumed;
return produced.load(std::memory_order_relaxed) - consumed.load(std::memory_order_relaxed);
}
inline bool canBeConsumed() {
if(head == NULL)
if(head.load(std::memory_order_acquire) == NULL)
return false;
return head->nextItemReady || producerFinished;
return head.load(std::memory_order_relaxed)->nextItemReady || producerFinished;
}
inline void produce(T val) {
LockFreeListItem<T>* item = makeItem(val);
if(head==NULL) {
head = item;
if(head.load(std::memory_order_relaxed) == NULL) {
tail = item;
// Release store: publishing head to consumer thread.
// All writes to *item are ordered before this store.
head.store(item, std::memory_order_release);
// Signal the first item is consumable (no predecessor to set this)
head->nextItemReady.store(true, std::memory_order_release);
item->nextItemReady.store(true, std::memory_order_release);
} else {
tail->nextItem = item;
tail->nextItemReady = true;
// Release store: ensures nextItem write visible before nextItemReady flag.
tail->nextItemReady.store(true, std::memory_order_release);
tail = item;
}
produced++;
produced.fetch_add(1, std::memory_order_relaxed);
}
inline T consume() {
assert(head != NULL);
T val = head->value;
head = head->nextItem;
consumed++;
if((consumed & 0xFFF) == 0)
LockFreeListItem<T>* h = head.load(std::memory_order_acquire);
assert(h != NULL);
T val = h->value;
// Advance head; release so next canBeConsumed() acquire sees updated state.
head.store(h->nextItem, std::memory_order_release);
unsigned long _c = consumed.fetch_add(1, std::memory_order_relaxed) + 1;
if((_c & 0xFFF) == 0)
recycle();
return val;
}
Expand All @@ -132,8 +137,9 @@ class SingleProducerSingleConsumerList {
private:
// blockized list
inline LockFreeListItem<T>* makeItem(T val) {
unsigned long blk = produced >> 12;
unsigned long idx = produced & 0xFFF;
unsigned long _p = produced.load(std::memory_order_relaxed);
unsigned long blk = _p >> 12;
unsigned long idx = _p & 0xFFF;
size_t size = 0x01<<12;
if(blocksNum <= blk) {
LockFreeListItem<T>* buffer = new LockFreeListItem<T>[size];
Expand All @@ -147,7 +153,7 @@ class SingleProducerSingleConsumerList {
}

inline void recycle() {
unsigned long blk = consumed >> 12;
unsigned long blk = consumed.load(std::memory_order_relaxed) >> 12;
while((recycled+1) < blk) {
delete[] blocks[recycled & blocksRingBufferSizeMask];
blocks[recycled & blocksRingBufferSizeMask] = NULL;
Expand All @@ -156,13 +162,13 @@ class SingleProducerSingleConsumerList {
}

private:
LockFreeListItem<T>* head;
LockFreeListItem<T>* tail;
std::atomic<LockFreeListItem<T>*> head;
LockFreeListItem<T>* tail; // tail is producer-private, no atomic needed
LockFreeListItem<T>** blocks;
std::atomic_bool producerFinished;
std::atomic_bool consumerFinished;
unsigned long produced;
unsigned long consumed;
std::atomic<unsigned long> produced;
std::atomic<unsigned long> consumed;
unsigned long recycled;
unsigned long blocksRingBufferSize;
unsigned long blocksRingBufferSizeMask;
Expand Down
19 changes: 13 additions & 6 deletions src/writerthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
if (mFd < 0)
error_exit("Failed to open for pwrite: " + mFilename);
mOffsetRing = new OffsetSlot[OFFSET_RING_SIZE];
mNextSeq = new size_t[mOptions->thread];
mNextSeq = new std::atomic<size_t>[mOptions->thread];
for (int t = 0; t < mOptions->thread; t++)
mNextSeq[t] = t;
mNextSeq[t].store(t, std::memory_order_relaxed);
mCompressors = new libdeflate_compressor*[mOptions->thread];
for (int t = 0; t < mOptions->thread; t++)
mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression);
Expand Down Expand Up @@ -75,12 +75,15 @@ bool WriterThread::setInputCompleted() {
}

void WriterThread::setInputCompletedPwrite() {
// Acquire fence: synchronize with the release stores in inputPwrite()
// so that all mNextSeq[t] writes from worker threads are visible here.
std::atomic_thread_fence(std::memory_order_acquire);
int W = mOptions->thread;
size_t lastSeq = 0;
bool anyProcessed = false;
for (int t = 0; t < W; t++) {
if (mNextSeq[t] != (size_t)t) {
size_t workerLastSeq = mNextSeq[t] - W;
if (mNextSeq[t].load(std::memory_order_relaxed) != (size_t)t) {
size_t workerLastSeq = mNextSeq[t].load(std::memory_order_relaxed) - W;
if (!anyProcessed || workerLastSeq > lastSeq) {
lastSeq = workerLastSeq;
anyProcessed = true;
Expand Down Expand Up @@ -131,7 +134,7 @@ void WriterThread::inputPwrite(int tid, string* data) {
const char* writeData = mCompBufs[tid];
size_t wsize = outsize;

size_t seq = mNextSeq[tid];
size_t seq = mNextSeq[tid].load(std::memory_order_relaxed);

// Wait for previous batch's cumulative offset.
// Sleep yields CPU to prevent livelock under contention.
Expand Down Expand Up @@ -164,7 +167,11 @@ void WriterThread::inputPwrite(int tid, string* data) {
}
}

mNextSeq[tid] += mOptions->thread;
// Release store: ensures the pwrite and cumulative_offset publication
// happen-before the acquire fence in setInputCompletedPwrite().
mNextSeq[tid].store(
mNextSeq[tid].load(std::memory_order_relaxed) + mOptions->thread,
std::memory_order_release);
}

void WriterThread::cleanup() {
Expand Down
2 changes: 1 addition & 1 deletion src/writerthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class WriterThread{
bool mPwriteMode;
int mFd;
OffsetSlot* mOffsetRing;
size_t* mNextSeq;
std::atomic<size_t>* mNextSeq;
libdeflate_compressor** mCompressors;
char** mCompBufs; // per-worker pre-allocated compress output buffers
size_t* mCompBufSizes; // per-worker buffer sizes
Expand Down
Loading