RATIS-2235. Allow only one thread to perform appendLog #1206
RATIS-2235. Allow only one thread to perform appendLog #1206szetszwo merged 3 commits intoapache:masterfrom
Conversation
szetszwo
left a comment
There was a problem hiding this comment.
@SzyWilliam , thanks for working this! Please see a comment inlined.
| final CompletableFuture<Void> appendLog = appendLogFuture.updateAndGet(f -> f.thenCompose(ignored -> { | ||
| final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList() | ||
| : state.getLog().append(requestRef.delegate(entries)); | ||
| return JavaUtils.allOf(futures); | ||
| })); |
There was a problem hiding this comment.
We should retain the ref first, i.e.
@@ -1641,9 +1645,8 @@ class RaftServerImpl implements RaftServer.Division,
state.updateConfiguration(entries);
}
future.join();
-
- final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
- : state.getLog().append(requestRef.delegate(entries));
+ final CompletableFuture<Void> appendLog = entries.isEmpty()? appendLogFuture.get()
+ : appendLog(requestRef.delegate(entries));
proto.getCommitInfosList().forEach(commitInfoCache::update);
CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
@@ -1657,7 +1660,7 @@ class RaftServerImpl implements RaftServer.Division,
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
- return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
+ return appendLog.whenCompleteAsync((r, t) -> {
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
timer.stop();
}, getServerExecutor()).thenApply(v -> {
@@ -1675,6 +1678,13 @@ class RaftServerImpl implements RaftServer.Division,
});
}
+ private CompletableFuture<Void> appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
+ entriesRef.retain();
+ return appendLogFuture.updateAndGet(f -> f.thenCompose(
+ ignored -> JavaUtils.allOf(state.getLog().append(entriesRef))))
+ .whenComplete((v, e) -> entriesRef.release());
+ }
+
private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
// Check if a snapshot installation through state machine is in progress.
final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();|
@szetszwo Thanks for reviewing this pr and help with the code. Now I learned how to use zero-copy in async programming, thank you! |
|
The test failures seem related. Not yet sure what is the problem. |
| state.updateConfiguration(entries); | ||
| } | ||
| future.join(); | ||
| final CompletableFuture<Void> appendLog = entries.isEmpty()? appendLogFuture.get() |
There was a problem hiding this comment.
@SzyWilliam , I see the problem now -- when it is empty, it should not wait for the previous append. Otherwise, it blocks the heartbeats.
final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)There was a problem hiding this comment.
That is a tricky one. Thanks for spotting and troubleshooting this!
|
@szetszwo the tests are all passed now, PTAL ~ |
szetszwo
left a comment
There was a problem hiding this comment.
+1 the change looks good.
|
@szetszwo Thanks for reviewing and merging this PR! |
(cherry picked from commit 9b74401)
See https://issues.apache.org/jira/browse/RATIS-2235
See the discussion at https://issues.apache.org/jira/browse/RATIS-2208