forked from cryfs/cryfs
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathThreadSystem.cpp
More file actions
112 lines (99 loc) · 4.55 KB
/
ThreadSystem.cpp
File metadata and controls
112 lines (99 loc) · 4.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include "ThreadSystem.h"
#include "../logging/logging.h"
#include "debugging.h"
using std::function;
using std::string;
using namespace cpputils::logging;
namespace cpputils {
// Returns the process-wide ThreadSystem instance.
// The instance is a function-local static, so it is constructed the first time
// this function is called and the same object is returned on every call.
ThreadSystem &ThreadSystem::singleton() {
    static ThreadSystem instance;
    return instance;
}
// Creates a thread system with no running threads.
// On non-MSVC builds this also registers the fork handlers: _onBeforeFork runs
// before fork(), and _onAfterFork runs after fork() in both the parent and the child.
ThreadSystem::ThreadSystem()
    : _runningThreads(),
      _mutex() {
#ifndef _MSC_VER
    // A thread that is running while the process forks might hold locks or condition
    // variables that don't survive the fork well. So all threads are stopped before
    // fork() and started again afterwards.
    pthread_atfork(&ThreadSystem::_onBeforeFork, &ThreadSystem::_onAfterFork, &ThreadSystem::_onAfterFork);
#endif
    // On MSVC (Windows) no handlers are registered because we don't fork there.
}
// Starts a new thread that repeatedly calls loopIteration (see _runThread) and
// registers it in _runningThreads.
//   loopIteration: the function the thread runs in a loop.
//   threadName:    name given to the new thread (see _startThread).
// Returns a handle to the new _runningThreads entry; pass it to stop() to end the thread.
ThreadSystem::Handle ThreadSystem::start(function<bool()> loopIteration, string threadName) {
    boost::unique_lock<boost::mutex> lock(_mutex);

    // The thread has to be started before the arguments are moved into the entry below,
    // because _startThread still needs to read them.
    boost::thread worker = _startThread(loopIteration, threadName);
    _runningThreads.push_back(RunningThread{std::move(threadName), std::move(loopIteration), std::move(worker)});

    // The new entry is the last element, i.e. the one right before end().
    auto newEntry = _runningThreads.end();
    --newEntry;
    return newEntry;
}
// Stops the thread referred to by handle: interrupts it, removes its entry from
// _runningThreads and then blocks until the thread has finished.
//   handle: a handle previously returned by start().
void ThreadSystem::stop(Handle handle) {
    boost::thread worker;
    {
        boost::unique_lock<boost::mutex> lock(_mutex);
        // Take the thread object out of the entry so it survives the erase below.
        worker = std::move(handle->thread);
        worker.interrupt();
        _runningThreads.erase(handle);
    }
    // The mutex is released before joining. That's fine because joining doesn't change
    // any internal state, so other threads may take the mutex while we wait.
    worker.join();
}
void ThreadSystem::_onBeforeFork() {
singleton()._stopAllThreadsForRestart();
}
void ThreadSystem::_onAfterFork() {
singleton()._restartAllThreads();
}
void ThreadSystem::_stopAllThreadsForRestart() {
_mutex.lock(); // Is unlocked in the after-fork handler. This way, the whole fork() is protected.
for (RunningThread &thread : _runningThreads) {
if (boost::this_thread::get_id() == thread.thread.get_id()) {
// This means fork was called from within one of our _runningThreads.
// We cannot wait or ourselves to die.
// Forking from within a thread is usually chaos since the forked process only gets a copy
// of the calling thread as its new main thread. So we (hopefully) never should do this.
// This is, however, a valid pattern when fork() is directly followed by an exec().
// So let's just ignore this situation and continue as if nothing happened, assuming an exec()
// follows soon.
continue;
}
thread.thread.interrupt();
}
for (RunningThread &thread : _runningThreads) {
if (boost::this_thread::get_id() == thread.thread.get_id()) {
// This means fork was called from within one of our _runningThreads. See comment above.
continue;
}
thread.thread.join();
}
}
void ThreadSystem::_restartAllThreads() {
for (RunningThread &thread : _runningThreads) {
if (thread.thread.joinable()) {
// Because all non-self threads have been terminated in _stopAllThreadsForRestart,
// this means fork was called from within one of our _runningThreads. See comment above.
continue;
}
thread.thread = _startThread(thread.loopIteration, thread.threadName);
}
_mutex.unlock(); // Was locked in the before-fork handler
}
// Creates a new thread that first sets its own name to threadName and then runs
// loopIteration via _runThread.
//   loopIteration: the function the thread runs in a loop.
//   threadName:    the name to set; copied into the thread's closure.
// Returns the boost::thread object for the new thread.
boost::thread ThreadSystem::_startThread(function<bool()> loopIteration, const string& threadName) {
    auto threadMain = [loopIteration = std::move(loopIteration), threadName] {
        cpputils::set_thread_name(threadName.c_str());
        ThreadSystem::_runThread(loopIteration);
    };
    return boost::thread(std::move(threadMain));
}
// Body of each thread: calls loopIteration again and again until it returns false or
// the thread is interrupted. An interruption ends the thread silently; any other
// exception escaping loopIteration is logged and also ends the thread.
//   loopIteration: called once per loop iteration; return false to end the thread.
void ThreadSystem::_runThread(function<bool()> loopIteration) {
    try {
        do {
            // Check for a pending interruption before each iteration.
            boost::this_thread::interruption_point();
        } while (loopIteration()); // The iteration itself might also be interrupted.
        //Reaching this point means the thread is terminated gracefully.
    } catch (const boost::thread_interrupted &) {
        //Interrupted: nothing to do, just exit the thread.
    } catch (const std::exception &e) {
        LOG(ERR, "LoopThread crashed: {}", e.what());
    } catch (...) {
        LOG(ERR, "LoopThread crashed");
    }
    //TODO We should remove the thread from _runningThreads here, not in stop().
}
}