TI AM335x: Thread 呼叫 QMutex.lock() 後就卡住
程式 scheduling 設成 SCHED_RR。情況是這樣,有一個 thread 它是要接收從 socket 過來的資料,它平常是 block 等待在 accept。當有請求來了之後,會接收,然後使用 QMutex.lock() 嘗試取得一個共享資源。但這樣的程式碼,在 console 下直接執行程式都沒有問題。然而程式如果是由 init.d 中的 script 叫起來,該 thread 就會卡在 QMutex.lock() 之中。
按照經驗,看起來又是這個 thread 被 scheduling 切走之後,就再也回不來了。通常我都會先嘗試呼叫 sleep 0s 讓它 timing 稍微不一樣,來看看是不是可以解決。再試果然就不會卡住了。但為什麼呢?為了更詳細地看到底發生何事,我嘗試用 ftrace 的技巧來看看發生了什麼。
首先讓程式在 init.d 中被執行,發現該 thread 最後一個 syscall 是 sys_enter_futex,接著 scheduling 就把這個 thread 切走,執行其它 thread 了。所以我的猜測沒錯,它呼叫了 futex 之後,就卡住了。
再來一樣在 init.d 中執行程式,但是在取得 QMutex.lock() 之前先 sleep 0s 一下。透過 trace-cmd 看結果,發現 sys_enter_futex 沒有被呼叫到!該 thread 很快樂地做 QMutex.lock() 之後要做的事情。
一開始我很困惑,這與我想像的不同。於是我翻出 qt source code 來看,經過一翻頭昏腦脹之後,總算稍微看到為什麼了。
首先 QMutex 的實作在
qtbase\include\QtCore\qmutex.h
之中。我們看看它的 lock() 實作:
第一個被叫到的是 fastTryLock,它是一個 inline 函式:
這個 d_ptr 是:
好,它是 qt 一個 atomic pointer。fastTryLock 又再呼叫了 testAndSetAcquire。另外還有 dummyLocked():
嗯,開始頭昏中。然後不小心在
qtbase\src\corelib\thread\qmutex_linux.cpp
看到下面的註解:
重點我用紅字標出來。它說一個 non-recursive (就像我們用的) 會先利用 testAndSetAcquire 判斷值是不是 0,如果是則將它設為 1,代表 lock 取得了。如果值一開始不是 1,代表已有人取到這個 lock,所以接下來就會呼叫 lockInternal()。
看到這邊有個小概念產生,qt 內部的 lock 事實上是對一個 atomic pointer 做操作。如果它是 0,代表沒人用,於是 qt 就將它設成 1,代表有人用了。由於是 atomic 操作,所以可以保證是 thread-safe 的。那有人用的情況之下,才會再進而呼叫 lockInternal()。那這個函式的實作是:
好,這個函式總算讓我們看到了一個重點:_q_futex。它內容是:
Bingo!lockInternal() 最後是呼叫 sys_enter_futex 的 syscall 無誤。
看到這邊,就可以回過來解釋為什麼同樣是 QMutex.lock(),在 trace-cmd 結果中卻不一定會看見 sys_enter_futex 這個 syscall。因為如果該 QMutex 還未被佔用,則不需要呼叫 sys_enter_futex 啊!
但是,看到這裡仍然無法解釋,為何呼叫了 sys_enter_futex 之後就卡住了。只能說我們在 lock() 之前先 sleep 0s,造成原本佔用此 QMutex 的情形消失了,於是我們就很順利地取得 lock,接著做要做的事。
目前就先這樣頂著用吧!XD
按照經驗,看起來又是這個 thread 被 scheduling 切走之後,就再也回不來了。通常我都會先嘗試呼叫 sleep 0s 讓它 timing 稍微不一樣,來看看是不是可以解決。再試果然就不會卡住了。但為什麼呢?為了更詳細地看到底發生何事,我嘗試用 ftrace 的技巧來看看發生了什麼。
首先讓程式在 init.d 中被執行,發現該 thread 最後一個 syscall 是 sys_enter_futex,接著 scheduling 就把這個 thread 切走,執行其它 thread 了。所以我的猜測沒錯,它呼叫了 futex 之後,就卡住了。
再來一樣在 init.d 中執行程式,但是在取得 QMutex.lock() 之前先 sleep 0s 一下。透過 trace-cmd 看結果,發現 sys_enter_futex 沒有被呼叫到!該 thread 很快樂地做 QMutex.lock() 之後要做的事情。
一開始我很困惑,這與我想像的不同。於是我翻出 qt source code 來看,經過一翻頭昏腦脹之後,總算稍微看到為什麼了。
首先 QMutex 的實作在
qtbase\include\QtCore\qmutex.h
之中。我們看看它的 lock() 實作:
void QMutex::lock() QT_MUTEX_LOCK_NOEXCEPT
{
QMutexData *current;
if (fastTryLock(current))
return;
if (QT_PREPEND_NAMESPACE(isRecursive)(current))
static_cast<QRecursiveMutexPrivate *>(current)->lock(-1);
else
lockInternal();
}
第一個被叫到的是 fastTryLock,它是一個 inline 函式:
inline bool fastTryLock(QMutexData *¤t) Q_DECL_NOTHROW {
return d_ptr.testAndSetAcquire(0, dummyLocked(), current);
}
這個 d_ptr 是:
QBasicAtomicPointer<QMutexData> d_ptr;
好,它是 qt 一個 atomic pointer。fastTryLock 又再呼叫了 testAndSetAcquire。另外還有 dummyLocked():
bool testAndSetAcquire(Type expectedValue, Type newValue, Type ¤tValue) Q_DECL_NOTHROW
{ return Ops::testAndSetAcquire(_q_value, expectedValue, newValue, ¤tValue); }
static inline QMutexData *dummyLocked() {
return reinterpret_cast<QMutexData *>(quintptr(1));
}
嗯,開始頭昏中。然後不小心在
qtbase\src\corelib\thread\qmutex_linux.cpp
看到下面的註解:
/*
* QBasicMutex implementation on Linux with futexes
*
* QBasicMutex contains one pointer value, which can contain one of four
* different values:
* 0x0 unlocked, non-recursive mutex
* 0x1 locked non-recursive mutex, no waiters
* 0x3 locked non-recursive mutex, at least one waiter
* > 0x3 recursive mutex, points to a QMutexPrivate object
*
* LOCKING (non-recursive):
*
* A non-recursive mutex starts in the 0x0 state, indicating that it's
* unlocked. When the first thread attempts to lock it, it will perform a
* testAndSetAcquire from 0x0 to 0x1. If that succeeds, the caller concludes
* that it successfully locked the mutex. That happens in fastTryLock().
*
* If that testAndSetAcquire fails, QBasicMutex::lockInternal is called.
*
* lockInternal will examine the value of the pointer. Otherwise, it will use
* futexes to sleep and wait for another thread to unlock. To do that, it needs
* to set a pointer value of 0x3, which indicates that thread is waiting. It
* does that by a simple fetchAndStoreAcquire operation.
*
* If the pointer value was 0x0, it means we succeeded in acquiring the mutex.
* For other values, it will then call FUTEX_WAIT and with an expected value of
* 0x3.
*
* If the pointer value changed before futex(2) managed to sleep, it will
* return -1 / EWOULDBLOCK, in which case we have to start over. And even if we
* are woken up directly by a FUTEX_WAKE, we need to acquire the mutex, so we
* start over again.
*
* UNLOCKING (non-recursive):
*
* To unlock, we need to set a value of 0x0 to indicate it's unlocked. The
* first attempt is a testAndSetRelease operation from 0x1 to 0x0. If that
* succeeds, we're done.
*
* If it fails, unlockInternal() is called. The only possibility is that the
* mutex value was 0x3, which indicates some other thread is waiting or was
* waiting in the past. We then set the mutex to 0x0 and perform a FUTEX_WAKE.
*/
重點我用紅字標出來。它說一個 non-recursive (就像我們用的) 會先利用 testAndSetAcquire 判斷值是不是 0,如果是則將它設為 1,代表 lock 取得了。如果值一開始不是 1,代表已有人取到這個 lock,所以接下來就會呼叫 lockInternal()。
看到這邊有個小概念產生,qt 內部的 lock 事實上是對一個 atomic pointer 做操作。如果它是 0,代表沒人用,於是 qt 就將它設成 1,代表有人用了。由於是 atomic 操作,所以可以保證是 thread-safe 的。那有人用的情況之下,才會再進而呼叫 lockInternal()。那這個函式的實作是:
void QBasicMutex::lockInternal() Q_DECL_NOTHROW
{
Q_ASSERT(!isRecursive());
lockInternal_helper<false>(d_ptr);
}
template <bool IsTimed> static inlinebool lockInternal_helper(QBasicAtomicPointer<QMutexData> &d_ptr, int timeout = -1, QElapsedTimer *elapsedTimer = 0) Q_DECL_NOTHROW{if (!IsTimed)timeout = -1;// we're here because fastTryLock() has just failedif (timeout == 0)return false;struct timespec ts, *pts = 0;if (IsTimed && timeout > 0) {ts.tv_sec = timeout / 1000;ts.tv_nsec = (timeout % 1000) * 1000 * 1000;}
// the mutex is locked already, set a bit indicating we're waitingwhile (d_ptr.fetchAndStoreAcquire(dummyFutexValue()) != 0) {if (IsTimed && pts == &ts) {// recalculate the timeoutqint64 xtimeout = qint64(timeout) * 1000 * 1000;xtimeout -= elapsedTimer->nsecsElapsed();if (xtimeout <= 0) {// timer expired after we returnedreturn false;}
ts.tv_sec = xtimeout / Q_INT64_C(1000) / 1000 / 1000;ts.tv_nsec = xtimeout % (Q_INT64_C(1000) * 1000 * 1000);}
if (IsTimed && timeout > 0)pts = &ts;// successfully set the waiting bit, now sleepint r = _q_futex(&d_ptr, FUTEX_WAIT, quintptr(dummyFutexValue()), pts);if (IsTimed && r != 0 && errno == ETIMEDOUT)return false;// we got woken up, so try to acquire the mutex// note we must set to dummyFutexValue because there could be other threads// also waiting}
Q_ASSERT(d_ptr.load());return true;}
好,這個函式總算讓我們看到了一個重點:_q_futex。它內容是:
static inline int _q_futex(void *addr, int op, int val, const struct timespec *timeout) Q_DECL_NOTHROW
{
volatile int *int_addr = reinterpret_cast<volatile int *>(addr);
#if Q_BYTE_ORDER == Q_BIG_ENDIAN && QT_POINTER_SIZE == 8
int_addr++; //We want a pointer to the 32 least significant bit of QMutex::d
#endif
int *addr2 = 0;
int val2 = 0;
// we use __NR_futex because some libcs (like Android's bionic) don't
// provide SYS_futex etc.
return syscall(__NR_futex, int_addr, op | futexFlags(), val, timeout, addr2, val2);
}
Bingo!lockInternal() 最後是呼叫 sys_enter_futex 的 syscall 無誤。
看到這邊,就可以回過來解釋為什麼同樣是 QMutex.lock(),在 trace-cmd 結果中卻不一定會看見 sys_enter_futex 這個 syscall。因為如果該 QMutex 還未被佔用,則不需要呼叫 sys_enter_futex 啊!
但是,看到這裡仍然無法解釋,為何呼叫了 sys_enter_futex 之後就卡住了。只能說我們在 lock() 之前先 sleep 0s,造成原本佔用此 QMutex 的情形消失了,於是我們就很順利地取得 lock,接著做要做的事。
目前就先這樣頂著用吧!XD
留言
張貼留言