Add `async_call_with_retries` method to grpc_util. by bileschi · Pull Request #4825 · tensorflow/tensorboard

Conversation

@bileschi (Collaborator)

  • Motivation for features / changes
    Towards making the uploader asynchronous (to improve upload speed to TensorBoard.dev). Actually making the uploader use this new utility will follow in a separate PR. This portion is submitted separately in the spirit of smaller, easier-to-understand PRs.

  • Technical description of changes
    Creates a new util API async_call_with_retries which mirrors the existing call_with_retries but (unsurprisingly) returns right away and completes the gRPC call asynchronously, using the gRPC future API. The retry functionality is implemented as a recursive call in which the continuation calls the same API with a decremented number of remaining tries (see the sketch after this list).

  • Test Plan:
    Adds unit tests covering cases very similar to the call_with_retries functionality. I've also tested that this API meets the needs of making the uploader asynchronous.
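
For orientation, a minimal sketch of that recursive shape (parameter names come from the diff quoted in the review below; backoff, logging, and metadata are omitted, and the merged implementation ultimately took a different form):

# Illustrative sketch only, not the code under review.
def async_call_with_retries(
    api_method,
    request,
    completion_handler,
    num_remaining_tries=_GRPC_RETRY_MAX_ATTEMPTS - 1,
):
    # Launch the RPC asynchronously via the gRPC future API.
    future = api_method.future(request, timeout=_GRPC_DEFAULT_TIMEOUT_SECS)

    def continuation(completed_future):
        e = completed_future.exception()
        if (
            e is None
            or num_remaining_tries <= 0
            or e.code() not in _GRPC_RETRYABLE_STATUS_CODES
        ):
            # Success, out of tries, or non-retryable error: hand the
            # completed future to the caller's handler.
            completion_handler(completed_future)
        else:
            # Retry by re-invoking the same API with one fewer try left.
            async_call_with_retries(
                api_method,
                request,
                completion_handler,
                num_remaining_tries=num_remaining_tries - 1,
            )

    future.add_done_callback(continuation)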

@bileschi bileschi requested a review from nfelt March 30, 2021 20:18
@google-cla google-cla bot added the cla: yes label Mar 30, 2021
@nfelt (Contributor) left a comment

High-level seems reasonable (haven't reviewed tests yet).

I do wonder a bit whether a callback API is really what we want here; the disadvantage is that (as add_done_callback() explains) the error handling story is pretty bad in the event that some exception happens within the handling logic itself (which includes the callback provided by the caller of this utility):

> Exceptions raised in the callback will be logged at ERROR level, but will not terminate any threads of execution.

So I guess depending on how you plan to structure the calling code, this seems like it could be an issue that either causes the uploader to hang forever (if the calling thread waits on the completion callback to flip some success flag) or causes it to drop data at upload time "quietly", aka with only an error in the logs rather than actually aborting (if we fire-and-forget the write RPC and basically don't check if the completion callback is ever called).

A possible compromise is to have the calling thread wait on completion with a timeout that's greater than the maximum of each RPC wait time plus the total possible backoff, but it's a little ugly to have to essentially "know" how long the retrying logic might run in order to wait for the right length of time.

The alternative API that resolves this would be making async_call_with_retries() return a future that the calling thread can hold onto and wait on confidently (knowing it'll unblock eventually). That future would essentially be a proxy over the future from the last attempt at sending the async RPC; blocking on the proxy future effectively just turns async_call_with_retries into call_with_retries. In that case, we can still set add_done_callback() on the underlying future to use as a hint to basically trigger a preemptive retry in the event of a retryable failure, without waiting for the calling thread to block on our completion, but we're protected in the event that the callback doesn't exit cleanly.
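
Concretely, the calling contract under that alternative might look like this sketch (stub and request names are hypothetical):

# Hypothetical usage; stub.WriteScalars and request stand in for the
# uploader's real method and request proto.
future = async_call_with_retries(stub.WriteScalars, request)
# ... the calling thread is free to do other work here ...
# Blocking on the proxy future is safe: it unblocks once the RPC succeeds,
# fails non-retryably, or exhausts its retries.
response = future.result(timeout=60)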

Args:
api_method: Callable for the API method to invoke.
request: Request protocol buffer to pass to the API method.
completion_handler: A function which takes a `grpc.Future` object as an
@nfelt (Contributor)

Could we call this done_callback to match the underlying grpc.Future naming and API? Then we can just point to the docs at https://grpc.github.io/grpc/python/grpc.html#grpc.Future.add_done_callback for describing the callback.
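
For reference, the grpc.Future callback contract looks like this (illustrative snippet; stub.SomeRpc is a stand-in name):

def done_callback(f):
    # f is the completed grpc.Future; f.exception() is None on success,
    # and f.result() re-raises the error on failure.
    if f.exception() is not None:
        print("RPC failed:", f.exception())
    else:
        print("RPC succeeded:", f.result())

future = stub.SomeRpc.future(request)  # hypothetical unary-unary stub method
future.add_done_callback(done_callback)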

api_method,
request,
completion_handler,
num_remaining_tries=_GRPC_RETRY_MAX_ATTEMPTS - 1,
@nfelt (Contributor)

IMO it's better not to expose details like this, which are only needed for recursive calls, in the function signature that's exposed to callers, since callers shouldn't ever set num_remaining_tries or num_tries_so_far.

Instead we can just have two dedicated smaller functions nested inside this one, which close over the unchanging parts of the recursion (like the request and the API method and the clock) without having to pass them as arguments in the recursion; rather, we only pass arguments for the state that needs to be updated. E.g. like this:

# Assumes rpc_name, version_metadata, _GRPC_DEFAULT_TIMEOUT_SECS,
# _GRPC_RETRYABLE_STATUS_CODES, _GRPC_RETRY_MAX_ATTEMPTS, and
# _compute_backoff_seconds from the surrounding grpc_util module.
import functools
import logging
import time

logger = logging.getLogger(__name__)


def async_call_with_retries(
    api_method,
    request,
    done_callback,
    clock=None,
):
    if clock is None:
        clock = time
    logger.debug("Async RPC call %s with request: %r", rpc_name, request)

    def async_call(handler):
        api_method.future(
            request,
            timeout=_GRPC_DEFAULT_TIMEOUT_SECS,
            metadata=version_metadata(),
        ).add_done_callback(handler)

    def retry_handler(future, num_attempts):
        e = future.exception()
        if e is None:
            done_callback(future)
            return
        else:
            logger.info("RPC call %s got error %s", rpc_name, e)
            # If unable to retry, proceed to done_callback.
            if e.code() not in _GRPC_RETRYABLE_STATUS_CODES:
                done_callback(future)
                return
            if num_attempts >= _GRPC_RETRY_MAX_ATTEMPTS:
                done_callback(future)
                return
            # If able to retry, wait then do so.
            backoff_secs = _compute_backoff_seconds(num_attempts + 1)
            clock.sleep(backoff_secs)
            async_call(functools.partial(retry_handler, num_attempts=num_attempts + 1))

    async_call(functools.partial(retry_handler, num_attempts=1))

@bileschi (Collaborator, Author)

> I do wonder a bit whether a callback API is really what we want here; the disadvantage is that (as add_done_callback() explains) the error handling story is pretty bad in the event that some exception happens within the handling logic itself (which includes the callback provided by the caller of this utility):
>
> Exceptions raised in the callback will be logged at ERROR level, but will not terminate any threads of execution.

Agreed.

> A possible compromise is to have the calling thread wait on completion with a timeout that's greater than the maximum of each RPC wait time plus the total possible backoff, but it's a little ugly to have to essentially "know" how long the retrying logic might run in order to wait for the right length of time.

Yeah, I've tried building the uploader using this logic, and you have to be careful to ensure that the callback passed to the async caller triggers some sort of watchable event, otherwise the uploader may complete before a retryable future is continued.

> The alternative API that resolves this would be making async_call_with_retries() return a future that the calling thread can hold onto and wait on confidently (knowing it'll unblock eventually). That future would essentially be a proxy over the future from the last attempt at sending the async RPC; blocking on the proxy future effectively just turns async_call_with_retries into call_with_retries. In that case, we can still set add_done_callback() on the underlying future to use as a hint to basically trigger a preemptive retry in the event of a retryable failure, without waiting for the calling thread to block on our completion, but we're protected in the event that the callback doesn't exit cleanly.

Agreed that returning a grpc.Future would be a superior API. I spent around an hour trying to make this work, but was confounded by the following:

  • I do not see how, in the given public API, to construct a grpc.Future outside of actually invoking the gRPC call. This implies that we must return one of the futures created by generating an RPC call.
  • The future we wish to return to the caller corresponds to exactly one of the gRPC calls, but we do not know which one at the time of the initial invocation; in fact, in general, that future will not exist yet, since we will not want to create the second future until we are sure the first one has failed for a retryable reason.

I looked to see if there was a library or other relevant advice. The closest thing I could find was this SO question: https://stackoverflow.com/questions/61443958/how-to-wrap-rendezvous-future-returned-by-grpc

@bileschi (Collaborator, Author)

Maybe using grpc.Method.future() isn't the right API here. Maybe we should be using https://github.com/lidizheng/proposal/blob/grpc-python-async-api/L58-python-async-api.md ?

@nfelt (Contributor) Apr 12, 2021

I was envisioning that we'd just implement our own future class, which proxies to the appropriate underlying grpc.Future as need be, but doesn't expose it directly. We could probably have it subclass grpc.Future if we wanted, but I don't think it's strictly necessary, since all we really need in terms of an API for the caller is a single result() method that the calling code can invoke in order to block on the future completing.

Perhaps a simple way to avoid the issue of "exception in the caller-provided done callback gets swallowed" would be this. Define a Future class that will block on a completion event, representing the "completion of all possible retries":

import threading


class AsyncCallFuture:
    def __init__(self, completion_event):
        self._active_grpc_future = None
        self._active_grpc_future_lock = threading.Lock()
        self._completion_event = completion_event

    def _set_active_future(self, grpc_future):
        # Maybe add validation logic here that grpc_future is not None.
        with self._active_grpc_future_lock:
            self._active_grpc_future = grpc_future

    def result(self, timeout=None):
        # Block until the retry loop signals overall completion, then delegate
        # to whichever underlying grpc.Future ended up active.
        self._completion_event.wait(timeout)
        with self._active_grpc_future_lock:
            if self._active_grpc_future is None:
                raise RuntimeError("AsyncCallFuture never had an active future set")
            return self._active_grpc_future.result()

Then within async_call_with_retries we basically keep the same logic that was outlined above, with a few modifications:

  • we create the completion event at the top, and create the wrapping AsyncCallFuture as well
  • instead of accepting done_callback and invoking it, we call .set() on the completion event
  • when we launch a new RPC, we also call ._set_active_future() to set it as the active future for the AsyncCallFuture (we should do this before invoking .add_done_callback() to avoid any race condition)
  • finally, we update the whole thing to return the AsyncCallFuture instance

All told it would look like this:

import functools
import threading
import time

# Assumes rpc_name, version_metadata, _GRPC_DEFAULT_TIMEOUT_SECS,
# _GRPC_RETRYABLE_STATUS_CODES, _GRPC_RETRY_MAX_ATTEMPTS,
# _compute_backoff_seconds, logger, and the AsyncCallFuture class above.


def async_call_with_retries(
    api_method,
    request,
    clock=None,
):
    if clock is None:
        clock = time
    logger.debug("Async RPC call %s with request: %r", rpc_name, request)

    completion_event = threading.Event()
    async_future = AsyncCallFuture(completion_event)

    def async_call(handler):
        future = api_method.future(
            request,
            timeout=_GRPC_DEFAULT_TIMEOUT_SECS,
            metadata=version_metadata(),
        )
        # Ensure we set the active future before adding the done callback, to
        # avoid the case where the done callback completes immediately and
        # triggers the completion event while async_future still holds the old
        # future.
        async_future._set_active_future(future)
        future.add_done_callback(handler)

    def retry_handler(future, num_attempts):
        e = future.exception()
        if e is None:
            completion_event.set()
            return
        else:
            logger.info("RPC call %s got error %s", rpc_name, e)
            # If unable to retry, proceed to completion.
            if e.code() not in _GRPC_RETRYABLE_STATUS_CODES:
                completion_event.set()
                return
            if num_attempts >= _GRPC_RETRY_MAX_ATTEMPTS:
                completion_event.set()
                return
            # If able to retry, wait then do so.
            backoff_secs = _compute_backoff_seconds(num_attempts + 1)
            clock.sleep(backoff_secs)
            async_call(functools.partial(retry_handler, num_attempts=num_attempts + 1))

    async_call(functools.partial(retry_handler, num_attempts=1))
    return async_future

With this approach, we're still ultimately relying on the callbacks to execute cleanly in order for the AsyncCallFuture to unblock (since otherwise the completion event will never be triggered). However, we've removed done_callback, so there's no longer caller-provided code running in those callbacks, meaning we only need our own retry logic to avoid throwing exceptions, rather than also needing the arbitrary logic passed into done_callback to do the same. And this is a much simpler approach than the full-fledged version where we can recover even from errors in our own callbacks (since that basically requires two parallel retry procedures, one callback-based and one synchronous, plus the attendant coordination between the two).

Update: edited to replace threading.Condition with threading.Event since the latter has the semantics where waiting doesn't block if the event has already happened, which is what we want in this case.
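
A quick illustration of that Event property (standard-library behavior):

import threading

event = threading.Event()
event.set()           # the event has already happened
assert event.wait(5)  # returns True immediately rather than blocking;
                      # a Condition wait here would block despite the
                      # earlier notification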

@bileschi (Collaborator, Author)

Switched to this API. Thanks for writing clear, working code!

@bileschi bileschi requested a review from nfelt April 12, 2021 19:07
appropriate.
Args:
timeout: How long to wait in seconds before giving up and raising
@nfelt (Contributor)

As written, this code won't necessarily raise a timeout exception, since it could be the case that we wait for completion and never get there, but the active future is a completed intermediate one. Imagine that we have timeout=0.000001: then we basically skip the wait() on the completion event entirely, and we'll just return whatever the result of the first future was, when it finishes (could be success or failure).

Instead, it might be better to check the return value from wait() (docs) and then if it's False we raise a "timeout" exception ourselves. That seems better than exposing the intermediate state of whatever random underlying future happens to be active when the timeout expires.
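
A minimal sketch of that check (assuming grpc.FutureTimeoutError is the exception we want to raise; the merged code may differ):

import grpc

class AsyncCallFuture:
    ...  # __init__ and _set_active_future as sketched above

    def result(self, timeout=None):
        # Event.wait() returns False iff the timeout elapsed before set().
        if not self._completion_event.wait(timeout):
            raise grpc.FutureTimeoutError()
        with self._active_grpc_future_lock:
            if self._active_grpc_future is None:
                raise RuntimeError("AsyncCallFuture never had an active future set")
            return self._active_grpc_future.result()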

@bileschi (Collaborator, Author)

Good catch, thanks. Added explicit exception throwing and added a test for the case.

# * If the grpc call succeeds: trigger the `completion_event`.
# * If there are no more retries: trigger the `completion_event`.
# * Otherwise, invoke a new async_call with the same
# retry_handler.
@nfelt (Contributor)

"...same retry_handler, but incrementing the number of attempts." ?

@bileschi bileschi merged commit 078337f into tensorflow:master Apr 14, 2021
@bileschi bileschi deleted the async_call_with_retries branch April 14, 2021 17:43