Add disposeGracefully method to Scheduler #3089
Some `Disposable`s should be disposed with a chance to clean up the underlying resources. At the same time, it is desirable to coordinate logic that depends on successful disposal. Specifically, instances of `Scheduler` should allow shutting down by not accepting new work while giving the currently executing tasks a chance to finish without interruption. Therefore, a `Disposable.Graceful` interface has been added that provides the means to do a timely cleanup and to observe the result via a `Mono<Void> disposeGracefully(Duration)` method.
This is shaping up well, but the review triggers a few thoughts on my part and surfaces additional corner cases 🤔
int size();
}

// TODO(dj): add javadoc
the semantics of disposeGracefully may vary widely, so I would make the documented contract say that explicitly (eg. "each class implementing this trait should define how subsequent calls behave during the grace period and after it")
One limitation I'm thinking of with this API is that most of the time the underlying resource(s) being disposed gracefully will be atomically swapped out. This means that even if one re-subscribes to the `Mono`, including with `onErrorResume()` or `retry()`, the underlying resources won't be reachable anymore.
Thus, there will be no way of composing operators to fall back to a "hard" shutdown once the graceful shutdown is initiated.
I'm thinking this is fine if documented. The recommendation for implementors should probably be to trigger a hard dispose at the end of the `gracePeriod` THEN propagate a `TimeoutException`, noting that it only serves as a warning / logging but cannot be recovered.
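The "hard dispose, then `TimeoutException`" recommendation could look roughly like the sketch below. It is only an illustration on a plain `ExecutorService`: the class and method names are hypothetical, and `CompletableFuture<Void>` stands in for the `Mono<Void>` so the snippet needs nothing beyond the JDK.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class HardStopAfterGraceSketch {

    // Hypothetical helper, not Reactor API: CompletableFuture stands in for Mono<Void>.
    static CompletableFuture<Void> disposeGracefully(ExecutorService executor, Duration gracePeriod) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        executor.shutdown(); // stop accepting new work, let running tasks continue
        Thread waiter = new Thread(() -> {
            try {
                if (executor.awaitTermination(gracePeriod.toNanos(), TimeUnit.NANOSECONDS)) {
                    result.complete(null); // everything finished within the grace period
                    return;
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // grace period exceeded (or the waiter was interrupted): force the shutdown first...
            executor.shutdownNow();
            // ...THEN signal the timeout, which is informational and cannot be recovered from
            result.completeExceptionally(new TimeoutException("forced shutdown after " + gracePeriod));
        });
        waiter.setDaemon(true);
        waiter.start();
        return result;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService idle = Executors.newSingleThreadExecutor();
        disposeGracefully(idle, Duration.ofSeconds(1)).get();
        System.out.println("idle executor terminated: " + idle.isTerminated());
    }
}
```

With this contract the error is a notification only: by the time a caller sees it, `shutdownNow()` has already been invoked, so retrying changes nothing.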
We discussed the possible contracts here with @OlegDokuka. The approach with forceful shutdown before propagating the `TimeoutException` and non-recoverable errors might be confusing to users if they don't read the specific `Scheduler` documentation, but it has some advantages implementation-wise.
Another approach could be to propagate a retry-able error and allow re-initiating the `shutdown()` + `awaitTermination(...)` procedure, while also allowing for an explicit final call to `shutdownNow()` when desired. I'll go back to the original issue and ask for opinions from the user's perspective to guide the design.
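For comparison, the retry-able alternative might be sketched as follows on a plain `ExecutorService`. The class, the method names and the attempt counter are made up for illustration; the only assumption about the JDK is that `shutdown()` has no additional effect when called again, which is what makes re-initiating the procedure safe.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RetryableShutdownSketch {

    // One attempt: may be repeated, since shutdown() is a no-op on an already shut down executor.
    static boolean tryGracefulShutdown(ExecutorService executor, Duration timeout) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeout.toNanos(), TimeUnit.NANOSECONDS);
    }

    // Caller-side policy: retry a few times, then make the explicit final call to shutdownNow().
    static boolean shutdownWithFallback(ExecutorService executor, Duration perAttempt, int attempts)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            if (tryGracefulShutdown(executor, perAttempt)) {
                return true; // terminated gracefully
            }
        }
        executor.shutdownNow(); // the hard stop is the caller's decision, not the library's
        return false;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> System.out.println("last task"));
        System.out.println("graceful: " + shutdownWithFallback(executor, Duration.ofMillis(100), 3));
    }
}
```

The difference from the fixed contract is that the timeout stays recoverable: the decision to retry or to force the shutdown remains with the caller.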
My personal opinion is that once we fix a specific behavior (e.g. call `shutdownNow()` on a timeout or `InterruptedException`), we will probably end up with everyone doing `scheduler.disposeGracefully(Duration.ofHours(9999999)).subscribe()` or complaining that they did not want to have `shutdownNow` called but rather to retry later.
Another thought on `TimeoutException`: any exception is useless if we cannot do anything useful after it. I'm not sure that logging such an event makes any sense. This exception is just a fact that we forced the shutdown process, so a user just has to take it. Also, taking into account the impl details, all other active subscribers are going to get the same notification but a late subscriber will not get it, which is going to be too confusing, so even having it documented will not resolve this confusion.
My personal recommendation is to prefer flexibility over fixed behavior. One can always write something like the following to mimic what we can hardcode:
scheduler.disposeGracefully(Duration.ofMillis(100))
.retryWhen(Retry.backoff(5, Duration.ofMillis(50)))
.onErrorResume(e -> Mono.fromRunnable(scheduler::dispose));
@chemicL what do you think of this piece of code to shut down multiple executors at once, try to await as close to the grace period as possible while still only needing one thread, and finally fall back to `shutdownNow()` if the grace period is exceeded?
//this supposes that we somehow can get all the executors and swap them with an empty array
//with more advanced schedulers like BoundedElasticScheduler, we might not get an ExecutorService array
//but an array of another resource (like BoundedState[]), hence the Function
static <RES> void shutdownAndAwait(final RES[] resources, Function<RES, ExecutorService> serviceExtractor, Duration gracePeriod, Sinks.Empty<Void> disposeNotifier) {
    for (int i = 0; i < resources.length; i++) {
        ExecutorService service = serviceExtractor.apply(resources[i]);
        service.shutdown();
    }
    //TODO: use a configurable separate pool?
    final ExecutorService service = Executors.newSingleThreadExecutor();
    service.submit(() -> {
        long nanoStart = System.nanoTime();
        long nanoGraceRemaining = gracePeriod.toNanos();
        boolean allAwaited = true;
        //wait for one executor at a time
        int index = 0;
        while (index < resources.length) {
            ExecutorService toAwait = serviceExtractor.apply(resources[index]);
            //short case: the current executor has already terminated
            if (toAwait.isTerminated()) {
                index++;
                continue;
            }
            //we're inspecting the next executor and giving it nanoGraceRemaining ns to terminate gracefully
            try {
                if (!toAwait.awaitTermination(nanoGraceRemaining, TimeUnit.NANOSECONDS)) {
                    //if it didn't terminate gracefully, the whole graceful operation can be considered a failure
                    allAwaited = false;
                    break;
                }
                else {
                    //subtract the time spent waiting so that the global operation stays within gracePeriod bounds
                    long oldStart = nanoStart;
                    nanoStart = System.nanoTime();
                    nanoGraceRemaining = Math.max(0, nanoGraceRemaining - (nanoStart - oldStart));
                    index++;
                }
            }
            catch (InterruptedException e) {
                allAwaited = false;
                break;
            }
        }
        if (allAwaited) {
            disposeNotifier.tryEmitEmpty();
        }
        else {
            for (int i = 0; i < resources.length; i++) {
                ExecutorService executorService = serviceExtractor.apply(resources[i]);
                executorService.shutdownNow();
            }
            disposeNotifier.tryEmitError(new TimeoutException("Scheduler didn't shutdown gracefully in time (" + gracePeriod + "), used shutdownNow"));
        }
    });
    //release the helper thread once the awaiting task above has run
    service.shutdown();
}
Yep, that's a great optimization, thanks for the suggestion.
22e8b1d to 9026186
}

@Test
public void immediateTaskIsSkippedIfDisposeRightAfter() throws Exception {
This is no longer the case. It still could race, but now there are a few more instructions in `dispose()`, so the task gets aborted. IMO it's not a feature worth testing: the test was introduced in 6f3383d, but one should not rely on a race to dispose tasks.
Nice progress overall. Left my comments. Also, there is a set of general polishing points:
- Let's use plain volatile access (e.g. `this.state` instead of `STATE.get(this)`). Plain access is the fastest approach and we use it consistently through the codebase. `AtomicXXXFieldUpdater#get` is an option to get the value when plain access is impossible (e.g. in a shared utility function such as `Operators#requested`).
- Let's make sure imports are not collapsed.
- `SchedulerState#terminated` is not making use of the old state except in bounded elastic, thus let's avoid `terminated(old)` and use `static final TERMINATED_STATE = new SchedulerState(dead_executor_service, Mono.empty())` where possible.
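To illustrate the first point, here is a minimal sketch of the pattern. The class and its `String` state are hypothetical and are not the PR's `SchedulerState`: reads go through the `volatile` field directly, and the field updater is used only where an atomic compare-and-set is needed.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class VolatileAccessSketch {

    static final String INIT = "INIT";
    static final String TERMINATED = "TERMINATED";

    volatile String state = INIT;

    static final AtomicReferenceFieldUpdater<VolatileAccessSketch, String> STATE =
            AtomicReferenceFieldUpdater.newUpdater(VolatileAccessSketch.class, String.class, "state");

    // Plain volatile read: same visibility guarantees as STATE.get(this), without the indirection.
    String current() {
        return this.state;
    }

    // The updater is still required for the atomic transition itself.
    boolean terminate() {
        for (;;) {
            String s = this.state; // plain volatile read inside the CAS loop
            if (s == TERMINATED) {
                return false; // somebody else already terminated
            }
            if (STATE.compareAndSet(this, s, TERMINATED)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        VolatileAccessSketch sketch = new VolatileAccessSketch();
        System.out.println(sketch.current() + " -> terminate: " + sketch.terminate());
    }
}
```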
The latest changes include improvements for avoiding looping in the
…y, added simple validation for sequential multiple disposeGracefully
Looks good overall, with a few minor comments.
great job @chemicL ! finally approved and ready to merge 😄 now the only remaining step is to try to summarize the design of the change for the commit message 📖
@chemicL this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to
Currently, all `Scheduler`s forcefully shut down the underlying `ExecutorService`s by calling the `shutdownNow()` method. In some scenarios, that is undesired as it does not allow proper cleanup. `Scheduler`s should allow shutting down by not accepting new work, but giving the currently executing tasks a chance to finish without interruption. A `Mono<Void> disposeGracefully()` method has been added for that purpose. Upon subscription, it calls `shutdown()` instead of `shutdownNow()` and creates a background task that does `awaitTermination()` to complete the returned `Mono`. It can be combined with `timeout()` and `retry()` operators in realistic scenarios.
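The mechanism described in the summary can be approximated on a plain `ExecutorService` as below. This is a sketch under stated assumptions, not the merged implementation: the class and method are hypothetical, `CompletableFuture<Void>` stands in for `Mono<Void>` (so the work starts eagerly rather than upon subscription), and a timed `get` plays the role of the `timeout()` operator.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AwaitInBackgroundSketch {

    // Hypothetical stand-in for disposeGracefully(): no grace period, no forced stop.
    static CompletableFuture<Void> disposeGracefully(ExecutorService executor) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        executor.shutdown(); // instead of shutdownNow(): running tasks are not interrupted
        Thread waiter = new Thread(() -> {
            try {
                // background task that only awaits termination, however long it takes
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                done.complete(null);
            }
            catch (InterruptedException e) {
                done.completeExceptionally(e);
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        return done;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException e) { /* interrupted by shutdownNow */ }
        });
        CompletableFuture<Void> done = disposeGracefully(executor);
        try {
            done.get(200, TimeUnit.MILLISECONDS); // caller-side timeout
        }
        catch (TimeoutException e) {
            executor.shutdownNow(); // caller-side fallback to a hard stop
        }
        done.get(5, TimeUnit.SECONDS);
        System.out.println("terminated: " + executor.isTerminated());
    }
}
```

Because the timeout lives on the caller's side, a timed-out wait leaves the executor untouched, and the caller can wait again or force the stop.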