forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAIOContextPool.cpp
More file actions
172 lines (131 loc) · 4.44 KB
/
AIOContextPool.cpp
File metadata and controls
172 lines (131 loc) · 4.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#if defined(OS_LINUX) || defined(__FreeBSD__)
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <Common/MemorySanitizer.h>
#include <Poco/Logger.h>
#include <boost/range/iterator_range.hpp>
#include <errno.h>
#include <IO/AIOContextPool.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_IO_SUBMIT;
extern const int CANNOT_IO_GETEVENTS;
}
AIOContextPool::~AIOContextPool()
{
cancelled.store(true, std::memory_order_relaxed);
io_completion_monitor.join();
}
void AIOContextPool::doMonitor()
{
/// continue checking for events unless cancelled
while (!cancelled.load(std::memory_order_relaxed))
waitForCompletion();
/// wait until all requests have been completed
while (!promises.empty())
waitForCompletion();
}
void AIOContextPool::waitForCompletion()
{
/// array to hold completion events
std::vector<io_event> events(max_concurrent_events);
try
{
const auto num_events = getCompletionEvents(events.data(), max_concurrent_events);
fulfillPromises(events.data(), num_events);
notifyProducers(num_events);
}
catch (...)
{
/// there was an error, log it, return to any producer and continue
reportExceptionToAnyProducer();
tryLogCurrentException("AIOContextPool::waitForCompletion()");
}
}
int AIOContextPool::getCompletionEvents(io_event events[], const int max_events) const
{
timespec timeout{timeout_sec, 0};
auto num_events = 0;
/// request 1 to `max_events` events
while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
if (errno != EINTR)
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", ErrorCodes::CANNOT_IO_GETEVENTS, errno);
/// Unpoison the memory returned from a non-instrumented system call.
__msan_unpoison(events, sizeof(*events) * num_events);
return num_events;
}
void AIOContextPool::fulfillPromises(const io_event events[], const int num_events)
{
if (num_events == 0)
return;
const std::lock_guard lock{mutex};
/// look at returned events and find corresponding promise, set result and erase promise from map
for (const auto & event : boost::make_iterator_range(events, events + num_events))
{
/// get id from event
#if defined(__FreeBSD__)
const auto completed_id = (reinterpret_cast<struct iocb *>(event.udata))->aio_data;
#else
const auto completed_id = event.data;
#endif
/// set value via promise and release it
const auto it = promises.find(completed_id);
if (it == std::end(promises))
{
LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id {}", completed_id);
continue;
}
#if defined(__FreeBSD__)
it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata)));
#else
it->second.set_value(event.res);
#endif
promises.erase(it);
}
}
void AIOContextPool::notifyProducers(const int num_producers) const
{
if (num_producers == 0)
return;
if (num_producers > 1)
have_resources.notify_all();
else
have_resources.notify_one();
}
void AIOContextPool::reportExceptionToAnyProducer()
{
const std::lock_guard lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
}
std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb)
{
std::unique_lock lock{mutex};
/// get current id and increment it by one
const auto request_id = next_id;
++next_id;
/// create a promise and put request in "queue"
promises.emplace(request_id, std::promise<BytesRead>{});
/// store id in AIO request for further identification
iocb.aio_data = request_id;
struct iocb * requests[] { &iocb };
/// submit a request
while (io_submit(aio_context.ctx, 1, requests) < 0)
{
if (errno == EAGAIN)
/// wait until at least one event has been completed (or a spurious wakeup) and try again
have_resources.wait(lock);
else if (errno != EINTR)
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
}
return promises[request_id].get_future();
}
AIOContextPool & AIOContextPool::instance()
{
static AIOContextPool instance;
return instance;
}
}
#endif