fix: Prevent watch stream from emitting events after close. by tom-andersen · Pull Request #1471 · googleapis/java-firestore · GitHub

tom-andersen
Contributor

@tom-andersen tom-andersen commented Nov 8, 2023

b/288270811

@product-auto-label product-auto-label bot added the size: l and api: firestore labels Nov 8, 2023
  } else {
  long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
- logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000));
+ logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000));
Contributor Author

This isn't directly related to the PR, but since I was adding logging...

I noticed this log call was building a string even when the log level is coarser than FINE. A lambda allows lazy evaluation, so production environments don't pay for the string formatting when logging is set to a coarser level.
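
To illustrate the point about lazy evaluation, here is a minimal, self-contained sketch using only `java.util.logging`. The class name `LazyLoggingSketch`, the counter, and the helper methods are hypothetical and exist only for this illustration; they are not part of the PR. The sketch assumes the logger level is set to INFO, so FINE messages are disabled.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LazyLoggingSketch {
  private static final Logger LOGGER = Logger.getLogger(LazyLoggingSketch.class.getName());

  // Hypothetical counter: tracks how often the message is actually formatted.
  private static final AtomicInteger FORMAT_CALLS = new AtomicInteger();

  static {
    // Assumption for this sketch: FINE is disabled, as it typically is in production.
    LOGGER.setLevel(Level.INFO);
  }

  private static String describeBackoff(long delayMs) {
    FORMAT_CALLS.incrementAndGet();
    return String.format("Backing off for %d seconds", delayMs / 1000);
  }

  /** Eager variant: the string is built before Logger.log checks the level. */
  public static int eagerFormatCount(long delayMs) {
    FORMAT_CALLS.set(0);
    LOGGER.log(Level.FINE, describeBackoff(delayMs));
    return FORMAT_CALLS.get();
  }

  /** Lazy variant: the Supplier is only invoked if FINE is loggable. */
  public static int lazyFormatCount(long delayMs) {
    FORMAT_CALLS.set(0);
    LOGGER.log(Level.FINE, () -> describeBackoff(delayMs));
    return FORMAT_CALLS.get();
  }

  public static void main(String[] args) {
    System.out.println("eager format calls: " + eagerFormatCount(5000));
    System.out.println("lazy format calls: " + lazyFormatCount(5000));
  }
}
```

With FINE disabled, the eager variant still formats the message once, while the lazy variant never invokes the supplier.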

public class WatchTest {

@Rule
public Timeout timeout = new Timeout(1, TimeUnit.SECONDS);
Contributor Author

While working through the problem, I introduced a bug that made the unit tests hang. To make sure the tests complete, I added a timeout that fails any test that doesn't finish within 1 second.

@tom-andersen tom-andersen changed the title from "Prevent watch stream from emitting events after close." to "fix: Prevent watch stream from emitting events after close." Nov 8, 2023
@tom-andersen tom-andersen added the kokoro:force-run label Nov 8, 2023
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run label Nov 8, 2023
@tom-andersen tom-andersen marked this pull request as ready for review November 9, 2023 14:07
@tom-andersen tom-andersen requested a review from a team as a code owner November 9, 2023 14:07
@tom-andersen tom-andersen requested a review from wu-hui November 9, 2023 14:07
Contributor

@wu-hui wu-hui left a comment

Please also look into whether we can add unit tests for the new stream class.

import java.util.function.Function;
import java.util.logging.Logger;

final class SuppressibleBidiStream<RequestT, ResponseT>
Contributor

Please add a class-level comment to explain what this class does and why it is necessary.


private final ClientStream<ListenRequest> stream;
private final BidiStreamObserver<RequestT, ResponseT> delegate;
private boolean silence = false;
Contributor

Since the class is now called Suppressible..., should this be isSuppressed?

stream.closeSend();
}

public void closeAndSilence() {
Contributor

Same here, use closeAndSuppress instead.

@Override
public void onReady(ClientStream<RequestT> stream) {
if (silence) {
LOGGER.info(() -> String.format("Silenced: %s", stream));
Contributor

Same for the log message.

- if (listenResponse.getFilter().getCount() != currentSize()) {
+ int filterCount = listenResponse.getFilter().getCount();
+ int currentSize = currentSize();
+ if (filterCount != currentSize) {
Contributor

Inlining currentSize() seems better IMO?

Contributor Author

currentSize() hides a lot of computation:

  private int currentSize() {
    ChangeSet changeSet = extractChanges(Timestamp.now());
    return documentSet.size() + changeSet.adds.size() - changeSet.deletes.size();
  }
  private ChangeSet extractChanges(Timestamp readTime) {
    ChangeSet changeSet = new ChangeSet();

    for (Entry<ResourcePath, Document> change : changeMap.entrySet()) {
      if (change.getValue() == null) {
        if (documentSet.contains(change.getKey())) {
          changeSet.deletes.add(documentSet.getDocument(change.getKey()));
        }
        continue;
      }

      QueryDocumentSnapshot snapshot =
          QueryDocumentSnapshot.fromDocument(firestore, readTime, change.getValue());

      if (documentSet.contains(change.getKey())) {
        changeSet.updates.add(snapshot);
      } else {
        changeSet.adds.add(snapshot);
      }
    }

    return changeSet;
  }

Contributor

Sorry, I meant if (filterCount != currentSize())

listener.eventsCountDownLatch.awaitInitialEvents();
listener
.assertionsForLastEvent()
.noError()
Contributor

Add more comments explaining why we have this test and what would happen if SuppressibleBidiStream is not used.


// A race condition will sometimes throw an error if the SuppressibleBidiStream does not
// silence the old stream. This can be caused by `Preconditions.checkState(stream == null)`
// in Watch class.
Contributor Author

Explanation added here.
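
The idea discussed in this thread, a wrapper that stops forwarding events from an old stream once it has been closed, can be sketched in a few lines. This is only an illustration under stated assumptions, not the PR's implementation: the `Observer` interface, `SuppressibleObserver`, and `RecordingObserver` are hypothetical stand-ins written for this example, and they do not use the gax `BidiStreamObserver` API.

```java
import java.util.ArrayList;
import java.util.List;

public class SuppressibleObserverSketch {

  /** Hypothetical callback interface standing in for a real stream observer. */
  interface Observer<T> {
    void onResponse(T response);

    void onError(Throwable error);

    void onComplete();
  }

  /** Forwards events to the delegate until suppress() is called, then drops them all. */
  static final class SuppressibleObserver<T> implements Observer<T> {
    private final Observer<T> delegate;
    // volatile so a suppress() call on one thread is visible to callback threads.
    private volatile boolean suppressed = false;

    SuppressibleObserver(Observer<T> delegate) {
      this.delegate = delegate;
    }

    void suppress() {
      suppressed = true;
    }

    @Override
    public void onResponse(T response) {
      if (!suppressed) delegate.onResponse(response);
    }

    @Override
    public void onError(Throwable error) {
      if (!suppressed) delegate.onError(error);
    }

    @Override
    public void onComplete() {
      if (!suppressed) delegate.onComplete();
    }
  }

  /** Records every event it receives so the demo can inspect them. */
  static final class RecordingObserver implements Observer<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onResponse(String response) {
      events.add("response:" + response);
    }

    @Override
    public void onError(Throwable error) {
      events.add("error:" + error.getMessage());
    }

    @Override
    public void onComplete() {
      events.add("complete");
    }
  }

  /** Simulates an old stream that keeps emitting after it has been suppressed. */
  public static List<String> demo() {
    RecordingObserver downstream = new RecordingObserver();
    SuppressibleObserver<String> oldStream = new SuppressibleObserver<>(downstream);
    oldStream.onResponse("first");
    oldStream.suppress();
    oldStream.onResponse("late"); // dropped
    oldStream.onComplete(); // dropped, so downstream is not told the stream finished
    return downstream.events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In this sketch only the event delivered before `suppress()` reaches the downstream observer, so `demo()` returns a list containing just `response:first`. A plain flag check like this does not by itself rule out every interleaving between `suppress()` and an in-flight callback; how the real class handles threading is not shown in this conversation.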

@gcf-owl-bot gcf-owl-bot bot requested a review from a team as a code owner November 9, 2023 21:09
@tom-andersen tom-andersen requested a review from wu-hui November 9, 2023 21:09
Contributor

@wu-hui wu-hui left a comment

Some minor nits, but LGTM in general, thanks!

* allowed to send `onError` or `onComplete` since that would signal the downstream that the stream
* is finished.
*
* @param <RequestT>
Contributor

Don't think these @params are useful here.


@Mock ClientStream<Integer> mockClientStream;

SilenceableBidiStream<Integer, String> sut;
Contributor

What does sut stand for here?

Contributor Author

System Under Test

@tom-andersen tom-andersen merged commit ee3f8c0 into main Nov 14, 2023
@tom-andersen tom-andersen deleted the tomandersen/FixWatchFilterMismatchRetryLogic branch November 14, 2023 17:15