Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix T-San Data Race Issues #2078

Draft
wants to merge 8 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions src/source/Ice/ConnectionListener.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,30 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener)
ATOMIC_STORE_BOOL(&pConnectionListener->terminate, TRUE);

if (IS_VALID_MUTEX_VALUE(pConnectionListener->lock)) {
MUTEX_LOCK(pConnectionListener->lock);
threadId = pConnectionListener->receiveDataRoutine;
MUTEX_UNLOCK(pConnectionListener->lock);

// TODO add support for windows socketpair
// This writes to the socketpair, kicking the POLL() out early,
// otherwise wait for the POLL to timeout
#ifndef _WIN32
socketWrite(pConnectionListener->kickSocket[CONNECTION_LISTENER_KICK_SOCKET_WRITE], msg, STRLEN(msg));
#endif

DLOGW("[TESTING] LOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine.");
// receiveDataRoutine TID should be used under pConnectionListener->lock lock.
MUTEX_LOCK(pConnectionListener->lock);
threadId = pConnectionListener->receiveDataRoutine;
// wait for thread to finish.
if (IS_VALID_TID_VALUE(threadId)) {
THREAD_JOIN(pConnectionListener->receiveDataRoutine, NULL);
THREAD_JOIN(threadId, NULL);
pConnectionListener->receiveDataRoutine = INVALID_TID_VALUE;
}

MUTEX_FREE(pConnectionListener->lock);
MUTEX_UNLOCK(pConnectionListener->lock);
DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine.");
}

DLOGW("[TESTING] LOCKING pConnectionListener->lock for closeSocket.");
MUTEX_LOCK(pConnectionListener->lock);

// TODO add support for windows socketpair
#ifndef _WIN32
if (pConnectionListener->kickSocket[CONNECTION_LISTENER_KICK_SOCKET_LISTEN] != -1) {
Expand All @@ -91,6 +96,11 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener)
}
#endif

MUTEX_UNLOCK(pConnectionListener->lock);
DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for closeSocket.");

MUTEX_FREE(pConnectionListener->lock);

MEMFREE(pConnectionListener);

