diff --git a/deps/oblib/src/lib/alloc/block_set.cpp b/deps/oblib/src/lib/alloc/block_set.cpp index a85817a6b..611533513 100644 --- a/deps/oblib/src/lib/alloc/block_set.cpp +++ b/deps/oblib/src/lib/alloc/block_set.cpp @@ -17,6 +17,7 @@ #include "block_set.h" #include "lib/alloc/ob_tenant_ctx_allocator.h" +#include "lib/time/ob_time_utility.h" #ifdef _WIN32 #include #ifndef MADV_DONTNEED @@ -32,13 +33,23 @@ using namespace oceanbase; using namespace oceanbase::lib; +namespace +{ +static const int64_t ORDINARY_PURGE_BUDGET = 4L << 20; +static const int64_t ORDINARY_PURGE_MIN_INTERVAL_US = 1000L * 1000L; +static const int64_t ORDINARY_PURGE_DELAY_US = 1000L * 1000L; +static const int64_t ORDINARY_PURGE_MAX_BLOCKS = 64; +} + BlockSet::BlockSet() : tallocator_(NULL), locker_(NULL), chunk_mgr_(NULL), clist_(NULL), avail_bm_(BLOCKS_PER_CHUNK+1, avail_bm_buf_), - total_hold_(0), total_payload_(0), total_used_(0) + purged_avail_bm_(BLOCKS_PER_CHUNK+1, purged_avail_bm_buf_), + total_hold_(0), total_payload_(0), total_used_(0), + last_ordinary_purge_ts_(0) { } @@ -60,6 +71,8 @@ void BlockSet::reset() //MEMSET(block_list_, 0, sizeof(block_list_)); clist_ = nullptr; avail_bm_.clear(); + purged_avail_bm_.clear(); + last_ordinary_purge_ts_ = 0; } void BlockSet::set_tenant_ctx_allocator(ObTenantCtxAllocator &allocator) @@ -105,6 +118,9 @@ ABlock *BlockSet::alloc_block(const uint64_t size, const ObMemAttr &attr) uint64_t payload = 0; block->hold(&payload); UNUSED(ATOMIC_FAA(&total_used_, payload)); + if (!block->is_large_) { + maybe_ordinary_purge(); + } } return block; @@ -156,25 +172,26 @@ void BlockSet::free_block(ABlock *const block) head->in_use_ = false; // copy a temp if (0 == chunk->using_cnt_) { - if (0 != chunk->washed_size_) { - int offset = 0; - do { - ABlock *unused_block = chunk->offset2blk(offset); - int next_offset = -1; - bool is_last = chunk->is_last_blk_offset(offset, &next_offset); - abort_unless(!unused_block->in_use_); - // don't allow to take off head twice - if (!unused_block->is_washed_ && head != unused_block) { - take_off_free_block(unused_block, chunk->blk_nblocks(unused_block), chunk); - } - if (is_last) break; - offset = next_offset; - } while (true); - } + int offset = 0; + do { + ABlock *unused_block = chunk->offset2blk(offset); + int next_offset = -1; + bool is_last = chunk->is_last_blk_offset(offset, &next_offset); + abort_unless(!unused_block->in_use_); + // don't allow to take off head twice + if (unused_block->is_washed_) { + take_off_purged_block(unused_block, chunk->blk_nblocks(unused_block), chunk); + } else if (head != unused_block) { + take_off_free_block(unused_block, chunk->blk_nblocks(unused_block), chunk); + } + if (is_last) break; + offset = next_offset; + } while (true); free_chunk(chunk); } else { int head_nblocks = chunk->blk_nblocks(head); add_free_block(head, head_nblocks, chunk); + maybe_ordinary_purge(); } } } @@ -186,6 +203,7 @@ void BlockSet::add_free_block(ABlock *block, int nblocks, AChunk *chunk) abort_unless(NULL != block && !block->in_use_ && !block->is_washed_); int offset = chunk->blk_offset(block); chunk->mark_blk_offset_bit(offset); + block->alloc_bytes_ = common::ObTimeUtility::fast_current_time(); #if MEMCHK_LEVEL >= 1 int expect_nblocks = chunk->blk_nblocks(block); @@ -212,14 +230,7 @@ ABlock* BlockSet::get_free_block(const int cls, const ObMemAttr &attr) if (ffs >= 0) { if (NULL != block_list_[ffs]) { // exist block = block_list_[ffs]; - if (block->next_ != block) { // not the only one - block->prev_->next_ = block->next_; - block->next_->prev_ = block->prev_; - block_list_[ffs] = block->next_; - } else { - avail_bm_.unset(ffs); - block_list_[ffs] = NULL; - } + take_off_free_block(block, ffs, block->chunk()); block->in_use_ = true; } } @@ -231,6 +242,10 @@ ABlock* BlockSet::get_free_block(const int cls, const ObMemAttr &attr) add_free_block(next_block, ffs - cls, chunk); } + if (block == NULL && ffs < 0) { + block = get_purged_block(cls, attr); + } + if (block == NULL && ffs < 0) { if (add_chunk(attr)) { block = get_free_block(cls, attr); @@ -242,7 +257,7 @@ ABlock* BlockSet::get_free_block(const int cls, const ObMemAttr &attr) void BlockSet::take_off_free_block(ABlock *block, int nblocks, AChunk *chunk) { - abort_unless(NULL != block && !block->in_use_); + abort_unless(NULL != block && !block->in_use_ && !block->is_washed_); #if MEMCHK_LEVEL >= 1 int expect_nblocks = chunk->blk_nblocks(block); @@ -260,6 +275,87 @@ void BlockSet::take_off_free_block(ABlock *block, int nblocks, AChunk *chunk) } } +void BlockSet::add_purged_block(ABlock *block, int nblocks, AChunk *chunk) +{ + abort_unless(NULL != block && !block->in_use_ && block->is_washed_); + int offset = chunk->blk_offset(block); + chunk->mark_blk_offset_bit(offset); + +#if MEMCHK_LEVEL >= 1 + int expect_nblocks = chunk->blk_nblocks(block); + abort_unless(nblocks == expect_nblocks); +#endif + ABlock *&blist = purged_block_list_[nblocks]; + if (purged_avail_bm_.isset(nblocks)) { + block->prev_ = blist->prev_; + block->next_ = blist; + block->prev_->next_ = block; + block->next_->prev_ = block; + } else { + block->prev_ = block->next_ = block; + blist = block; + purged_avail_bm_.set(nblocks); + } +} + +ABlock* BlockSet::get_purged_block(const int cls, const ObMemAttr &attr) +{ + ABlock *block = NULL; + const int ffs = purged_avail_bm_.find_first_significant(cls); + if (ffs >= 0 && OB_NOT_NULL(purged_block_list_[ffs]) && OB_NOT_NULL(tallocator_)) { + const int64_t restore_size = cls * ABLOCK_SIZE; + if (tallocator_->restore_purged_hold(restore_size, attr)) { + block = purged_block_list_[ffs]; + AChunk *chunk = block->chunk(); + take_off_purged_block(block, ffs, chunk); + + UNUSED(ATOMIC_FAA(&total_hold_, restore_size)); + UNUSED(ATOMIC_FAA(&total_payload_, restore_size)); + abort_unless(chunk->washed_size_ >= static_cast(restore_size)); + chunk->washed_size_ -= restore_size; + + int64_t related_chunks = 0; + int64_t washed_blks = 0; + if (ffs == cls) { + abort_unless(chunk->washed_blks_ > 0); + chunk->washed_blks_--; + washed_blks = -1; + if (0 == chunk->washed_blks_) { + related_chunks = -1; + } + } else { + ABlock *next_block = new (block + cls) ABlock(); + next_block->is_washed_ = true; + add_purged_block(next_block, ffs - cls, chunk); + } + tallocator_->update_wash_stat(related_chunks, washed_blks, -restore_size); + block->is_washed_ = false; + block->in_use_ = true; + } + } + return block; +} + +void BlockSet::take_off_purged_block(ABlock *block, int nblocks, AChunk *chunk) +{ + abort_unless(NULL != block && !block->in_use_ && block->is_washed_); + +#if MEMCHK_LEVEL >= 1 + int expect_nblocks = chunk->blk_nblocks(block); + abort_unless(nblocks == expect_nblocks); +#endif + if (block->next_ != block) { + block->next_->prev_ = block->prev_; + block->prev_->next_ = block->next_; + if (block == purged_block_list_[nblocks]) { + purged_block_list_[nblocks] = block->next_; + } + } else { + purged_avail_bm_.unset(nblocks); + purged_block_list_[nblocks] = NULL; + } +} + AChunk *BlockSet::alloc_chunk(const uint64_t size, const ObMemAttr &attr) { AChunk *chunk = NULL; @@ -341,28 +437,60 @@ inline int ob_madvise(void* addr, size_t length, int advice) { #endif int64_t BlockSet::sync_wash(int64_t wash_size) +{ + bool has_ignore = false; + return purge_free_blocks(wash_size, 0, INT64_MAX, &has_ignore); +} + +int64_t BlockSet::purge_free_blocks(const int64_t wash_size, + const int64_t delay_us, + const int64_t max_blocks_per_round, + bool *has_ignore, + int64_t *scanned_blocks) { #if !defined(MADV_DONTNEED) + UNUSED(wash_size); + UNUSED(delay_us); + UNUSED(max_blocks_per_round); + UNUSED(has_ignore); + if (OB_NOT_NULL(scanned_blocks)) { + *scanned_blocks = 0; + } return 0; -#endif +#else const ssize_t ps = get_page_size(); - bool has_ignore = false; + bool local_has_ignore = false; int64_t washed_size = 0; int64_t washed_blks = 0; + int64_t scanned_blks = 0; int64_t related_chunks = 0; + const int64_t now = delay_us > 0 ? common::ObTimeUtility::fast_current_time() : 0; int cls = avail_bm_.nbits() - 1; - while (washed_size < wash_size && cls >=1 && (cls = avail_bm_.find_first_most_significant(cls)) != -1) { + while (washed_size < wash_size && scanned_blks < max_blocks_per_round && + cls >= 1 && (cls = avail_bm_.find_first_most_significant(cls)) != -1) { + const int64_t len = cls * ABLOCK_SIZE; + if (len < ps) { + break; + } else if (washed_size + len > wash_size) { + cls--; + continue; + } ABlock *head = block_list_[cls]; if (nullptr == head) { } else { ABlock *block = head; - ABlock *next = block->next_; - do { - block = next; + bool need_scan = true; + int64_t scan_cnt = 0; + while (need_scan && OB_NOT_NULL(block) && + washed_size < wash_size && scanned_blks < max_blocks_per_round && + scan_cnt++ < BLOCKS_PER_CHUNK) { + ABlock *next = block->next_ != block ? block->next_ : nullptr; + need_scan = OB_NOT_NULL(next) && next != head; + scanned_blks++; AChunk *chunk = block->chunk(); if (chunk->is_hugetlb_) { _OB_LOG(DEBUG, "cannot be applied to Huge TLB pages"); - has_ignore = true; + local_has_ignore = true; } else { #if MEMCHK_LEVEL >= 1 abort_unless(!block->in_use_ && !block->is_washed_); @@ -370,11 +498,11 @@ int64_t BlockSet::sync_wash(int64_t wash_size) abort_unless(nblocks == cls); #endif char *data = chunk->blk_data(block); - int64_t len = cls * ABLOCK_SIZE; if ((reinterpret_cast(data) & (ps - 1)) != 0 || (len & (ps - 1)) != 0) { _OB_LOG(DEBUG, "cannot be applied to non-multiple of page-size, page_size: %zd", ps); - has_ignore = true; + local_has_ignore = true; + } else if (delay_us > 0 && now - static_cast(block->alloc_bytes_) < delay_us) { } else { int result = 0; do { @@ -386,10 +514,14 @@ int64_t BlockSet::sync_wash(int64_t wash_size) } while (result == -1 && errno == EAGAIN); if (-1 == result) { _OB_LOG_RET(WARN, OB_ERR_SYS, "madvise failed, errno: %d", errno); - has_ignore = true; + local_has_ignore = true; } else { + if (head == block) { + head = next; + } take_off_free_block(block, cls, chunk); block->is_washed_ = true; + add_purged_block(block, cls, chunk); if (0 == chunk->washed_blks_) { abort_unless(0 == chunk->washed_size_); related_chunks++; @@ -401,13 +533,16 @@ int64_t BlockSet::sync_wash(int64_t wash_size) } } } - next = next->next_; - } while (washed_size < wash_size && block != head); + block = next; + if (OB_ISNULL(head)) { + need_scan = false; + } + } } cls--; } #if MEMCHK_LEVEL >= 1 - if (wash_size == INT64_MAX && !has_ignore) { + if (wash_size == INT64_MAX && !local_has_ignore) { abort_unless(-1 == avail_bm_.find_first_significant(1)); } #endif @@ -417,10 +552,32 @@ int64_t BlockSet::sync_wash(int64_t wash_size) tallocator_->dec_hold(washed_size); tallocator_->update_wash_stat(related_chunks, washed_blks, washed_size); } + if (OB_NOT_NULL(has_ignore)) { + *has_ignore = local_has_ignore; + } + if (OB_NOT_NULL(scanned_blocks)) { + *scanned_blocks = scanned_blks; + } #if MEMCHK_LEVEL >= 1 if (0 == washed_size && ABLOCK_SIZE & (ps - 1)) { abort_unless(total_payload_ == total_used_); } #endif return washed_size; +#endif +} + +void BlockSet::maybe_ordinary_purge() +{ +#if defined(MADV_DONTNEED) + const int64_t now = common::ObTimeUtility::fast_current_time(); + const int64_t last_ts = ATOMIC_LOAD(&last_ordinary_purge_ts_); + if (now - last_ts >= ORDINARY_PURGE_MIN_INTERVAL_US) { + ATOMIC_STORE(&last_ordinary_purge_ts_, now); + (void)purge_free_blocks(ORDINARY_PURGE_BUDGET, + ORDINARY_PURGE_DELAY_US, + ORDINARY_PURGE_MAX_BLOCKS, + nullptr); + } +#endif } diff --git a/deps/oblib/src/lib/alloc/block_set.h b/deps/oblib/src/lib/alloc/block_set.h index 8b05a7ea2..8de403077 100644 --- a/deps/oblib/src/lib/alloc/block_set.h +++ b/deps/oblib/src/lib/alloc/block_set.h @@ -61,6 +61,15 @@ class BlockSet void add_free_block(ABlock *block, int nblocks, AChunk *chunk); ABlock *get_free_block(const int cls, const ObMemAttr &attr); void take_off_free_block(ABlock *block, int nblocks, AChunk *chunk); + void add_purged_block(ABlock *block, int nblocks, AChunk *chunk); + ABlock *get_purged_block(const int cls, const ObMemAttr &attr); + void take_off_purged_block(ABlock *block, int nblocks, AChunk *chunk); + int64_t purge_free_blocks(const int64_t wash_size, + const int64_t delay_us, + const int64_t max_blocks_per_round, + bool *has_ignore, + int64_t *scanned_blocks = nullptr); + void maybe_ordinary_purge(); AChunk *alloc_chunk(const uint64_t size, const ObMemAttr &attr); bool add_chunk(const ObMemAttr &attr); void free_chunk(AChunk *const chunk); @@ -74,12 +83,16 @@ class BlockSet union { ABlock *block_list_[BLOCKS_PER_CHUNK+1]; }; + ABlock *purged_block_list_[BLOCKS_PER_CHUNK+1]; AChunk *clist_; // using chunk list ABitSet avail_bm_; + ABitSet purged_avail_bm_; char avail_bm_buf_[ABitSet::buf_len(BLOCKS_PER_CHUNK+1)]; + char purged_avail_bm_buf_[ABitSet::buf_len(BLOCKS_PER_CHUNK+1)]; uint64_t total_hold_; uint64_t total_payload_; uint64_t total_used_; + int64_t last_ordinary_purge_ts_; }; // end of class BlockSet void BlockSet::lock() diff --git a/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.cpp b/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.cpp index cbf50d855..3c5644a69 100644 --- a/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.cpp +++ b/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.cpp @@ -278,6 +278,12 @@ void ObTenantCtxAllocator::dec_hold(const int64_t size) { ctx_allocator_.dec_hold(size); } + +bool ObTenantCtxAllocator::restore_purged_hold(const int64_t size, const ObMemAttr &attr) +{ + return ctx_allocator_.restore_purged_hold(size, attr); +} + void ObTenantCtxAllocatorV2::dec_hold(const int64_t size) { if (!resource_handle_.is_valid()) { @@ -289,6 +295,28 @@ void ObTenantCtxAllocatorV2::dec_hold(const int64_t size) } } +bool ObTenantCtxAllocatorV2::restore_purged_hold(const int64_t size, const ObMemAttr &attr) +{ + bool bret = true; + if (size <= 0) { + bret = true; + } else if (!resource_handle_.is_valid()) { + bret = false; + LIB_LOG_RET(ERROR, OB_INVALID_ARGUMENT, "resource_handle is invalid", K_(ctx_id)); + } else { + bool reach_ctx_limit = false; + const bool high_prio = OB_HIGH_ALLOC == attr.prio_; + if (!resource_handle_.get_memory_mgr()->update_hold(size, ctx_id_, ObLabel(), reach_ctx_limit, high_prio)) { + bret = false; + } else if (!AChunkMgr::instance().try_restore_hold(size, high_prio)) { + bool unused_reach_ctx_limit = false; + resource_handle_.get_memory_mgr()->update_hold(-size, ctx_id_, ObLabel(), unused_reach_ctx_limit); + bret = false; + } + } + return bret; +} + int ObTenantCtxAllocator::set_idle(const int64_t set_size, const bool reserve/*=false*/) { int ret = OB_SUCCESS; diff --git a/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.h b/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.h index 2bed995ae..33540df00 100644 --- a/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.h +++ b/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.h @@ -82,6 +82,7 @@ class ObTenantCtxAllocatorV2 : private common::ObLink } bool resource_handle_valid() const { return resource_handle_.is_valid(); } void dec_hold(const int64_t size); + bool restore_purged_hold(const int64_t size, const ObMemAttr &attr); // statistic related int set_tenant_memory_mgr() { @@ -415,6 +416,7 @@ class AChunkUsingList AChunk *alloc_chunk(const int64_t size, const ObMemAttr &attr); void free_chunk(AChunk *chunk, const ObMemAttr &attr); void dec_hold(const int64_t size); + bool restore_purged_hold(const int64_t size, const ObMemAttr &attr); int set_idle(const int64_t size, const bool reserve = false); IBlockMgr &get_block_mgr() { return obj_mgr_; } IChunkMgr &get_chunk_mgr() { return chunk_mgr_; } diff --git a/deps/oblib/src/lib/resource/achunk_mgr.cpp b/deps/oblib/src/lib/resource/achunk_mgr.cpp index 8398489ed..16788ab86 100644 --- a/deps/oblib/src/lib/resource/achunk_mgr.cpp +++ b/deps/oblib/src/lib/resource/achunk_mgr.cpp @@ -416,6 +416,11 @@ void AChunkMgr::dec_hold(int64_t bytes) IGNORE_RETURN ATOMIC_AAF(&hold_, -bytes); } +bool AChunkMgr::try_restore_hold(int64_t bytes, bool high_prio) +{ + return bytes <= 0 ? true : try_inc_hold_hard(bytes, high_prio); +} + bool AChunkMgr::try_inc_hold(int64_t bytes, int64_t limit, bool high_prio) { bool bret = true; diff --git a/deps/oblib/src/lib/resource/achunk_mgr.h b/deps/oblib/src/lib/resource/achunk_mgr.h index 9f3afd64e..671d6fb09 100644 --- a/deps/oblib/src/lib/resource/achunk_mgr.h +++ b/deps/oblib/src/lib/resource/achunk_mgr.h @@ -241,6 +241,7 @@ class AChunkMgr } inline static AChunk *ptr2chunk(const void *ptr); void dec_hold(int64_t bytes); + bool try_restore_hold(int64_t bytes, bool high_prio); virtual int madvise(void *addr, size_t length, int advice); void munmap(void *addr, size_t length); int64_t to_string(char *buf, const int64_t buf_len) const; diff --git a/deps/oblib/unittest/lib/alloc/test_block_set.cpp b/deps/oblib/unittest/lib/alloc/test_block_set.cpp index 50c64d39c..27c0ef5c7 100644 --- a/deps/oblib/unittest/lib/alloc/test_block_set.cpp +++ b/deps/oblib/unittest/lib/alloc/test_block_set.cpp @@ -183,6 +183,65 @@ TEST_F(TestBlockSet, BigBlockOrigin) } } +TEST_F(TestBlockSet, ReusePurgedBlock) +{ + const uint64_t sz = 100L * ABLOCK_SIZE; + ABlock *p1 = Malloc(sz); + ABlock *p2 = Malloc(sz); + check_ptr(p1); + check_ptr(p2); + ASSERT_NE(p1, p2); + + Free(p1); + const uint64_t hold_before_wash = cs_.get_total_hold(); + const int64_t washed_size = cs_.sync_wash(INT64_MAX); + ASSERT_GT(washed_size, 0); + ASSERT_LT(cs_.get_total_hold(), hold_before_wash); + + const uint64_t hold_after_wash = cs_.get_total_hold(); + ABlock *p3 = Malloc(sz); + ASSERT_EQ(p1, p3); + ASSERT_TRUE(p3->in_use_); + ASSERT_FALSE(p3->is_washed_); + ASSERT_GT(cs_.get_total_hold(), hold_after_wash); + + Free(p2); + Free(p3); +} + +TEST_F(TestBlockSet, SplitPurgedBlock) +{ + const uint64_t first_blocks = 100; + const uint64_t first_size = first_blocks * ABLOCK_SIZE; + const uint64_t second_size = (BLOCKS_PER_CHUNK - first_blocks) * ABLOCK_SIZE; + ABlock *p1 = Malloc(first_size); + ABlock *p2 = Malloc(second_size); + check_ptr(p1); + check_ptr(p2); + AChunk *chunk = p2->chunk(); + ASSERT_EQ(chunk, p1->chunk()); + + Free(p1); + ASSERT_EQ(static_cast(first_size), cs_.sync_wash(INT64_MAX)); + ASSERT_EQ(first_size, chunk->washed_size_); + ASSERT_EQ(1, chunk->washed_blks_); + + const uint64_t split_blocks = 40; + ABlock *p3 = Malloc(split_blocks * ABLOCK_SIZE); + ASSERT_EQ(p1, p3); + ASSERT_EQ((first_blocks - split_blocks) * ABLOCK_SIZE, chunk->washed_size_); + ASSERT_EQ(1, chunk->washed_blks_); + + ABlock *p4 = Malloc((first_blocks - split_blocks) * ABLOCK_SIZE); + ASSERT_EQ(p1 + split_blocks, p4); + ASSERT_EQ(0, chunk->washed_size_); + ASSERT_EQ(0, chunk->washed_blks_); + + Free(p2); + Free(p3); + Free(p4); +} + TEST_F(TestBlockSet, Single) { uint64_t sz = INTACT_NORMAL_AOBJECT_SIZE;