-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathmessagequeue.h
More file actions
113 lines (101 loc) · 2.98 KB
/
messagequeue.h
File metadata and controls
113 lines (101 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// MessageQueue is a thread-safe queue of messages.
// Copyright (c) 2015 C. Scott Ananian <[email protected]>
#ifndef NODE_PHP_EMBED_MESSAGEQUEUE_H_
#define NODE_PHP_EMBED_MESSAGEQUEUE_H_
#include <cassert>
#include <list>
#include "nan.h"
#include "src/macros.h"
namespace node_php_embed {
class Message;
// A queue of messages passed between threads.
class MessageQueue {
public:
explicit MessageQueue(uv_async_t *async)
: async_(async), data_(), shutdown_(false) {
uv_mutex_init(&lock_);
uv_cond_init(&cond_);
}
virtual ~MessageQueue() {
uv_cond_destroy(&cond_);
uv_mutex_destroy(&lock_);
}
inline uv_async_t *async() { return async_; }
void Push(Message *m) {
assert(m);
_Push(m);
}
void Notify() {
_Push(nullptr);
}
// Processes all messages on the queue.
// Returns true if at least one message was processed.
// If `match` is non null, it will block if the queue is empty and
// continue processing messages until `match->IsProcessed` is true.
template<typename Func>
bool DoProcess(Message *match, Func func) {
bool sawOne = false, loop = true;
Message *m;
while (loop) {
// Grab one message at a time, so that we don't end up processing
// messages out of order in case `func(m)` below ends up creating
// a recursive processing loop.
uv_mutex_lock(&lock_);
if (data_.empty()) {
m = nullptr;
if (match) {
// We're blocking for a particular message, and there's nothing here.
// Block to wait for some data.
uv_cond_wait(&cond_, &lock_);
} else {
loop = false;
}
} else {
sawOne = true;
m = data_.front();
data_.pop_front();
}
uv_mutex_unlock(&lock_);
if (m) { func(m); }
// Check whether either we processed the matching message,
// or else a recursive processing loop handled it for us.
if (match && match->IsProcessed()) { loop = false; }
}
return sawOne;
}
// Shutdown the queue: no more messages will be pushed
// after this method is called.
void Shutdown() {
uv_mutex_lock(&lock_);
shutdown_ = true;
uv_mutex_unlock(&lock_);
}
private:
void _Push(Message *m) {
bool was_shutdown = false;
uv_mutex_lock(&lock_);
if (!shutdown_) {
if (m) { data_.push_back(m); }
uv_cond_broadcast(&cond_);
// on a shutdown message, async_ could be torn down as soon
// as the other thread wakes up, so do the send inside the lock.
if (async_) { uv_async_send(async_); }
} else {
was_shutdown = true;
}
uv_mutex_unlock(&lock_);
if (was_shutdown) {
// Shouldn't happen (but assert after releasing the lock)
NPE_ERROR("Push after shutdown :(");
assert(false);
}
assert(!was_shutdown);
}
uv_async_t *async_;
uv_mutex_t lock_;
uv_cond_t cond_;
std::list<Message *> data_;
bool shutdown_;
};
} // namespace node_php_embed
#endif // NODE_PHP_EMBED_MESSAGEQUEUE_H_