From 08ee8d2030fbc73c4c144e819dd68806b0351cbe Mon Sep 17 00:00:00 2001 From: Yabin Cui Date: Wed, 11 Feb 2015 17:04:36 -0800 Subject: [PATCH] Switch pthread_rwlock_t to stdatomic. Bug: 19099838 Change-Id: Ie82967a60b5cec61a8bdd1e0e4a03738d01944f8 --- libc/bionic/pthread_rwlock.cpp | 234 +++++++++++++++++++++------------ libc/include/pthread.h | 8 +- tests/pthread_test.cpp | 75 +++++++++++ 3 files changed, 227 insertions(+), 90 deletions(-) diff --git a/libc/bionic/pthread_rwlock.cpp b/libc/bionic/pthread_rwlock.cpp index 0d63457d2..83243ab46 100644 --- a/libc/bionic/pthread_rwlock.cpp +++ b/libc/bionic/pthread_rwlock.cpp @@ -27,6 +27,7 @@ */ #include +#include #include "pthread_internal.h" #include "private/bionic_futex.h" @@ -52,11 +53,6 @@ * - This implementation will return EDEADLK in "write after write" and "read after * write" cases and will deadlock in write after read case. * - * TODO: VERY CAREFULLY convert this to use C++11 atomics when possible. All volatile - * members of pthread_rwlock_t should be converted to atomics<> and __sync_bool_compare_and_swap - * should be changed to compare_exchange_strong accompanied by the proper ordering - * constraints (comments have been added with the intending ordering across the code). - * * TODO: As it stands now, pending_readers and pending_writers could be merged into a * a single waiters variable. Keeping them separate adds a bit of clarity and keeps * the door open for a writer-biased implementation. @@ -105,8 +101,40 @@ int pthread_rwlockattr_getpshared(const pthread_rwlockattr_t* attr, int* pshared return 0; } +static inline atomic_int* STATE_ATOMIC_POINTER(pthread_rwlock_t* rwlock) { + static_assert(sizeof(atomic_int) == sizeof(rwlock->state), + "rwlock->state should actually be atomic_int in implementation."); + + // We prefer casting to atomic_int instead of declaring rwlock->state to be atomic_int directly. + // Because using the second method pollutes pthread.h, and causes an error when compiling libcxx. + return reinterpret_cast(&rwlock->state); +} + +static inline atomic_int* WRITER_THREAD_ID_ATOMIC_POINTER(pthread_rwlock_t* rwlock) { + static_assert(sizeof(atomic_int) == sizeof(rwlock->writer_thread_id), + "rwlock->writer_thread_id should actually be atomic_int in implementation."); + + return reinterpret_cast(&rwlock->writer_thread_id); +} + +static inline atomic_uint* PENDING_READERS_ATOMIC_POINTER(pthread_rwlock_t* rwlock) { + static_assert(sizeof(atomic_uint) == sizeof(rwlock->pending_readers), + "rwlock->pending_readers should actually be atomic_uint in implementation."); + + return reinterpret_cast(&rwlock->pending_readers); +} + +static inline atomic_uint* PENDING_WRITERS_ATOMIC_POINTER(pthread_rwlock_t* rwlock) { + static_assert(sizeof(atomic_uint) == sizeof(rwlock->pending_writers), + "rwlock->pending_writers should actually be atomic_uint in implementation."); + + return reinterpret_cast(&rwlock->pending_writers); +} + int pthread_rwlock_init(pthread_rwlock_t* rwlock, const pthread_rwlockattr_t* attr) { - if (attr != NULL) { + if (__predict_true(attr == NULL)) { + rwlock->attr = 0; + } else { switch (*attr) { case PTHREAD_PROCESS_SHARED: case PTHREAD_PROCESS_PRIVATE: @@ -117,10 +145,10 @@ int pthread_rwlock_init(pthread_rwlock_t* rwlock, const pthread_rwlockattr_t* at } } - rwlock->state = 0; - rwlock->pending_readers = 0; - rwlock->pending_writers = 0; - rwlock->writer_thread_id = 0; + atomic_init(STATE_ATOMIC_POINTER(rwlock), 0); + atomic_init(WRITER_THREAD_ID_ATOMIC_POINTER(rwlock), 0); + atomic_init(PENDING_READERS_ATOMIC_POINTER(rwlock), 0); + atomic_init(PENDING_WRITERS_ATOMIC_POINTER(rwlock), 0); return 0; } @@ -133,72 +161,87 @@ int pthread_rwlock_destroy(pthread_rwlock_t* rwlock) { } static int __pthread_rwlock_timedrdlock(pthread_rwlock_t* rwlock, const timespec* abs_timeout) { - if (__predict_false(__get_thread()->tid == rwlock->writer_thread_id)) { + if (__predict_false(__get_thread()->tid == + atomic_load_explicit(WRITER_THREAD_ID_ATOMIC_POINTER(rwlock), memory_order_relaxed))) { return EDEADLK; } timespec ts; timespec* rel_timeout = (abs_timeout == NULL) ? NULL : &ts; - bool done = false; - do { - // This is actually a race read as there's nothing that guarantees the atomicity of integer - // reads / writes. However, in practice this "never" happens so until we switch to C++11 this - // should work fine. The same applies in the other places this idiom is used. - int32_t cur_state = rwlock->state; // C++11 relaxed atomic read + + atomic_int* state_ptr = STATE_ATOMIC_POINTER(rwlock); + + while (true) { + int cur_state = atomic_load_explicit(state_ptr, memory_order_relaxed); if (__predict_true(cur_state >= 0)) { - // Add as an extra reader. - done = __sync_bool_compare_and_swap(&rwlock->state, cur_state, cur_state + 1); // C++11 memory_order_aquire + if (atomic_compare_exchange_weak_explicit(state_ptr, &cur_state, cur_state + 1, + memory_order_acquire, memory_order_relaxed)) { + return 0; + } } else { if (!timespec_from_absolute(rel_timeout, abs_timeout)) { return ETIMEDOUT; } - // Owner holds it in write mode, hang up. - // To avoid losing wake ups the pending_readers update and the state read should be - // sequentially consistent. (currently enforced by __sync_fetch_and_add which creates a full barrier) - __sync_fetch_and_add(&rwlock->pending_readers, 1); // C++11 memory_order_relaxed (if the futex_wait ensures the ordering) - int ret = __futex_wait_ex(&rwlock->state, rwlock_is_shared(rwlock), cur_state, rel_timeout); - __sync_fetch_and_sub(&rwlock->pending_readers, 1); // C++11 memory_order_relaxed + atomic_uint* pending_readers_ptr = PENDING_READERS_ATOMIC_POINTER(rwlock); + + // To avoid losing wake ups, the pending_readers increment should be observed before + // futex_wait by all threads. A seq_cst fence instead of a seq_cst operation is used + // here. Because only a seq_cst fence can ensure sequential consistency for non-atomic + // operations in futex_wait. + atomic_fetch_add_explicit(pending_readers_ptr, 1, memory_order_relaxed); + atomic_thread_fence(memory_order_seq_cst); + int ret = __futex_wait_ex(state_ptr, rwlock_is_shared(rwlock), cur_state, rel_timeout); + atomic_fetch_sub_explicit(pending_readers_ptr, 1, memory_order_relaxed); if (ret == -ETIMEDOUT) { return ETIMEDOUT; } } - } while (!done); - - return 0; + } } static int __pthread_rwlock_timedwrlock(pthread_rwlock_t* rwlock, const timespec* abs_timeout) { - int tid = __get_thread()->tid; - if (__predict_false(tid == rwlock->writer_thread_id)) { + if (__predict_false(__get_thread()->tid == + atomic_load_explicit(WRITER_THREAD_ID_ATOMIC_POINTER(rwlock), memory_order_relaxed))) { return EDEADLK; } timespec ts; timespec* rel_timeout = (abs_timeout == NULL) ? NULL : &ts; - bool done = false; - do { - int32_t cur_state = rwlock->state; + + atomic_int* state_ptr = STATE_ATOMIC_POINTER(rwlock); + + while (true) { + int cur_state = atomic_load_explicit(state_ptr, memory_order_relaxed); if (__predict_true(cur_state == 0)) { - // Change state from 0 to -1. - done = __sync_bool_compare_and_swap(&rwlock->state, 0 /* cur state */, -1 /* new state */); // C++11 memory_order_aquire + if (atomic_compare_exchange_weak_explicit(state_ptr, &cur_state, -1, + memory_order_acquire, memory_order_relaxed)) { + // writer_thread_id is protected by rwlock and can only be modified in rwlock write + // owner thread. Other threads may read it for EDEADLK error checking, atomic operation + // is safe enough for it. + atomic_store_explicit(WRITER_THREAD_ID_ATOMIC_POINTER(rwlock), __get_thread()->tid, + memory_order_relaxed); + return 0; + } } else { if (!timespec_from_absolute(rel_timeout, abs_timeout)) { return ETIMEDOUT; } - // Failed to acquire, hang up. - // To avoid losing wake ups the pending_writers update and the state read should be - // sequentially consistent. (currently enforced by __sync_fetch_and_add which creates a full barrier) - __sync_fetch_and_add(&rwlock->pending_writers, 1); // C++11 memory_order_relaxed (if the futex_wait ensures the ordering) - int ret = __futex_wait_ex(&rwlock->state, rwlock_is_shared(rwlock), cur_state, rel_timeout); - __sync_fetch_and_sub(&rwlock->pending_writers, 1); // C++11 memory_order_relaxed + + atomic_uint* pending_writers_ptr = PENDING_WRITERS_ATOMIC_POINTER(rwlock); + + // To avoid losing wake ups, the pending_writers increment should be observed before + // futex_wait by all threads. A seq_cst fence instead of a seq_cst operation is used + // here. Because only a seq_cst fence can ensure sequential consistency for non-atomic + // operations in futex_wait. + atomic_fetch_add_explicit(pending_writers_ptr, 1, memory_order_relaxed); + atomic_thread_fence(memory_order_seq_cst); + int ret = __futex_wait_ex(state_ptr, rwlock_is_shared(rwlock), cur_state, rel_timeout); + atomic_fetch_sub_explicit(pending_writers_ptr, 1, memory_order_relaxed); if (ret == -ETIMEDOUT) { return ETIMEDOUT; } } - } while (!done); - - rwlock->writer_thread_id = tid; - return 0; + } } int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock) { @@ -210,10 +253,14 @@ int pthread_rwlock_timedrdlock(pthread_rwlock_t* rwlock, const timespec* abs_tim } int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock) { - int32_t cur_state = rwlock->state; - if ((cur_state >= 0) && - __sync_bool_compare_and_swap(&rwlock->state, cur_state, cur_state + 1)) { // C++11 memory_order_acquire - return 0; + atomic_int* state_ptr = STATE_ATOMIC_POINTER(rwlock); + int cur_state = atomic_load_explicit(state_ptr, memory_order_relaxed); + + while (cur_state >= 0) { + if (atomic_compare_exchange_weak_explicit(state_ptr, &cur_state, cur_state + 1, + memory_order_acquire, memory_order_relaxed)) { + return 0; + } } return EBUSY; } @@ -227,12 +274,16 @@ int pthread_rwlock_timedwrlock(pthread_rwlock_t* rwlock, const timespec* abs_tim } int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock) { - int tid = __get_thread()->tid; - int32_t cur_state = rwlock->state; - if ((cur_state == 0) && - __sync_bool_compare_and_swap(&rwlock->state, 0 /* cur state */, -1 /* new state */)) { // C++11 memory_order_acquire - rwlock->writer_thread_id = tid; - return 0; + atomic_int* state_ptr = STATE_ATOMIC_POINTER(rwlock); + int cur_state = atomic_load_explicit(state_ptr, memory_order_relaxed); + + while (cur_state == 0) { + if (atomic_compare_exchange_weak_explicit(state_ptr, &cur_state, -1, + memory_order_acquire, memory_order_relaxed)) { + int tid = __get_thread()->tid; + atomic_store_explicit(WRITER_THREAD_ID_ATOMIC_POINTER(rwlock), tid, memory_order_relaxed); + return 0; + } } return EBUSY; } @@ -240,42 +291,53 @@ int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock) { int pthread_rwlock_unlock(pthread_rwlock_t* rwlock) { int tid = __get_thread()->tid; - bool done = false; - do { - int32_t cur_state = rwlock->state; - if (cur_state == 0) { + atomic_int* state_ptr = STATE_ATOMIC_POINTER(rwlock); + atomic_uint* pending_readers_ptr = PENDING_READERS_ATOMIC_POINTER(rwlock); + atomic_uint* pending_writers_ptr = PENDING_WRITERS_ATOMIC_POINTER(rwlock); + + int cur_state = atomic_load_explicit(state_ptr, memory_order_relaxed); + if (__predict_false(cur_state == 0)) { + return EPERM; + } else if (cur_state == -1) { + atomic_int* writer_thread_id_ptr = WRITER_THREAD_ID_ATOMIC_POINTER(rwlock); + if (atomic_load_explicit(writer_thread_id_ptr, memory_order_relaxed) != tid) { return EPERM; } - if (cur_state == -1) { - if (rwlock->writer_thread_id != tid) { + // We're no longer the owner. + atomic_store_explicit(writer_thread_id_ptr, 0, memory_order_relaxed); + // Change state from -1 to 0. + atomic_store_explicit(state_ptr, 0, memory_order_release); + goto wakeup_waiters; + + } else { // cur_state > 0 + // Reduce state by 1. + while (!atomic_compare_exchange_weak_explicit(state_ptr, &cur_state, cur_state - 1, + memory_order_release, memory_order_relaxed)) { + if (cur_state <= 0) { return EPERM; } - // We're no longer the owner. - rwlock->writer_thread_id = 0; - // Change state from -1 to 0. - // We use __sync_bool_compare_and_swap to achieve sequential consistency of the state store and - // the following pendingX loads. A simple store with memory_order_release semantics - // is not enough to guarantee that the pendingX loads are not reordered before the - // store (which may lead to a lost wakeup). - __sync_bool_compare_and_swap( &rwlock->state, -1 /* cur state*/, 0 /* new state */); // C++11 maybe memory_order_seq_cst? - - // Wake any waiters. - if (__predict_false(rwlock->pending_readers > 0 || rwlock->pending_writers > 0)) { - __futex_wake_ex(&rwlock->state, rwlock_is_shared(rwlock), INT_MAX); - } - done = true; - } else { // cur_state > 0 - // Reduce state by 1. - // See the comment above on why we need __sync_bool_compare_and_swap. - done = __sync_bool_compare_and_swap(&rwlock->state, cur_state, cur_state - 1); // C++11 maybe memory_order_seq_cst? - if (done && (cur_state - 1) == 0) { - // There are no more readers, wake any waiters. - if (__predict_false(rwlock->pending_readers > 0 || rwlock->pending_writers > 0)) { - __futex_wake_ex(&rwlock->state, rwlock_is_shared(rwlock), INT_MAX); - } - } } - } while (!done); + if (cur_state == 1) { + goto wakeup_waiters; + } + } + return 0; +wakeup_waiters: + // To avoid losing wake ups, the update of state should be observed before reading + // pending_readers/pending_writers by all threads. Use read locking as an example: + // read locking thread unlocking thread + // pending_readers++; state = 0; + // seq_cst fence seq_cst fence + // read state for futex_wait read pending_readers for futex_wake + // + // So when locking and unlocking threads are running in parallel, we will not get + // in a situation that the locking thread reads state as negative and needs to wait, + // while the unlocking thread reads pending_readers as zero and doesn't need to wake up waiters. + atomic_thread_fence(memory_order_seq_cst); + if (__predict_false(atomic_load_explicit(pending_readers_ptr, memory_order_relaxed) > 0 || + atomic_load_explicit(pending_writers_ptr, memory_order_relaxed) > 0)) { + __futex_wake_ex(state_ptr, rwlock_is_shared(rwlock), INT_MAX); + } return 0; } diff --git a/libc/include/pthread.h b/libc/include/pthread.h index 212551b81..1fe61e29c 100644 --- a/libc/include/pthread.h +++ b/libc/include/pthread.h @@ -91,10 +91,10 @@ typedef struct { pthread_mutex_t __unused_lock; pthread_cond_t __unused_cond; #endif - volatile int32_t state; // 0=unlock, -1=writer lock, +n=reader lock - volatile int32_t writer_thread_id; - volatile int32_t pending_readers; - volatile int32_t pending_writers; + int32_t state; // 0=unlock, -1=writer lock, +n=reader lock + int32_t writer_thread_id; + uint32_t pending_readers; + uint32_t pending_writers; int32_t attr; #ifdef __LP64__ char __reserved[36]; diff --git a/tests/pthread_test.cpp b/tests/pthread_test.cpp index 5dc60eeee..c507faab9 100644 --- a/tests/pthread_test.cpp +++ b/tests/pthread_test.cpp @@ -33,6 +33,8 @@ #include #include +#include + TEST(pthread, pthread_key_create) { pthread_key_t key; ASSERT_EQ(0, pthread_key_create(&key, NULL)); @@ -699,6 +701,79 @@ TEST(pthread, pthread_rwlock_smoke) { ASSERT_EQ(0, pthread_rwlock_destroy(&l)); } +struct RwlockWakeupHelperArg { + pthread_rwlock_t lock; + enum Progress { + LOCK_INITIALIZED, + LOCK_WAITING, + LOCK_RELEASED, + LOCK_ACCESSED + }; + std::atomic progress; +}; + +static void pthread_rwlock_reader_wakeup_writer_helper(RwlockWakeupHelperArg* arg) { + ASSERT_EQ(RwlockWakeupHelperArg::LOCK_INITIALIZED, arg->progress); + arg->progress = RwlockWakeupHelperArg::LOCK_WAITING; + + ASSERT_EQ(EBUSY, pthread_rwlock_trywrlock(&arg->lock)); + ASSERT_EQ(0, pthread_rwlock_wrlock(&arg->lock)); + ASSERT_EQ(RwlockWakeupHelperArg::LOCK_RELEASED, arg->progress); + ASSERT_EQ(0, pthread_rwlock_unlock(&arg->lock)); + + arg->progress = RwlockWakeupHelperArg::LOCK_ACCESSED; +} + +TEST(pthread, pthread_rwlock_reader_wakeup_writer) { + RwlockWakeupHelperArg wakeup_arg; + ASSERT_EQ(0, pthread_rwlock_init(&wakeup_arg.lock, NULL)); + ASSERT_EQ(0, pthread_rwlock_rdlock(&wakeup_arg.lock)); + wakeup_arg.progress = RwlockWakeupHelperArg::LOCK_INITIALIZED; + + pthread_t thread; + ASSERT_EQ(0, pthread_create(&thread, NULL, + reinterpret_cast(pthread_rwlock_reader_wakeup_writer_helper), &wakeup_arg)); + sleep(1); + ASSERT_EQ(RwlockWakeupHelperArg::LOCK_WAITING, wakeup_arg.progress); + wakeup_arg.progress = RwlockWakeupHelperArg::LOCK_RELEASED; + ASSERT_EQ(0, pthread_rwlock_unlock(&wakeup_arg.lock)); + + ASSERT_EQ(0, pthread_join(thread, NULL)); + ASSERT_EQ(RwlockWakeupHelperArg::LOCK_ACCESSED, wakeup_arg.progress); + ASSERT_EQ(0, pthread_rwlock_destroy(&wakeup_arg.lock)); +} + +static void pthread_rwlock_writer_wakeup_reader_helper(RwlockWakeupHelperArg* arg) { + ASSERT_EQ(RwlockWakeupHelperArg::LOCK_INITIALIZED, arg->progress); + arg->progress = RwlockWakeupHelperArg::LOCK_WAITING; + + ASSERT_EQ(EBUSY, pthread_rwlock_tryrdlock(&arg->lock)); + ASSERT_EQ(0, pthread_rwlock_rdlock(&arg->lock)); + ASSERT_EQ(RwlockWakeupHelperArg::LOCK_RELEASED, arg->progress); + ASSERT_EQ(0, pthread_rwlock_unlock(&arg->lock)); + + arg->progress = RwlockWakeupHelperArg::LOCK_ACCESSED; +} + +TEST(pthread, pthread_rwlock_writer_wakeup_reader) { + RwlockWakeupHelperArg wakeup_arg; + ASSERT_EQ(0, pthread_rwlock_init(&wakeup_arg.lock, NULL)); + ASSERT_EQ(0, pthread_rwlock_wrlock(&wakeup_arg.lock)); + wakeup_arg.progress = RwlockWakeupHelperArg::LOCK_INITIALIZED; + + pthread_t thread; + ASSERT_EQ(0, pthread_create(&thread, NULL, + reinterpret_cast(pthread_rwlock_writer_wakeup_reader_helper), &wakeup_arg)); + sleep(1); + ASSERT_EQ(RwlockWakeupHelperArg::LOCK_WAITING, wakeup_arg.progress); + wakeup_arg.progress = RwlockWakeupHelperArg::LOCK_RELEASED; + ASSERT_EQ(0, pthread_rwlock_unlock(&wakeup_arg.lock)); + + ASSERT_EQ(0, pthread_join(thread, NULL)); + ASSERT_EQ(RwlockWakeupHelperArg::LOCK_ACCESSED, wakeup_arg.progress); + ASSERT_EQ(0, pthread_rwlock_destroy(&wakeup_arg.lock)); +} + static int g_once_fn_call_count = 0; static void OnceFn() { ++g_once_fn_call_count;