-
Notifications
You must be signed in to change notification settings - Fork 75
feat: add recursiveDelete() to Firestore #622
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Preconditions.checkState( | ||
documentsPending, "The recursive delete operation has already been completed"); | ||
|
||
// TODO: figure out if this is safe, or a way to lock this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have to expose the lock in BulkWriter and synchronize it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to add a verifyNotClosed
that calls verifyNotClosedLocked
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, thanks for the suggestion.
Codecov Report
@@ Coverage Diff @@
## bc/rc-main #622 +/- ##
=============================================
Coverage ? 74.59%
Complexity ? 1124
=============================================
Files ? 68
Lines ? 6057
Branches ? 737
=============================================
Hits ? 4518
Misses ? 1309
Partials ? 230 Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test code review coming this afternoon.
google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
Outdated
Show resolved
Hide resolved
google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
Outdated
Show resolved
Hide resolved
google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
Outdated
Show resolved
Hide resolved
google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
Outdated
Show resolved
Hide resolved
*/ | ||
@Nonnull | ||
@VisibleForTesting | ||
public ApiFuture<Void> recursiveDelete( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not need to be public.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
|
||
public void onError(Throwable throwable) { | ||
lastError = | ||
new FirestoreException("Failed to fetch children documents.", Status.UNAVAILABLE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This swallows the underlying exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrapped the old exception. The status should be UNAVAILABLE
regardless of the error, as it is on Node.
query = query.select(FieldPath.documentId()).limit(maxPendingOps); | ||
|
||
return query; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should find a way to combine all three methods (by passing ResourcePath and collectionId at each callsite)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
// Delete the provided document reference if one was provided. | ||
if (documentReference != null) { | ||
ApiFuture<Void> voidDeleteFuture = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we call deleteReference
? (Not sure)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not in this case, since deleteReference()
doesn't return the ApiFuture, and we need it here to chain with flushFuture
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can make it return an ApiFuture.
private ApiFuture<Void> deleteReference(final DocumentReference reference) {
synchronized (lock) {
pendingOperationsCount++;
}
return ApiFutures.transformAsync(
catchingDelete(reference),
new ApiAsyncFunction<WriteResult, Void>() {
public ApiFuture<Void> apply(WriteResult result) {
synchronized (lock) {
pendingOperationsCount--;
// We wait until the previous stream has ended in order to ensure the
// startAfter document is correct. Starting the next stream while
// there are pending operations allows Firestore to maximize
// BulkWriter throughput.
if (documentsPending
&& !streamInProgress
&& pendingOperationsCount < minPendingOps) {
streamDescendants();
}
return ApiFutures.immediateFuture(null);
}
}
},
MoreExecutors.directExecutor());
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see what you mean now, thanks. Done.
new ApiAsyncFunction<WriteResult, Object>() { | ||
|
||
public ApiFuture<Object> apply(WriteResult result) { | ||
pendingOperationsCount--; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If BulkWriter sends two requests in parallel, this code might be invoked from two different Gax threads. I think this needs synchronization (same with the error handler).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a lock here and elsewhere that access lastError
, streamInProgress
, documentsPending
, and errorCount
.
ApiFutures.allAsList(pendingFutures), | ||
new ApiAsyncFunction<List<Void>, Void>() { | ||
public ApiFuture<Void> apply(List<Void> unused) { | ||
if (lastError == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this guaranteed to run on the same thread that sets lastError
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure which executor gets used when transforming ApiFutures.allAsList()
, but if we assume it can be either of the two futures in the list, we should lock it. Added a lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
Matchers.<ServerStreamingCallable>any()); | ||
|
||
// Include dummy response for the deleted reference. | ||
doAnswer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could just use a collection delete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deleted the comment (copied from another test). The test is using a collection delete, and the batchWrite mock is to delete the document that is streamed (a document needs to be returned for the stream to retry).
final List<Answer<RunQueryResponse>> runQueryResponses = | ||
new ArrayList<>( | ||
Arrays.asList( | ||
queryResponse(mockException, fullDocumentPath("a"), fullDocumentPath("b")), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't the exception be the last response rather than the first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test checks that the stream retries work properly (streamed documents are added to the BulkWriter even if the stream is choppy), so we want the exception to be thrown from the start. The last response is a success response to stop the retry attempts.
google-cloud-firestore/src/test/java/com/google/cloud/firestore/RecursiveDeleteTest.java
Show resolved
Hide resolved
|
||
firestore.recursiveDelete(collectionB).get(); | ||
assertEquals(6, countCollectionChildren(randomColl)); | ||
assertEquals(0, countCollectionChildren(collectionB)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there not still be one document?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. We created a separate collection (collectionB
) with only a single document, deleted that document, and verified that the collection with all the people wasn't affected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. I am glad you made me look at this again though - I think you should wait for "doggo" to be written before invoking the delete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, done.
assertTrue(e.getMessage().contains("BulkWriter has already been closed")); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it will be pretty tough to test but we do not have a test that tests that we send the second request when we pass the MIN_PENDING_OPS threshold.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified createsSecondQueryWithCorrectStartAfter()
to test this.
Co-authored-by: Sebastian Schmidt <mrschmidt@google.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the review!
*/ | ||
@Nonnull | ||
@VisibleForTesting | ||
public ApiFuture<Void> recursiveDelete( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
|
||
// Whether to select all documents under `parentPath`. By default, only | ||
// collections that match `collectionId` are selected. | ||
abstract boolean getKindless(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can have kindness without being kindless!
// Whether to require consistent documents when restarting the query. By | ||
// default, restarting the query uses the readTime offset of the original | ||
// query to provide consistent results. | ||
abstract boolean getRequireConsistency(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesRequireConsistency
sounds weird. I'll keep it as is to match the node implementation which is requireConsistency
.
|
||
public ApiFuture<Void> run() { | ||
Preconditions.checkState( | ||
documentsPending, "The recursive delete operation has already been completed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, added an invoked
boolean.
Preconditions.checkState( | ||
documentsPending, "The recursive delete operation has already been completed"); | ||
|
||
// TODO: figure out if this is safe, or a way to lock this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, thanks for the suggestion.
|
||
firestore.recursiveDelete(collectionB).get(); | ||
assertEquals(6, countCollectionChildren(randomColl)); | ||
assertEquals(0, countCollectionChildren(collectionB)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. We created a separate collection (collectionB
) with only a single document, deleted that document, and verified that the collection with all the people wasn't affected.
assertTrue(e.getMessage().contains("BulkWriter has already been closed")); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified createsSecondQueryWithCorrectStartAfter()
to test this.
|
||
public void onError(Throwable throwable) { | ||
lastError = | ||
new FirestoreException("Failed to fetch children documents.", Status.UNAVAILABLE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrapped the old exception. The status should be UNAVAILABLE
regardless of the error, as it is on Node.
new ApiAsyncFunction<WriteResult, Object>() { | ||
|
||
public ApiFuture<Object> apply(WriteResult result) { | ||
pendingOperationsCount--; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a lock here and elsewhere that access lastError
, streamInProgress
, documentsPending
, and errorCount
.
ApiFutures.allAsList(pendingFutures), | ||
new ApiAsyncFunction<List<Void>, Void>() { | ||
public ApiFuture<Void> apply(List<Void> unused) { | ||
if (lastError == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure which executor gets used when transforming ApiFutures.allAsList()
, but if we assume it can be either of the two futures in the list, we should lock it. Added a lock.
@VisibleForTesting | ||
ApiFuture<Void> recursiveDelete( | ||
CollectionReference reference, BulkWriter bulkWriter, int maxLimit, int minLimit) { | ||
RecursiveDelete deleter = new RecursiveDelete(this, bulkWriter, reference, maxLimit, minLimit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code flow would be a bit cleaner if you called this method from all overloads. This will ensue that all calls go through the same callsite. If you take parentPath
and collectionId
as input here then you will also remove some duplication in RecursiveDelete (as you only need one constructor).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done. RecursiveDelete
only has one constructor now.
* The number of pending BulkWriter operations at which RecursiveDelete starts the next limit | ||
* query to fetch descendants. | ||
*/ | ||
private int minPendingOps = MIN_PENDING_OPS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be final now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
private boolean invoked = false; | ||
|
||
/** Query limit to use when fetching all descendants. */ | ||
private int maxPendingOps = MAX_PENDING_OPS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be final now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
private boolean streamInProgress = false; | ||
|
||
/** Whether run() has been called. */ | ||
private boolean invoked = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
started?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
private final FirestoreRpcContext<?> firestoreRpcContext; | ||
private final BulkWriter writer; | ||
|
||
@Nullable private final DocumentReference documentReference; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be cleaner to replace this with a documentId string. Then you would have parentPath, collectionId and documentId below. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think trying to rehydrate the DocumentReference from the documentId is more bug-prone, since we'd be setting it to an empty string for collections, and the document path for documents.
I renamed documentReference
to rootDocumentReference
and added documentation. This way, we don't have to add extra code to rehydrate, and it's pretty clear to see the purpose of the variable at a glance.
|
||
public void onCompleted() { | ||
synchronized (lock) { | ||
streamInProgress = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably reduce the scope of the lock to just this line (also for the onError callback, which only needs to set lastError under lock). This improves readability as it makes the intention more clear and also answer the question why onNext
does not use locks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
firestore.recursiveDelete(collectionB).get(); | ||
assertEquals(6, countCollectionChildren(randomColl)); | ||
assertEquals(0, countCollectionChildren(collectionB)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. I am glad you made me look at this again though - I think you should wait for "doggo" to be written before invoking the delete.
if (numDeletesBuffered[0] < cutoff) { | ||
numDeletesBuffered[0] += batchWriteResponse.size(); | ||
return ApiFutures.transformAsync( | ||
bufferFuture, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite understand why we need to block on bufferFuture
here. I thought the idea was that we should finish the first 4000 deletes and then ask for the next batch. The current test seems to indicate that we do not wait for the deletes to finish executing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally had bufferFuture
to ensure that the stream completes before batchWrite responses do in the tests. Without the bufferFuture
logic, we run into a race condition where the first 4000 writes complete before the stream is able to process all the documents. Since the test waits for secondQueryFuture
before completing the remaining writes, the test gets stuck.
Here's what is happening broken down (using the normal max/min pending ops parameters, rather than the simplified test params):
Intended order of events:
- Stream 5000 documents.
- BulkWriter mock completes 4000 documents, The delete completion handler detects we are under the
minPendingOps
threshold to start a new stream. - A second query streamed (testing startAfter), completing
secondQueryFuture
. - The remaining 1000 batchWrites are completed, ending the test.
Race condition order without bufferFuture:
- Stream 4XXXX documents.
- BulkWriter mock completes 4000 documents. However, since the stream is still in progress, a new stream is not started.
- The stream completes, but since we are waiting on the remaining 1000 pending operations, we do not start the next stream.
- Test stalls because the last 1000 mock writes are waiting for the second query to be run first to complete `secondQueryFuture. (Normally, the next batch isn't waiting on a future, and would trigger the next stream)
tl:dr; Delete completions are still being waited on, bufferFuture
is just here to make sure that the stream completes first in this kinda hacky test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explaining this. I am pretty sure I understand this now. If you can add a small comment in the test I might even still understand tomorrow (optional).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment.
google-cloud-firestore/src/test/java/com/google/cloud/firestore/RecursiveDeleteTest.java
Outdated
Show resolved
Hide resolved
|
||
// Delete the provided document reference if one was provided. | ||
if (documentReference != null) { | ||
ApiFuture<Void> voidDeleteFuture = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can make it return an ApiFuture.
private ApiFuture<Void> deleteReference(final DocumentReference reference) {
synchronized (lock) {
pendingOperationsCount++;
}
return ApiFutures.transformAsync(
catchingDelete(reference),
new ApiAsyncFunction<WriteResult, Void>() {
public ApiFuture<Void> apply(WriteResult result) {
synchronized (lock) {
pendingOperationsCount--;
// We wait until the previous stream has ended in order to ensure the
// startAfter document is correct. Starting the next stream while
// there are pending operations allows Firestore to maximize
// BulkWriter throughput.
if (documentsPending
&& !streamInProgress
&& pendingOperationsCount < minPendingOps) {
streamDescendants();
}
return ApiFutures.immediateFuture(null);
}
}
},
MoreExecutors.directExecutor());
}
}
|
||
/** Whether a query stream is currently in progress. Only one stream can be run at a time. */ | ||
@GuardedBy("lock") | ||
private boolean streamInProgress = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might be able to combine streamInProgress
and documentsPending
with some !
magic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Played around with a bit, but I couldn't think of a way to do this cleanly with good readability.
google-cloud-firestore/src/main/java/com/google/cloud/firestore/RecursiveDelete.java
Outdated
Show resolved
Hide resolved
onQueryEnd(); | ||
synchronized (lock) { | ||
String message = "Failed to fetch children documents. " + throwable.getMessage(); | ||
lastError = new FirestoreException(message, Status.UNAVAILABLE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java exception "bubbling" usually involves passing the original exception as the cause to the wrapping exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to include the throwable and FirestoreExceptoin.forServerRejection()
.
// deleting a document, the parent is the path of that document. | ||
parentPath = path; | ||
collectionId = path.getParent().getId(); | ||
Preconditions.checkState(collectionId != null, "Parent of a document should not be null."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would drop this check - every document has a collection ID. If you do keep it, the check should be on getParent(), which is nullable and not on getId().
BTW, I really like this cleanup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed check to parent.
I definitely like this version more, thanks for pushing through on your vision :) Having all the collectionId/parentPath logic inside getAllDescendantsQuery() makes so more sense, and makes the rest of the code much cleaner!
if (numDeletesBuffered[0] < cutoff) { | ||
numDeletesBuffered[0] += batchWriteResponse.size(); | ||
return ApiFutures.transformAsync( | ||
bufferFuture, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explaining this. I am pretty sure I understand this now. If you can add a small comment in the test I might even still understand tomorrow (optional).
🤖 I have created a release \*beep\* \*boop\* --- ## [2.5.0](https://www.github.com/googleapis/java-firestore/compare/v2.4.0...v2.5.0) (2021-06-01) ### Features * add `gcf-owl-bot[bot]` to `ignoreAuthors` ([#641](https://www.github.com/googleapis/java-firestore/issues/641)) ([6f40f0f](https://www.github.com/googleapis/java-firestore/commit/6f40f0fc821d31136686a41e82e4b57d0ad3dede)) * add recursiveDelete() to Firestore ([#622](https://www.github.com/googleapis/java-firestore/issues/622)) ([#649](https://www.github.com/googleapis/java-firestore/issues/649)) ([9ff2f41](https://www.github.com/googleapis/java-firestore/commit/9ff2f41b765c8878c3b3fb7df962f6f1ed537f05)) ### Dependencies * update dependency com.google.cloud:google-cloud-shared-dependencies to v1.2.0 ([#640](https://www.github.com/googleapis/java-firestore/issues/640)) ([9bd881a](https://www.github.com/googleapis/java-firestore/commit/9bd881a5ddd0bdb62396e8b778bf9784c00feba2)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Porting from node. Merging to a feature branch until it's good to release.
Several PRs are contained in here: 1, 2, 3.