chore: add BidiUploadState #3222
Conversation
force-pushed from d79117e to fc552a4
force-pushed from 437797f to af8b816
This will be used by an upcoming PR that integrates it with a BidiWriteObject stream. At a high level, this class hierarchy models the state and lifecycle events that happen when performing an upload -- append-specific in this change, but many generalize to all uploads.
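As a rough illustration only (the PR's actual types and transitions are not visible in this excerpt, so every name below is an assumption), a lifecycle-modeling hierarchy of this kind might be shaped like:

```java
// Purely illustrative sketch; names and states are assumptions, not the PR's API.
abstract class BidiUploadState {
  /** Upload created, but the first BidiWriteObjectRequest has not been sent. */
  static final class Begin extends BidiUploadState {}

  /** First message acknowledged; data messages are being appended. */
  static final class Appending extends BidiUploadState {}

  /** finish_write sent; awaiting the final BidiWriteObjectResponse. */
  static final class Finalizing extends BidiUploadState {}

  /** Terminal: the final object resource has been returned. */
  static final class Finalized extends BidiUploadState {}

  /** The stream failed; appendable uploads may be able to reopen and resume. */
  static final class Errored extends BidiUploadState {}
}
```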
force-pushed from af8b816 to 75d4cfd
```java
private static final Marker TRACE_EXIT = MarkerFactory.getMarker("exit");

static final OneofDescriptor FIRST_MESSAGE_DESCRIPTOR =
    BidiWriteObjectRequest.getDescriptor().getOneofs().stream()
```
This can't be the right way to get at the "first_message" field, can it?
It's the easiest way to probe whether the oneof is set. protoc doesn't generate a .hasFirstMessage() method for oneofs the way it does for optional fields, but the base message class has a com.google.protobuf.GeneratedMessageV3#hasOneof(com.google.protobuf.Descriptors.OneofDescriptor) method. Using that to probe presence felt cleaner than m.hasUploadId() || m.hasWriteObjectSpec() || m.hasAppendObjectSpec().
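For illustration, a minimal sketch of this pattern. The diff above is truncated, so the .filter(...) completion of the descriptor lookup is an assumption; only getOneofs() and hasOneof are confirmed by the thread:

```java
import com.google.protobuf.Descriptors.OneofDescriptor;
import com.google.storage.v2.BidiWriteObjectRequest;

final class FirstMessageProbe {
  // Look up the "first_message" oneof descriptor once; the exact lookup in
  // the PR is truncated above, so this completion is an assumption.
  static final OneofDescriptor FIRST_MESSAGE_DESCRIPTOR =
      BidiWriteObjectRequest.getDescriptor().getOneofs().stream()
          .filter(o -> "first_message".equals(o.getName()))
          .findFirst()
          .orElseThrow(IllegalStateException::new);

  // True if any field of the first_message oneof is set; equivalent to
  // m.hasUploadId() || m.hasWriteObjectSpec() || m.hasAppendObjectSpec().
  static boolean hasFirstMessage(BidiWriteObjectRequest m) {
    return m.hasOneof(FIRST_MESSAGE_DESCRIPTOR);
  }
}
```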
```java
checkArgument(
    !(first.hasChecksummedData() && second.hasChecksummedData()),
    "attempting to merge two requests that both specify checksummed_data");
BidiWriteObjectRequest.Builder b = first.toBuilder().mergeFrom(second);
```
Does this throw away the data from one of the two writes?
The checkArgument on lines 195-197 guarantees that at most one of the two messages contains data, so by the time we reach mergeFrom there is nothing to throw away. If the caller of this method had a bug, the exception from checkArgument would prevent it from silently making a bad mutation.
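As a sketch of the surrounding method shape (assumed; only the four quoted lines are from the diff):

```java
import static com.google.common.base.Preconditions.checkArgument;

import com.google.storage.v2.BidiWriteObjectRequest;

final class BidiWriteObjectRequests {
  // Merge two requests, e.g. a spec/metadata message from `first` plus
  // flush/finish flags from `second`. At most one side may carry
  // checksummed_data; a caller bug fails fast here instead of silently
  // merging payload bytes from two different writes.
  static BidiWriteObjectRequest merge(
      BidiWriteObjectRequest first, BidiWriteObjectRequest second) {
    checkArgument(
        !(first.hasChecksummedData() && second.hasChecksummedData()),
        "attempting to merge two requests that both specify checksummed_data");
    return first.toBuilder().mergeFrom(second).build();
  }
}
```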
## Description

feat: *breaking behavior* rewrite Storage.blobAppendableUpload to be non-blocking and have improved throughput (#3231)

Rewrite the internals of BlobAppendableUpload to provide non-blocking write calls and take advantage of gRPC async message handling.

When `AppendableUploadWriteableByteChannel#write(ByteBuffer)` is called, an attempt will be made to enqueue the bytes in the outbound queue to GCS. If there is only enough room to partially consume the bytes provided in the `ByteBuffer`, the write call will return early, specifying the number of bytes actually consumed. As acknowledgements come in from GCS, enqueued messages will be evicted, freeing space in the outbound queue and thereby allowing more bytes to be consumed and enqueued.

Given that appendable objects are still in private preview I can't quote any metrics here, but preliminary benchmarking of several million objects across a range of sizes shows across-the-board throughput improvements.

Because the channel's write call is now non-blocking, new helper methods have been added in StorageChannelUtils to provide blocking behavior if you want to block your application until the full buffer is consumed (see the sketch after the sub-PR list below).

A new method `MinFlushSizeFlushPolicy#withMaxPendingBytes(long)` has been added to allow limiting the number of pending outbound bytes. The default value is 16MiB, but it can be configured lower if necessary.

## Release Notes

BEGIN_COMMIT_OVERRIDE
BEGIN_NESTED_COMMIT
feat: *breaking behavior* rewrite Storage.blobAppendableUpload to be non-blocking and have improved throughput (#3231)
END_NESTED_COMMIT
BEGIN_NESTED_COMMIT
feat: add StorageChannelUtils to provide helper methods to perform blocking read/write to/from non-blocking channels (#3231)
END_NESTED_COMMIT
BEGIN_NESTED_COMMIT
feat: add MinFlushSizeFlushPolicy#withMaxPendingBytes(long) (#3231)
END_NESTED_COMMIT
BEGIN_NESTED_COMMIT
fix: update BlobAppendableUploadConfig and FlushPolicy.MinFlushSizeFlushPolicy to default to 4MiB minFlushSize and 16MiB maxPendingBytes (#3249)
END_NESTED_COMMIT
BEGIN_NESTED_COMMIT
fix: make FlushPolicy${Min,Max}FlushSizeFlushPolicy constructors private (#3217)
END_NESTED_COMMIT
END_COMMIT_OVERRIDE

## Sub PRs

This PR is made up of the following PRs, in sequence:

1. #3217
2. #3218
3. #3219
4. #3220
5. #3221
6. #3222
7. #3223
8. #3224
9. #3225
10. #3226
11. #3227
12. #3228
13. #3229
14. #3230
15. #3235
16. #3236
17. #3241
18. #3242
19. #3246
20. #3248
21. #3249
22. #3252
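As a hedged illustration of the partial-write contract described above (writeFully is a hypothetical helper for this sketch, not necessarily the API StorageChannelUtils exposes):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class WriteLoops {
  // Hypothetical blocking wrapper: write(ByteBuffer) may now consume only
  // part of the buffer and return early, so a caller that wants the old
  // blocking semantics must loop until the buffer is drained.
  static void writeFully(WritableByteChannel channel, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      channel.write(buf); // returns the number of bytes actually consumed
    }
  }
}
```

Note this naive loop spins when the outbound queue is full; a production helper would need to park or await acknowledgements instead, which is presumably what the new StorageChannelUtils methods handle.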
Merged in #3231