*ppConnectionListener = NULL;
Expand Down Expand Up @@ -332,8 +342,13 @@ PVOID connectionListenerReceiveDataRoutine(PVOID arg)
if (canReadFd(localSocket, rfds, nfds)) {
iterate = TRUE;
while (iterate) {
DLOGW("[TESTING] LOCKING pConnectionListener->lock for recvfrom.");
MUTEX_LOCK(pConnectionListener->lock);
readLen = recvfrom(localSocket, pConnectionListener->pBuffer, pConnectionListener->bufferLen, 0,
(struct sockaddr*) &srcAddrBuff, &srcAddrBuffLen);
MUTEX_UNLOCK(pConnectionListener->lock);
DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for recvfrom.");

if (readLen < 0) {
switch (getErrorCode()) {
case EWOULDBLOCK:
Expand Down
18 changes: 17 additions & 1 deletion src/source/Rtcp/RtpRollingBuffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,14 @@ STATUS rtpRollingBufferAddRtpPacket(PRtpRollingBuffer pRollingBuffer, PRtpPacket
pRawPacketCopy = NULL;

CHK_STATUS(rollingBufferAppendData(pRollingBuffer->pRollingBuffer, (UINT64) pRtpPacketCopy, &index));

CHK(pRollingBuffer->pRollingBuffer != NULL, STATUS_NULL_ARG);

// DLOGW("[TESTING] LOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex.");
MUTEX_LOCK(pRollingBuffer->pRollingBuffer->lock);
pRollingBuffer->lastIndex = index;
MUTEX_UNLOCK(pRollingBuffer->pRollingBuffer->lock);
// DLOGW("[TESTING] UNLOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex.");

CleanUp:
SAFE_MEMFREE(pRawPacketCopy);
Expand All @@ -90,9 +97,13 @@ STATUS rtpRollingBufferGetValidSeqIndexList(PRtpRollingBuffer pRollingBuffer, PU
PUINT64 pCurSeqIndexListPtr;
UINT16 seqNum;
UINT32 size = 0;
BOOL rollingBufferLocked = FALSE;

CHK(pRollingBuffer != NULL && pValidSeqIndexList != NULL && pSequenceNumberList != NULL, STATUS_NULL_ARG);
CHK(pRollingBuffer != NULL && pRollingBuffer->pRollingBuffer && pValidSeqIndexList != NULL && pSequenceNumberList != NULL, STATUS_NULL_ARG);

DLOGW("[TESTING] LOCKING pRollingBuffer->lock for pRollingBuffer size.");
MUTEX_LOCK(pRollingBuffer->pRollingBuffer->lock);
rollingBufferLocked = TRUE;
CHK_STATUS(rollingBufferGetSize(pRollingBuffer->pRollingBuffer, &size));
// Empty buffer, just return
CHK(size > 0, retStatus);
Expand Down Expand Up @@ -124,6 +135,11 @@ STATUS rtpRollingBufferGetValidSeqIndexList(PRtpRollingBuffer pRollingBuffer, PU
}

CleanUp:
if (rollingBufferLocked) {
MUTEX_UNLOCK(pRollingBuffer->pRollingBuffer->lock);
DLOGW("[TESTING] UNLOCKING pRollingBuffer->lock for pRollingBuffer size.");
}

CHK_LOG_ERR(retStatus);

if (pValidIndexListLen != NULL) {
Expand Down
5 changes: 5 additions & 0 deletions src/source/Signaling/LwsApiCalls.c
Original file line number Diff line number Diff line change
Expand Up @@ -2417,9 +2417,14 @@ STATUS wakeLwsServiceEventLoop(PSignalingClient pSignalingClient, UINT32 protoco
// Early exit in case we don't need to do anything
CHK(pSignalingClient != NULL && pSignalingClient->pLwsContext != NULL, retStatus);

DLOGW("[TESTING] LOCKING pSignalingClient->lwsSerializerLock for pSignalingClient->currentWsi.");
// currentWsi should be used under lwsSerializerLock.
MUTEX_LOCK(pSignalingClient->lwsSerializerLock);
if (pSignalingClient->currentWsi[protocolIndex] != NULL) {
lws_callback_on_writable(pSignalingClient->currentWsi[protocolIndex]);
}
MUTEX_UNLOCK(pSignalingClient->lwsSerializerLock);
DLOGW("[TESTING] UNLOCKING pSignalingClient->lwsSerializerLock for pSignalingClient->currentWsi.");

CleanUp:

Expand Down
7 changes: 5 additions & 2 deletions tst/IceFunctionalityTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,17 @@ TEST_F(IceFunctionalityTest, connectionListenerFunctionalityTest)
newConnectionCount = pConnectionListener->socketCount;
EXPECT_EQ(connectionCount, newConnectionCount);

// Keeping TSAN happy need to lock/unlock when retrieving the value of TID
// receiveDataRoutine TID should be used under pConnectionListener->lock lock.
DLOGW("[TESTING] LOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine.");
MUTEX_LOCK(pConnectionListener->lock);
threadId = pConnectionListener->receiveDataRoutine;
MUTEX_UNLOCK(pConnectionListener->lock);
EXPECT_TRUE(IS_VALID_TID_VALUE(threadId));
ATOMIC_STORE_BOOL(&pConnectionListener->terminate, TRUE);

THREAD_JOIN(threadId, NULL);
pConnectionListener->receiveDataRoutine = INVALID_TID_VALUE;
MUTEX_UNLOCK(pConnectionListener->lock);
DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine.");

EXPECT_EQ(STATUS_SUCCESS, freeConnectionListener(&pConnectionListener));

Expand Down
Loading