KEMBAR78
adds on discard support for `FluxZip`/`MonoZip` by OlegDokuka · Pull Request #3209 · reactor/reactor-core · GitHub
Skip to content

Conversation

@OlegDokuka
Copy link
Contributor

@OlegDokuka OlegDokuka commented Oct 3, 2022

This PR adds support for missing the onDiscard functionality to the FluxZip / MonoZip operators

Also, this PR includes fixes to fluky tests

Signed-off-by: Oleh Dokuka odokuka@vmware.com

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
@OlegDokuka OlegDokuka added type/enhancement A general enhancement area/doOnDiscard This belongs to the doOnDiscard theme labels Oct 3, 2022
@OlegDokuka OlegDokuka added this to the 3.4.24 milestone Oct 3, 2022
@OlegDokuka OlegDokuka requested a review from a team as a code owner October 3, 2022 10:10
@OlegDokuka OlegDokuka changed the base branch from main to 3.4.x October 3, 2022 10:10
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>

void drain(@Nullable ZipInner<T> callerInner, @Nullable Object dataSignal) {
if (WIP.getAndIncrement(this) != 0) {
if (callerInner != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As onDiscard is a no-op when dataSignal == null, perhaps it's worth checking that it's non-null also in this condition to avoid the unnecessary checks that lead to the no-op?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already done inside onDiscard

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is what I am saying, but without the "already" part. My suggestion is to do the null check and fail early without executing the following 4 if statements + 1 method call, all leading to a no-op.


if (queue != null) {
if (sourceMode == ASYNC) {
// delegates discarding to the queue holder to ensure there is no racing on draining from the SpScQueue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there actual implementations of this logic? How can the queue holder tell whether the clear came from a discard or another behaviour?

Copy link
Contributor Author

@OlegDokuka OlegDokuka Oct 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of the fusion contract, the call for the queue.clear means - termination of the process. In the case of fusion - the queue comes from the upstream, so the upstream is a creator/owner of the queue, so it and only it can have full control of when values are offered and drained. See #2636 for more information

Copy link
Member

@chemicL chemicL Oct 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that's what I was asking about. I checked a sample implementation and saw that the clear() gets overriden and handles the discarding.

DiscardScenario.allFluxSourceArray("zipScalar", 4,
sources -> {
Publisher<Tracked>[] sources1 =
Stream.concat(sources.stream(), Stream.of(Mono.just(Tracked.RELEASED))).toArray(Publisher[]::new);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the scalar already RELEASED? Shouldn't it be an unreleased one, to be able to validate that it is properly consumed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scenarios are defined statically, so within a call, we can not get access to the allocator which holds all the references to Tracked objects. In this particular case, we initiate a specific path for FluxZip, which uses ZipSingleCoordinator instead of the default ZipCoordinator. Thus, it is not that important for now, although this can be improved in a separate PR so the rules API can get access to allocator upon the rule installation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Now I understand it's not vital to validate the scalar value is released. Thanks.


@Override
public ColdTestPublisher<T> next(@Nullable T t) {
public ColdTestPublisher<T> next(@Nullable T t, Consumer<T> onUndelivered) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no need to use the onUndelivered value like in DefaultTestPublisher?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rolled back this API since it is a breaking one after all. In the case of ColdTestPublisher, all the values are cached in the internal store, so they can be consumed slightly latter

Oleh Dokuka added 3 commits October 4, 2022 16:02
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
@OlegDokuka OlegDokuka requested a review from a team October 4, 2022 13:55
for (ZipInner<R> ms : subscribers) {
ms.cancel();
}
if (DONE.getAndSet(this, Integer.MIN_VALUE) <=0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (DONE.getAndSet(this, Integer.MIN_VALUE) <=0) {
if (DONE.getAndSet(this, Integer.MIN_VALUE) <= 0) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


@Override
public void onError(Throwable t) {
if (value != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When can this happen? Should this case not be reported somehow? Is it not a behaviour change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally, this should never happen as of Mono specification which guarantees only the following cases:

onNext + onComplete
onComplete
onError

The others are prohibited so onNext + onError was never expected although I added that as an extra check.

Will roll it back

if (value == null) {
value = t;
parent.signal();
if (!parent.signal()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that false is returned when this value does not finish the collection of the zipped items. Why then should the onDiscard be called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

false means the signal was not delivered which happens when the process was canceled or force terminated with the error or empty source. In that scenario value is discarded by a caller since the signal method has no information about the caller or signaled value

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way it is implemented right now seems different. Also, please have a look at failing tests from CI.
The condition for returning false is:

        if (DONE.incrementAndGet(this) != n) {
			return false;
		}

And that is the same condition which signifies the zipping is done. I'm a bit confused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

import java.util.Arrays;
import java.util.EnumSet;
import java.util.Objects;
import java.util.function.Consumer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the API got reverted, the unused imports can be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have one doubt that seems significant, about discarding items in MonoZip, but everything else looks good. Happy to approve once I understand the discarding in MonoZip.

Oleh Dokuka added 6 commits October 7, 2022 01:23
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
@OlegDokuka OlegDokuka changed the title adds on discard support for FluxZip adds on discard support for FluxZip/MonoZip. Improves FluxPublishOn onDiscard behavior Oct 7, 2022
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
@OlegDokuka OlegDokuka changed the title adds on discard support for FluxZip/MonoZip. Improves FluxPublishOn onDiscard behavior adds on discard support for FluxZip/MonoZip Oct 7, 2022
Copy link
Contributor

@simonbasle simonbasle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a few comments on the tests, plus minor note on change from onNextDropped to onDiscard

.publishOn(Schedulers.newSingle("discardPollAsync"))
.filter(i -> { throw new IllegalStateException("boom"); })
.filter(i -> true)
.contextWrite(previous -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this change really relates to this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fails on my machine all the time (m1 does goos job in detecting such things)

CountDownLatch latch = new CountDownLatch(10);
StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead
.publishOn(Schedulers.newSingle("discardPollAsync"), 1)
.contextWrite(previous -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this change really relates to this PR?

Oleh Dokuka added 3 commits October 7, 2022 22:03
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
@simonbasle simonbasle modified the milestones: 3.4.24, 3.4.25 Oct 10, 2022
Copy link
Contributor

@simonbasle simonbasle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good from my perspective. @chemicL did you have any unresolved comment?

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I had one comment about handling dataSignal == null case early in FluxZip::drain but now the logic is different. Anyways, it's not a blocker. Worth considering an optimisation to avoid a few branched executions just to hit a null check. That optimisation can be addressed in a more focused effort around #3227.

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
@OlegDokuka OlegDokuka merged commit fe5a3b5 into 3.4.x Oct 24, 2022
@reactorbot
Copy link

@OlegDokuka this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to main 🙇

@OlegDokuka OlegDokuka deleted the bugfix/3.4.x-zip-ondiscard branch October 24, 2022 10:42
OlegDokuka pushed a commit that referenced this pull request Oct 24, 2022
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
chemicL pushed a commit that referenced this pull request Mar 7, 2023
This PR adds support for missing the `onDiscard` functionality to the `FluxZip` / `MonoZip` operators
Also, this commit includes fixes to fluky tests related to the `onDiscard` functionality
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/doOnDiscard This belongs to the doOnDiscard theme type/enhancement A general enhancement

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants