-
Notifications
You must be signed in to change notification settings - Fork 1.2k
adds on discard support for FluxZip/MonoZip
#3209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
|
|
||
| void drain(@Nullable ZipInner<T> callerInner, @Nullable Object dataSignal) { | ||
| if (WIP.getAndIncrement(this) != 0) { | ||
| if (callerInner != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As onDiscard is a no-op when dataSignal == null, perhaps it's worth checking that it's non-null also in this condition to avoid the unnecessary checks that lead to the no-op?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already done inside onDiscard
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is what I am saying, but without the "already" part. My suggestion is to do the null check and fail early without executing the following 4 if statements + 1 method call, all leading to a no-op.
|
|
||
| if (queue != null) { | ||
| if (sourceMode == ASYNC) { | ||
| // delegates discarding to the queue holder to ensure there is no racing on draining from the SpScQueue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there actual implementations of this logic? How can the queue holder tell whether the clear came from a discard or another behaviour?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As of the fusion contract, the call for the queue.clear means - termination of the process. In the case of fusion - the queue comes from the upstream, so the upstream is a creator/owner of the queue, so it and only it can have full control of when values are offered and drained. See #2636 for more information
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, that's what I was asking about. I checked a sample implementation and saw that the clear() gets overriden and handles the discarding.
| DiscardScenario.allFluxSourceArray("zipScalar", 4, | ||
| sources -> { | ||
| Publisher<Tracked>[] sources1 = | ||
| Stream.concat(sources.stream(), Stream.of(Mono.just(Tracked.RELEASED))).toArray(Publisher[]::new); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the scalar already RELEASED? Shouldn't it be an unreleased one, to be able to validate that it is properly consumed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scenarios are defined statically, so within a call, we can not get access to the allocator which holds all the references to Tracked objects. In this particular case, we initiate a specific path for FluxZip, which uses ZipSingleCoordinator instead of the default ZipCoordinator. Thus, it is not that important for now, although this can be improved in a separate PR so the rules API can get access to allocator upon the rule installation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Now I understand it's not vital to validate the scalar value is released. Thanks.
|
|
||
| @Override | ||
| public ColdTestPublisher<T> next(@Nullable T t) { | ||
| public ColdTestPublisher<T> next(@Nullable T t, Consumer<T> onUndelivered) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there no need to use the onUndelivered value like in DefaultTestPublisher?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rolled back this API since it is a breaking one after all. In the case of ColdTestPublisher, all the values are cached in the internal store, so they can be consumed slightly latter
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
| for (ZipInner<R> ms : subscribers) { | ||
| ms.cancel(); | ||
| } | ||
| if (DONE.getAndSet(this, Integer.MIN_VALUE) <=0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (DONE.getAndSet(this, Integer.MIN_VALUE) <=0) { | |
| if (DONE.getAndSet(this, Integer.MIN_VALUE) <= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
|
|
||
| @Override | ||
| public void onError(Throwable t) { | ||
| if (value != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When can this happen? Should this case not be reported somehow? Is it not a behaviour change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally, this should never happen as of Mono specification which guarantees only the following cases:
onNext + onComplete
onComplete
onError
The others are prohibited so onNext + onError was never expected although I added that as an extra check.
Will roll it back
| if (value == null) { | ||
| value = t; | ||
| parent.signal(); | ||
| if (!parent.signal()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding is that false is returned when this value does not finish the collection of the zipped items. Why then should the onDiscard be called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
false means the signal was not delivered which happens when the process was canceled or force terminated with the error or empty source. In that scenario value is discarded by a caller since the signal method has no information about the caller or signaled value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way it is implemented right now seems different. Also, please have a look at failing tests from CI.
The condition for returning false is:
if (DONE.incrementAndGet(this) != n) {
return false;
}
And that is the same condition which signifies the zipping is done. I'm a bit confused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| import java.util.Arrays; | ||
| import java.util.EnumSet; | ||
| import java.util.Objects; | ||
| import java.util.function.Consumer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the API got reverted, the unused imports can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have one doubt that seems significant, about discarding items in MonoZip, but everything else looks good. Happy to approve once I understand the discarding in MonoZip.
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
FluxZipFluxZip/MonoZip. Improves FluxPublishOn onDiscard behavior
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
FluxZip/MonoZip. Improves FluxPublishOn onDiscard behaviorFluxZip/MonoZip
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few comments on the tests, plus minor note on change from onNextDropped to onDiscard
reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java
Outdated
Show resolved
Hide resolved
reactor-core/src/test/java/reactor/core/publisher/MonoCacheInvalidateWhenTest.java
Show resolved
Hide resolved
| .publishOn(Schedulers.newSingle("discardPollAsync")) | ||
| .filter(i -> { throw new IllegalStateException("boom"); }) | ||
| .filter(i -> true) | ||
| .contextWrite(previous -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this change really relates to this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fails on my machine all the time (m1 does goos job in detecting such things)
| CountDownLatch latch = new CountDownLatch(10); | ||
| StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead | ||
| .publishOn(Schedulers.newSingle("discardPollAsync"), 1) | ||
| .contextWrite(previous -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this change really relates to this PR?
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good from my perspective. @chemicL did you have any unresolved comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I had one comment about handling dataSignal == null case early in FluxZip::drain but now the logic is different. Anyways, it's not a blocker. Worth considering an optimisation to avoid a few branched executions just to hit a null check. That optimisation can be addressed in a more focused effort around #3227.
Signed-off-by: Oleh Dokuka <odokuka@vmware.com> Signed-off-by: OlegDokuka <odokuka@vmware.com>
|
@OlegDokuka this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to |
Signed-off-by: Oleh Dokuka <odokuka@vmware.com> Signed-off-by: OlegDokuka <odokuka@vmware.com>
This PR adds support for missing the
onDiscardfunctionality to theFluxZip/MonoZipoperatorsAlso, this PR includes fixes to fluky tests
Signed-off-by: Oleh Dokuka odokuka@vmware.com