Skip to content

Commit 994257a

Browse files
authored
Prevent starvation when acquiring mutex over and over (ruby#15877)
Repeatedly locking a mutex `m` can starve other threads if they are all on the waitq of `m`. See https://bugs.ruby-lang.org/issues/21840 for more details. Solution: when a thread `T1` wakes up `T2` during mutex unlock, but `T1` or any other thread successfully acquires the mutex before `T2` does, we record the acquiring thread's `running_time` at the moment of acquisition. Then, during unlock, if that thread's `running_time` is less than the saved value (i.e. it was reset during the critical section), we set it back to the saved value. Fixes [Bug #21840]
1 parent 3c63489 commit 994257a

2 files changed

Lines changed: 55 additions & 2 deletions

File tree

test/ruby/test_thread.rb

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1664,4 +1664,37 @@ def test_mn_threads_sub_millisecond_sleep
16641664
assert_operator elapsed, :>=, 0.1, "sub-millisecond sleeps should not return immediately"
16651665
end;
16661666
end
1667+
1668+
# [Bug #21840]
1669+
def test_mutex_owner_doesnt_starve_waiters
1670+
assert_ruby_status([], "#{<<~"begin;"}\n#{<<~'end;'}")
1671+
begin;
1672+
m = Mutex.new
1673+
1674+
fib = lambda { |n|
1675+
return n if n <= 1
1676+
fib(n - 1) + fib(n - 2)
1677+
}
1678+
1679+
t1_running = false
1680+
t1 = Thread.new do
1681+
t1_running = true
1682+
loop do
1683+
fib(20)
1684+
m.synchronize do
1685+
File.open(__FILE__) { } # reset timeslice due to blocking operation
1686+
end
1687+
end
1688+
end
1689+
1690+
loop until t1_running
1691+
1692+
3.times.map do
1693+
Thread.new do
1694+
m.synchronize do
1695+
end
1696+
end
1697+
end.each(&:join)
1698+
end;
1699+
end
16671700
end

thread_sync.c

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ typedef struct rb_mutex_struct {
1010
rb_thread_t *th; // even if the fiber is collected, we might need access to the thread in mutex_free
1111
struct rb_mutex_struct *next_mutex;
1212
struct ccan_list_head waitq; /* protected by GVL */
13+
uint32_t saved_running_time_us;
14+
bool wait_waking; // Is there a thread waiting to be woken up by this mutex? Reset during every wakeup.
1315
} rb_mutex_t;
1416

1517
/* sync_waiter is always on-stack */
@@ -212,8 +214,15 @@ mutex_locked(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
212214
static inline bool
213215
do_mutex_trylock(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
214216
{
217+
// NOTE: we can successfully lock a mutex even if there are other threads waiting on it. First one to it wins.
215218
if (mutex->ec_serial == 0) {
216219
RUBY_DEBUG_LOG("%p ok", mutex);
220+
if (mutex->wait_waking) {
221+
// If we acquired `mutex` without contention and before the thread that was popped off the waitq, we're going
222+
// to set our running_time back to what it was here during mutex unlock if it got reset during our critical
223+
// section. This is to prevent starvation of other threads waiting on the mutex.
224+
mutex->saved_running_time_us = th->running_time_us;
225+
}
217226

218227
mutex_locked(mutex, th, ec_serial);
219228
return true;
@@ -350,7 +359,8 @@ do_mutex_lock(struct mutex_args *args, int interruptible_p)
350359
}
351360
ccan_list_del(&sync_waiter.node);
352361

353-
// unlocked by another thread while sleeping
362+
// If mutex->ec_serial != 0, the mutex was locked by another thread before we had the chance to acquire it.
363+
// We'll put ourselves on the waitq and sleep again.
354364
if (!mutex->ec_serial) {
355365
mutex_set_owner(mutex, th, ec_serial);
356366
}
@@ -391,6 +401,7 @@ do_mutex_lock(struct mutex_args *args, int interruptible_p)
391401

392402
if (saved_ints) th->ec->interrupt_flag = saved_ints;
393403
if (mutex->ec_serial == ec_serial) mutex_locked(mutex, th, ec_serial);
404+
mutex->wait_waking = false;
394405
}
395406

396407
RUBY_DEBUG_LOG("%p locked", mutex);
@@ -454,6 +465,15 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
454465

455466
struct sync_waiter *cur = 0, *next;
456467

468+
469+
if (mutex->wait_waking) {
470+
uint32_t saved = mutex->saved_running_time_us;
471+
if (th->running_time_us < saved) {
472+
th->running_time_us = saved;
473+
}
474+
}
475+
476+
mutex->saved_running_time_us = 0;
457477
mutex->ec_serial = 0;
458478
thread_mutex_remove(th, mutex);
459479

@@ -469,6 +489,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
469489
case THREAD_RUNNABLE: /* from someone else calling Thread#run */
470490
case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
471491
RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
492+
mutex->wait_waking = true;
472493
rb_threadptr_interrupt(cur->th);
473494
return NULL;
474495
case THREAD_STOPPED: /* probably impossible */
@@ -480,7 +501,6 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
480501
}
481502
}
482503
}
483-
484504
// We did not find any threads to wake up, so we can just return with no error:
485505
return NULL;
486506
}

0 commit comments

Comments
 (0)