fix: close the Watch stream when we receive an error #834
ResponseObserver<ResponseT> responseObserverT,
ServerStreamingCallable<RequestT, ResponseT> callable);

<RequestT, ResponseT> ApiStreamObserver<RequestT> streamRequest(
ApiStreamObserver<ResponseT> responseObserverT,
<RequestT, ResponseT> ClientStream<RequestT> streamRequest(
BidiStreamObserver<RequestT, ResponseT> responseObserverT,
These are the main changes, the rest is plumbing and cleanup.
() -> {
synchronized (Watch.this) {
stream.onCompleted();
stream.closeSend();
This now closes the stream instead of invoking a no-op method.
} else {
return queryResponse(DOCUMENT_NAME + "3").answer(invocation);
}
})
FYI: Here and below - this is only a formatting change as I removed the unused cast.
MoreExecutors.directExecutor());
}
})
mock -> {
Only a formatting change.
Changes look fine. Just a few questions around testing.
retryAttempts[0]++;
return RETRYABLE_FAILED_FUTURE;
})
mock -> {
Will this test the new changes?
No, it's hard to test these changes. We could mock the backend stream, but then we would essentially be testing only the behavior of the mock. If you know of a pre-existing implementation or fake of a gRPC stream that we can use to test this behavior, then we can add a test. A homegrown implementation that validates that our code follows our own assumptions will not give us much meaningful test coverage.
Would something like this be helpful? https://github.com/googleapis/java-bigtable/blob/main/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/testing/FakeStreamingApi.java
Examples in tests:
https://github.com/googleapis/java-bigtable/blob/main/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/FilterMarkerRowsCallableTest.java#L38
(It uses ApiStreamObserver, but I imagine it could be refactored for this use case.)
Added test queryWatchShutsDownStreamOnPermissionDenied() that re-uses some of the existing functionality.
The Watch code called "onCompleted" to close the stream, but that did not actually do anything. This PR migrates our streaming GAPIC APIs to ClientStream (which is not deprecated) and then calls "closeSend()" instead.
Fixes: #822
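For readers skimming the thread, here is a minimal sketch of the behavior described above. It is not the actual Watch code. `RequestStream`, `RecordingStream`, and `Watcher` are hypothetical names made up for illustration; `RequestStream` only mirrors the `send`/`closeSend` shape of the request side of a bidi stream, and the fake simply records whether the caller half-closed the stream after an error.

```java
import java.util.ArrayList;
import java.util.List;

public class WatchCloseSketch {

  /** Hypothetical stand-in for the request side of a bidi stream (not the real gax interface). */
  public interface RequestStream<T> {
    void send(T request);

    void closeSend();
  }

  /** Fake stream that records what the caller did, so a test can assert on it. */
  public static final class RecordingStream implements RequestStream<String> {
    public final List<String> sent = new ArrayList<>();
    public boolean closed = false;

    @Override
    public void send(String request) {
      if (closed) {
        throw new IllegalStateException("send after closeSend");
      }
      sent.add(request);
    }

    @Override
    public void closeSend() {
      closed = true;
    }
  }

  /** Simplified watcher (assumption: the real class tracks far more state than this). */
  public static final class Watcher {
    private final RequestStream<String> stream;
    private boolean active = true;

    public Watcher(RequestStream<String> stream) {
      this.stream = stream;
    }

    /** On an error, stop the watch and explicitly half-close the request stream. */
    public synchronized void onError(Throwable error) {
      if (!active) {
        return; // already shut down; do not close twice
      }
      active = false;
      stream.closeSend();
    }

    public synchronized boolean isActive() {
      return active;
    }
  }

  public static void main(String[] args) {
    RecordingStream stream = new RecordingStream();
    Watcher watcher = new Watcher(stream);
    stream.send("add-target");
    watcher.onError(new RuntimeException("PERMISSION_DENIED"));
    // prints "closed=true, active=false"
    System.out.println("closed=" + stream.closed + ", active=" + watcher.isActive());
  }
}
```

A fake like this only checks that the caller requests the close; it says nothing about how a real gRPC stream reacts, which is the limitation raised in the review discussion.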