-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add 'tap', a generic side-effect/observability operator #3013
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
8ee6533 to
003dc5a
Compare
reactor-core/src/main/java/reactor/core/publisher/FluxSource.java
Outdated
Show resolved
Hide resolved
| * @param <STATE> the type of the publisher-level state that will be shared between all {@link SequenceListener} created by this factory | ||
| * @author Simon Baslé | ||
| */ | ||
| public interface SequenceListenerFactory<T, STATE> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the public API can and does have a simplified version Flux.listen(Supplier), but I do strongly believe the STATE is an important feature. Passing a Scannable is too likely to run into Scannable.SCAN_UNAVAILABLE instances.
This approach enables calls like flux.listen(Micrometer.metrics()) with powerful configurability (the metrics can use a app-wide configuration for the MeterRegistry + discover name and tags on the source flux or its parents, once per publisher)
- SignalListener and SignalListenerFactory user-facing abstraction - Flux#listen and Mono#listen operators - includes Fuseable and Conditional implementations TODO: tests of FluxListen
e9ddad3 to
39a7eed
Compare
|
Split the PR into this one dedicated to the operator and #3015 dedicated to metrics (deprecation, new module) |
- Flux#listen and Mono#listen didn't actually use the fuseable versions - doFirst listener error in all 4 classes was triggering doFinally and didn't pass to handleListenerError - onSubscribe didn't pass the operator as downstream Subscription - Mono variants have scan(PREFETCH) == -1 like Flux variants
|
I'm wondering why we invent a new operator name like I'm personally against "own" naming and for convenient naming e.g. P.S. in our reality convenience means better user experience thus better adoption |
| * | ||
| * @param listenerError the exception thrown from a handler method | ||
| */ | ||
| protected void handleListenerErrorAndTerminate(Throwable listenerError) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we still support continuation? Probably we have to check if we wanna continue here before cancelling subscription
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we might want to add onErrorContinue support to the operator in some cases, but that's not an absolute priority for M2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it was mentioned my main suggestion is to name the operator in a more convenient way which is either to stick to the old peek or to use instead do or tap from the rx world convenience. Also few minor suggestions
|
let's go with |
|
@OlegDokuka I will open a follow-up issue around the support of errorContinue + increase test coverage. |
Add SignalListener and the FluxTap operator
Follow-up: full fusion test coverage, consider support of onErrorContinue