线程同步和互斥

线程同步是指多线程通过特定的设置(如互斥量,事件对象,临界区)来控制线程之间的执行顺序(即所谓的同步),也可以说是在线程之间通过同步建立起执行顺序的关系。

线程互斥是指对于共享的进程系统资源,在各单个线程访问时的排它性。当有若干个线程都要使用某一共享资源时,任何时刻最多只允许一个线程去使用,其它要使用该资源的线程必须等待,直到占用资源者释放该资源。线程互斥可以看成是一种特殊的线程同步。

线程同步和互斥的方法或机制

  1. 临界区(Critical Section):通过对多线程的串行化来访问公共资源或一段代码,速度快,适合控制数据访问。
  2. 互斥锁(Mutex):为协调共同对一个共享资源的单独访问而设计的。
  3. 读写锁(rwlock):为能明确区分读操作和写操作的场景而设计,多用于读多写少的场景。
  4. 信号量(Semaphore):为控制一个具有有限数量用户资源而设计。
  5. 事件对象(Event):用来通知线程有一些事件已发生,从而启动后继任务的开始。
  6. 条件变量(Condition):通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足。

临界区(Critical Section)

每个进程中访问临界资源的那段代码称为临界区。当有线程进入临界区段时,其他线程或是进程必须等待。

临界区与互斥锁的区别:

  1. 临界区只能用于对象在同一进程里线程间的互斥访问;互斥锁可以用于对象进程间或线程间的互斥访问。
  2. 临界区是非内核对象,只在用户态进行锁操作,速度快;互斥锁是内核对象,在核心态进行锁操作,速度慢。
  3. 临界区和互斥锁在 Windows 平台都下可用;Linux下只有互斥锁可用。

注意:临界区同步速度很快,但只能用来同步本进程内的线程,而不可用来同步多个进程中的线程。

条件变量(Condition)

条件变量常与互斥锁同时使用,达到线程同步的目的。

条件变量与互斥锁的区别:

  1. 互斥锁是为上锁而优化的,条件变量是为等待而优化的。

  2. 互斥锁,条件变量都只用于同一个进程的各线程间。

  3. 互斥锁必须是谁上锁就由谁来解锁,而信号量的wait和post操作不必由同一个线程执行。

  4. 信号量既可用于上锁,也可用于等待,因此会有更多的开销和更高的复杂性。
    ,而信号量(有名信号量)可用于不同进程间的同步。当信号量用于进程间同步时,要求信号量建立在共享内存区。

跨平台封装实现

互斥锁(Mutex/CriticalSection)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// CMutex.h

#if !defined(_WIN32) && !defined(_WIN64)
#include <pthread.h>
#else
#include <Windows.h>
#endif
#include <string.h>

class CCondition;
class CMutex
{
public:
CMutex(bool bRecursive = true)
{

#if !defined(_WIN32) && !defined(_WIN64)
pthread_mutexattr_t mAttr ;
memset(&mAttr,0,sizeof(mAttr));
pthread_mutexattr_init(&mAttr);
// setup recursive mutex for mutex attribute
#if defined(__APPLE__) || defined(__ORBIS__)
pthread_mutexattr_settype(&mAttr, PTHREAD_MUTEX_RECURSIVE);
#else
pthread_mutexattr_settype(&mAttr, PTHREAD_MUTEX_RECURSIVE_NP);
#endif
// Use the mutex attribute to create the mutex
if(bRecursive)
{
pthread_mutex_init(&m_pMutex, &mAttr);
}
else
{
pthread_mutex_init(&m_pMutex, NULL);
}

// Mutex attribute can be destroy after initializing the mutex variable
pthread_mutexattr_destroy(&mAttr);

//Unlock();
#else
InitializeCriticalSection(&m_criticalSection);
#endif

}
~CMutex()
{
//Unlock();
#if !defined(_WIN32) && !defined(_WIN64)
pthread_mutex_destroy(&m_pMutex);
#else
DeleteCriticalSection(&m_criticalSection);
#endif
}

public:
void Lock()
{

#if !defined(_WIN32) && !defined(_WIN64)
pthread_mutex_lock(&m_pMutex);
#else
EnterCriticalSection(&m_criticalSection);
#endif
}
void Unlock()
{

#if !defined(_WIN32) && !defined(_WIN64)
pthread_mutex_unlock(&m_pMutex);
#else
LeaveCriticalSection(&m_criticalSection);
#endif
}

public:

#if !defined(_WIN32) && !defined(_WIN64)
pthread_mutex_t m_pMutex;
#else
CRITICAL_SECTION m_criticalSection;
#endif

friend class CCondition;
};

自动锁(AutoMutex)

利用类的构造与析构,实现锁的自动 lock 与 unlock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// CCritical.h

#include "CMutex.h"

class CCritical
{
public:
CCritical(CMutex* pMutex)
{
m_pMutex = pMutex;
if (m_pMutex != NULL)
{
m_pMutex->Lock();
}
}
~CCritical()
{
if (m_pMutex != NULL)
{
m_pMutex->Unlock();
}
}

private:
CMutex* m_pMutex;
};

条件变量(Condition)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// CCondition.h

#if !defined(_WIN32) && !defined(_WIN64)
#include <pthread.h>
#include <sys/time.h>
#else
#include <Windows.h>
#endif

#include "CMutex.h"

class CCondition
{
public:
CCondition(CMutex* pMutex, bool autoLock = true)
{
m_autoLock = autoLock;
m_pMutex = pMutex;
#if !defined(_WIN32) && !defined(_WIN64)
pthread_cond_init(&m_pCond, 0);
#else
InitializeConditionVariable(&m_conditionVariable);
#endif
}

~CCondition()
{
#if !defined(_WIN32) && !defined(_WIN64)
pthread_cond_destroy(&m_pCond);
#endif
}

public:
void Wait()
{
if (m_pMutex)
{

if(m_autoLock)
{
m_pMutex->Lock();
}
#if !defined(_WIN32) && !defined(_WIN64)
pthread_cond_wait(&m_pCond, &(m_pMutex->m_pMutex));
#else
SleepConditionVariableCS(&m_conditionVariable, &(m_pMutex->m_criticalSection), INFINITE);
#endif
if (m_autoLock)
{
m_pMutex->Unlock();
}
}
}

void TimeWait(uint32_t milliseconds) //tw is millSecond
{
if (m_pMutex)
{
m_pMutex->Lock();
#if !defined(_WIN32) && !defined(_WIN64)
long inv_sec = milliseconds / 1000;
long inv_nsec = (milliseconds % 1000) * 1000000;

struct timeval now;
gettimeofday(&now, NULL);
long now_sec = now.tv_sec;
long now_nsec = now.tv_usec * 1000;

struct timespec ts;
ts.tv_sec = now_sec + inv_sec + (now_nsec+inv_nsec)/1000000000;
ts.tv_nsec = (now_nsec + inv_nsec) % 1000000000;

pthread_cond_timedwait(&m_pCond, &(m_pMutex->m_pMutex), &ts);
#else
SleepConditionVariableCS(&m_conditionVariable, &(m_pMutex->m_criticalSection), milliseconds);
#endif
m_pMutex->Unlock();
}
}

void Set()
{
if (m_pMutex)
{
m_pMutex->Lock();
#if !defined(_WIN32) && !defined(_WIN64)
pthread_cond_broadcast(&m_pCond);
#else
WakeAllConditionVariable(&m_conditionVariable);
#endif
m_pMutex->Unlock();
}

}

void WakeUp()
{
if (m_pMutex)
{
m_pMutex->Lock();
#if !defined(_WIN32) && !defined(_WIN64)
pthread_cond_signal(&m_pCond);
#else
WakeConditionVariable(&m_conditionVariable);
#endif
m_pMutex->Unlock();
}
}

private:

#if !defined(_WIN32) && !defined(_WIN64)
pthread_cond_t m_pCond;
#else
CONDITION_VARIABLE m_conditionVariable;
#endif
CMutex* m_pMutex;
bool m_autoLock;
};

事件(Event)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// CEvent.h

#include "CMutex.h"

class CEvent
{
public:
CEvent()
#if !defined(_WIN32) && !defined(_WIN64)
:m_Mutex(false)
#endif
{
#if !defined(_WIN32) && !defined(_WIN64)
m_pCond = new CCondition(&m_Mutex);
#else
m_hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
#endif

}

~CEvent()
{
#if !defined(_WIN32) && !defined(_WIN64)
if(m_pCond != 0)
{
delete m_pCond;
m_pCond = NULL;
}
#else
CloseHandle(m_hEvent);
#endif
}

public:
void Wait()
{
#if !defined(_WIN32) && !defined(_WIN64)
if(m_pCond != 0)
{
m_pCond->Wait();
}
#else
WaitForSingleObject(m_hEvent, INFINITE);
#endif
}

void TimeWait(uint32_t tw) //millisecond
{
#if !defined(_WIN32) && !defined(_WIN64)
if(m_pCond != 0)
{
m_pCond->TimeWait(tw);
}
#else
ResetEvent(m_hEvent);
WaitForSingleObject(m_hEvent, tw);
#endif
}

void Set()
{
#if !defined(_WIN32) && !defined(_WIN64)
if(m_pCond != 0)
{
m_pCond->Set();
}
#else
SetEvent(m_hEvent);
#endif
}

void Reset()
{
#if !defined(_WIN32) && !defined(_WIN64)
#else
ResetEvent(m_hEvent);
#endif
}

private:
#if !defined(_WIN32) && !defined(_WIN64)
CMutex m_Mutex;
CCondition* m_pCond;
#else
HANDLE m_hEvent;
#endif
};

参考文献

https://cloud.tencent.com/developer/article/1399240