Add async_call_with_retries method to grpc_util.
#4825
Conversation
High-level seems reasonable (haven't reviewed tests yet).
I do wonder a bit whether a callback API is really what we want here; the disadvantage is that (as add_done_callback() explains) the error handling story is pretty bad in the event that some exception happens within the handling logic itself (which includes the callback provided by the caller of this utility):
Exceptions raised in the callback will be logged at ERROR level, but will not terminate any threads of execution.
So I guess depending on how you plan to structure the calling code, this seems like it could be an issue that either causes the uploader to hang forever (if the calling thread waits on the completion callback to flip some success flag) or causes it to drop data at upload time "quietly", aka with only an error in the logs rather than actually aborting (if we fire-and-forget the write RPC and basically don't check if the completion callback is ever called).
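For concreteness, a minimal sketch of that failure mode (hypothetical stub and request names, not code from this PR):

    def on_done(future):
        # Suppose this caller-provided handler has a bug and raises: gRPC logs
        # the exception at ERROR level and carries on. Nothing propagates back
        # to the thread that issued the RPC, and any "success flag" this
        # handler was meant to flip is never set.
        raise RuntimeError("bug in completion handling")

    stub.SomeWriteRpc.future(request).add_done_callback(on_done)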
A possible compromise is to have the calling thread wait on completion with a timeout that's greater than the maximum of each RPC wait time plus the total possible backoff, but it's a little ugly to have to essentially "know" how long the retrying logic might run in order to wait for the right length of time.
The alternative API that resolves this would be making async_call_with_retries() return a future that the calling thread can hold onto and wait on confidently (knowing it'll unblock eventually). That future would essentially be a proxy over the future from the last attempt at sending the async RPC; blocking on the proxy future effectively just turns async_call_with_retries into call_with_retries. In that case, we can still set add_done_callback() on the underlying future to use as a hint to basically trigger a preemptive retry in the event of a retryable failure, without waiting for the calling thread to block on our completion, but we're protected in the event that the callback doesn't exit cleanly.
tensorboard/util/grpc_util.py (outdated)

      Args:
        api_method: Callable for the API method to invoke.
        request: Request protocol buffer to pass to the API method.
        completion_handler: A function which takes a `grpc.Future` object as an
Could we call this done_callback to match the underlying grpc.Future naming and API? Then we can just point to the docs at https://grpc.github.io/grpc/python/grpc.html#grpc.Future.add_done_callback for describing the callback.
tensorboard/util/grpc_util.py (outdated)

        api_method,
        request,
        completion_handler,
        num_remaining_tries=_GRPC_RETRY_MAX_ATTEMPTS - 1,
IMO it's better not to expose details like this that are only needed for recursive calls in the function signature that's exposed to callers, since they shouldn't ever set num_remaining_tries or num_tries_so_far.
Instead we can just have two dedicated smaller functions nested inside this one, which close over the unchanging parts of the recursion (like the request and the API method and the clock) without having to pass them as arguments in the recursion; rather, we only pass arguments for the state that needs to be updated. E.g. like this:
    def async_call_with_retries(
        api_method,
        request,
        done_callback,
        clock=None,
    ):
        if clock is None:
            clock = time
        logger.debug("Async RPC call %s with request: %r", rpc_name, request)

        def async_call(handler):
            api_method.future(
                request,
                timeout=_GRPC_DEFAULT_TIMEOUT_SECS,
                metadata=version_metadata(),
            ).add_done_callback(handler)

        def retry_handler(future, num_attempts):
            e = future.exception()
            if e is None:
                done_callback(future)
                return
            else:
                logger.info("RPC call %s got error %s", rpc_name, e)
            # If unable to retry, proceed to done_callback.
            if e.code() not in _GRPC_RETRYABLE_STATUS_CODES:
                done_callback(future)
                return
            if num_attempts >= _GRPC_RETRY_MAX_ATTEMPTS:
                done_callback(future)
                return
            # If able to retry, wait then do so.
            backoff_secs = _compute_backoff_seconds(num_attempts + 1)
            clock.sleep(backoff_secs)
            async_call(functools.partial(retry_handler, num_attempts=num_attempts + 1))

        return async_call(functools.partial(retry_handler, num_attempts=1))
I do wonder a bit whether a callback API is really what we want here; the disadvantage is that (as add_done_callback() explains) the error handling story is pretty bad in the event that some exception happens within the handling logic itself (which includes the callback provided by the caller of this utility):

Exceptions raised in the callback will be logged at ERROR level, but will not terminate any threads of execution.
Agreed.
A possible compromise is to have the calling thread wait on completion with a timeout that's greater than the maximum of each RPC wait time plus the total possible backoff, but it's a little ugly to have to essentially "know" how long the retrying logic might run in order to wait for the right length of time.
Yeah, I've tried building the uploader using this logic, and you have to be careful to ensure that the callback passed to the async caller triggers some sort of watchable event, otherwise the uploader may complete before a retryable future is continued.
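Roughly the kind of thing I mean (a sketch with hypothetical uploader-side names, against the callback-style API sketched above):

    import threading

    rpc_finished = threading.Event()

    def done_callback(future):
        # Record the outcome somewhere the uploader can inspect it, then signal.
        finished_futures.append(future)  # hypothetical uploader-owned list
        rpc_finished.set()

    async_call_with_retries(api_method, request, done_callback)
    # ... other uploader work ...
    # Without a wait like this, the uploader could tear down while a retry is
    # still pending; the bound has to exceed all attempts plus total backoff.
    if not rpc_finished.wait(timeout=600):
        raise RuntimeError("upload RPC did not complete in time")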
The alternative API that resolves this would be making async_call_with_retries() return a future that the calling thread can hold onto and wait on confidently (knowing it'll unblock eventually). That future would essentially be a proxy over the future from the last attempt at sending the async RPC; blocking on the proxy future effectively just turns async_call_with_retries into call_with_retries. In that case, we can still set add_done_callback() on the underlying future to use as a hint to basically trigger a preemptive retry in the event of a retryable failure, without waiting for the calling thread to block on our completion, but we're protected in the event that the callback doesn't exit cleanly.
Agreed that returning a grpc.Future would be a superior API. I spent around an hour trying to make this work, but was confounded by the following.
- I do not see how, given the public API, to construct a grpc.Future outside of actually invoking a gRPC call. This implies that we must return one of the futures created by issuing an RPC call.
- The future we wish to return to the caller corresponds to exactly one of the gRPC calls, but we do not know which one at the time of the initial invocation; in fact, in general that future will not exist yet, since we will not want to create the second future until we are sure the first one has failed for a retryable reason.

I looked to see if there was a library or other relevant advice. The closest thing I could find was this SO question: https://stackoverflow.com/questions/61443958/how-to-wrap-rendezvous-future-returned-by-grpc
Maybe using grpc.Method.future() isn't the right API here. Maybe we should be using https://github.com/lidizheng/proposal/blob/grpc-python-async-api/L58-python-async-api.md ?
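For reference, a very rough sketch of what the retry helper might look like on top of grpc.aio (the asyncio-based API from that proposal). Constants and helpers (_GRPC_RETRY_MAX_ATTEMPTS, _GRPC_DEFAULT_TIMEOUT_SECS, _GRPC_RETRYABLE_STATUS_CODES, _compute_backoff_seconds, version_metadata) are assumed from grpc_util; this is not what the PR implements:

    import asyncio

    from grpc import aio

    async def aio_call_with_retries(api_method, request):
        for num_attempts in range(1, _GRPC_RETRY_MAX_ATTEMPTS + 1):
            try:
                return await api_method(
                    request,
                    timeout=_GRPC_DEFAULT_TIMEOUT_SECS,
                    metadata=version_metadata(),
                )
            except aio.AioRpcError as e:
                if e.code() not in _GRPC_RETRYABLE_STATUS_CODES:
                    raise
                if num_attempts >= _GRPC_RETRY_MAX_ATTEMPTS:
                    raise
                await asyncio.sleep(_compute_backoff_seconds(num_attempts + 1))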
I was envisioning that we'd just implement our own future class, which proxies to the appropriate underlying grpc.Future as need be, but doesn't expose it directly. We could probably have it subclass grpc.Future if we wanted, but I don't think it's strictly necessary, since all we really need in terms of an API for the caller is a single result() method that the calling code can invoke in order to block on the future completing.
Perhaps a simple way to avoid the issue of "exception in the caller-provided done callback gets swallowed" would be this. Define a Future class that will block on a completion event, representing the "completion of all possible retries":
    import threading  # assumed imported at module level in grpc_util

    class AsyncFuture:
        def __init__(self, completion_event):
            self._active_grpc_future = None
            self._active_grpc_future_lock = threading.Lock()
            self._completion_event = completion_event

        def _set_active_future(self, grpc_future):
            # maybe add validation logic here that future is not None
            with self._active_grpc_future_lock:
                self._active_grpc_future = grpc_future

        def result(self, timeout):
            self._completion_event.wait(timeout)
            with self._active_grpc_future_lock:
                if self._active_grpc_future is None:
                    raise RuntimeError("AsyncFuture never had an active future set")
                return self._active_grpc_future.result()

Then within async_call_with_retries we basically keep the same logic that was outlined above, with a few modifications:
- we create the completion event at the top, and create the wrapping AsyncFuture as well
- instead of accepting done_callback and invoking it, we call .set() on the completion event
- when we launch a new RPC, we also call ._set_active_future() and set it as the active future for the AsyncFuture (we should do this before invoking .add_done_callback() to avoid any race condition)
- finally, we update the whole thing to return the AsyncFuture instance
All told it would look like this:
    def async_call_with_retries(
        api_method,
        request,
        clock=None,
    ):
        if clock is None:
            clock = time
        logger.debug("Async RPC call %s with request: %r", rpc_name, request)
        completion_event = threading.Event()
        async_future = AsyncFuture(completion_event)

        def async_call(handler):
            future = api_method.future(
                request,
                timeout=_GRPC_DEFAULT_TIMEOUT_SECS,
                metadata=version_metadata(),
            )
            # Ensure we set active future before invoking done callback, to avoid case where
            # done callback completes immediately and triggers completion event while
            # async_future still holds the old future.
            async_future._set_active_future(future)
            future.add_done_callback(handler)

        def retry_handler(future, num_attempts):
            e = future.exception()
            if e is None:
                completion_event.set()
                return
            else:
                logger.info("RPC call %s got error %s", rpc_name, e)
            # If unable to retry, proceed to completion.
            if e.code() not in _GRPC_RETRYABLE_STATUS_CODES:
                completion_event.set()
                return
            if num_attempts >= _GRPC_RETRY_MAX_ATTEMPTS:
                completion_event.set()
                return
            # If able to retry, wait then do so.
            backoff_secs = _compute_backoff_seconds(num_attempts + 1)
            clock.sleep(backoff_secs)
            async_call(functools.partial(retry_handler, num_attempts=num_attempts + 1))

        async_call(functools.partial(retry_handler, num_attempts=1))
        return async_future

With this approach, we're still ultimately relying on the callbacks to execute cleanly in order for the AsyncFuture to unblock (since otherwise the completion event will never be triggered). However, we've removed done_callback so there's no longer caller-provided code running in those callbacks, meaning we only need our own retry logic to avoid throwing exceptions, rather than also needing the arbitrary logic passed into done_callback doing the same thing. And this is a much simpler approach than the full-fledged version where we can recover even from errors in our own callbacks (since that basically requires two parallel retry procedures, one callback-based and one synchronous, plus the attendant coordination between the two).
Update: edited to replace threading.Condition with threading.Event since the latter has the semantics where waiting doesn't block if the event has already happened, which is what we want in this case.
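A quick illustration of that distinction (plain Python, independent of this PR):

    import threading

    ev = threading.Event()
    ev.set()
    assert ev.wait(timeout=1)  # already set, so this returns True immediately

    cond = threading.Condition()
    with cond:
        cond.notify_all()  # a notification with no thread waiting is simply lost
    with cond:
        woke = cond.wait(timeout=0.1)  # still blocks for the full timeout
    assert not woke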
Switched to this API. Thanks for writing clear, working code!
tensorboard/util/grpc_util.py (outdated)

          appropriate.
      Args:
        timeout: How long to wait in seconds before giving up and raising
As written, this code won't necessarily raise a timeout exception: we could time out waiting for completion while the active future is a completed one from an intermediate attempt, in which case we'd just return that intermediate result. Imagine that we have timeout=0.000001 - then we basically skip the wait() on the completion event entirely, and we'll just return whatever the result of the first future was, when it finishes (could be success or failure).
Instead, it might be better to check the return value from wait() (docs) and then if it's False we raise a "timeout" exception ourselves. That seems better than exposing the intermediate state of whatever random underlying future happens to be active when the timeout expires.
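Something along these lines, reusing the names from the snippets above (grpc.FutureTimeoutError is one plausible exception to raise here; the PR may well pick a different one):

    def result(self, timeout=None):
        if not self._completion_event.wait(timeout):
            # The retrying machinery is still running; don't expose whatever
            # intermediate future happens to be active right now.
            raise grpc.FutureTimeoutError()
        with self._active_grpc_future_lock:
            if self._active_grpc_future is None:
                raise RuntimeError("AsyncFuture never had an active future set")
            return self._active_grpc_future.result()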
Good catch, thanks. Added explicit exception throwing and added a test for the case.
    # * If the grpc call succeeds: trigger the `completion_event`.
    # * If there are no more retries: trigger the `completion_event`.
    # * Otherwise, invoke a new async_call with the same
    #   retry_handler.
"...same retry_handler, but incrementing the number of attempts." ?
Motivation for features / changes
Towards making the uploader asynchronous (to improve upload speed to TensorBoard.dev). Actually making the uploader use this new utility will follow in a separate PR. This portion is submitted separately in the spirit of smaller, easier-to-understand PRs.
Technical description of changes
Creates a new util API, async_call_with_retries, which mirrors the existing call_with_retries but (unsurprisingly) returns right away and completes the gRPC call asynchronously, using the gRPC future API. The retry functionality is implemented as a recursive call where the continuation calls the same API with a decremented number of remaining tries.

Test Plan:

Adds unit tests covering cases very similar to the call_with_retries functionality. I've also tested that this API meets the needs of making the uploader asynchronous.