[c10d] add an API to get the future result(success or failure) of a collective and customize error handling #137799
Summary: This allows users to check the status of each collective call, using the ivalue::Future library in PT core. It also allows users to attach their own failure-handling functions via: work.get_future_result().then(error_handling_func) Note that the above call is also non-blocking for the CPU thread.
Test Plan: Added a new test, test_get_future_result, to verify that the WorkResult is correctly propagated to users.
Tags: [ghstack-poisoned]
🔗 Helpful Links
🧪 See artifacts and rendered test results at hud.pytorch.org/pr/137799
Note: Links to docs will display an error until the docs builds have been completed.
✅ No Failures
As of commit 52468c5 with merge base 8c860ae.
This comment was automatically generated by Dr. CI and updates every 15 minutes.
numelIn_(w.numelIn_),
numelOut_(w.numelOut_),
store_(w.store_),
futureWorkResult_(w.futureWorkResult_),
increment the ref of the same future
…ure) of a collective and customize error handling" Summary: This PR lets users know exactly which collective call from the Python thread is failing and lets them supply their own error-handling function, instead of having the watchdog thread crash everything. This is potentially very useful in fault-tolerant training, where we can have in-process restart. E.g., when an NCCL error is detected, users can potentially abort comms, re-init comms, go back to the previous checkpointed step, and try again, instead of crashing the whole job. This allows users to check the status of each collective call, using the ivalue::Future library in PT core. It also allows users to attach their own failure-handling functions via: work.get_future_result().then(error_handling_func) Note that the above call is also non-blocking for the CPU thread. Test Plan: Added a new test, test_get_future_result, to verify that the WorkResult is correctly propagated to users. Tags: cc XilunWu H-Huang awgu kwen2501 wanchaol fegin fduwjj wz337 wconstab d4l3k c-p-i-o [ghstack-poisoned]
if self.rank == 0:
    work = process_group.allreduce(torch.rand(10).cuda(self.rank))
    work.wait()
    result = work.get_future_result().wait()
so this line would block the CPU until the collective timed out?
is this API still useful in the case where watchdog is enabled? or is it 'undefined' whether watchdog would time out first or this future result would return first?
Yes, it blocks until the NCCL error happens; users can decide whether they want to wait on (block) the main CPU thread. This mode is currently useful when TORCH_NCCL_ASYNC_ERROR_HANDLING is 0 (no handling), so that the watchdog only monitors/detects timeouts but does not kill or tear down the process. Even if the watchdog thread kills the process (TORCH_NCCL_ASYNC_ERROR_HANDLING = teardown), the behavior should be defined: the callback (if registered) is triggered first by markCompleted, and then the watchdog thread throws an exception to crash the process.
enum class WorkResult : std::uint8_t {
  SUCCESS = 0,
  FAILURE = 1,
  UNKNOWN = 100,
nit: why jump from 1 to 100?
No special meaning; I just followed the enum convention at line 34 (enum class OpType) for the unknown result.
users can use ``fut.wait()`` to blocking wait for the completion of the work and
get the WorkResult by ``fut.value()``.
Also, users can use ``fut.then(call_back_func)`` to register a callback function to be called
when the work is completed, without blocking the current thread.
which thread will the callback run on?
Usually the callback runs on the thread that marks the future as 'complete', e.g., (https://www.internalfb.com/code/fbsource/[9d20cc497d9438a63edfc443cdb319936f008d66]/fbcode/caffe2/aten/src/ATen/core/ivalue_inl.h?lines=962). But if the future is already completed when the user registers the callback, the callback runs on the same thread where the registration happens.
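The threading rule described in this reply can be illustrated with a small, self-contained sketch. It uses Python's standard `concurrent.futures.Future` purely as a stand-in, since it follows the same rule (callbacks run on the completing thread, or immediately on the registering thread if the future is already done). It is not the `ivalue::Future` from this PR, and the thread name `completer` and the helper `callback_thread_names` are made up for illustration.

```python
import threading
from concurrent.futures import Future


def callback_thread_names():
    """Record which thread runs a callback in the two cases discussed above."""
    ran_on = {"registrar": threading.current_thread().name}

    def record(key):
        # Build a callback that stores the name of the thread it runs on.
        def callback(fut):
            ran_on[key] = threading.current_thread().name
        return callback

    # Case 1: the callback is registered before the future completes.
    # A separate thread (standing in for the watchdog) completes the future.
    pending = Future()
    pending.add_done_callback(record("before"))
    completer = threading.Thread(
        target=pending.set_result, args=("SUCCESS",), name="completer"
    )
    completer.start()
    completer.join()

    # Case 2: the future is already complete when the callback is registered,
    # so the callback runs right away on the registering thread.
    finished = Future()
    finished.set_result("SUCCESS")
    finished.add_done_callback(record("after"))

    return ran_on


names = callback_thread_names()
print(names)
```

With the real API, the equivalent registration would be `work.get_future_result().then(...)`; the practical consequence is the same: a callback may run on a background thread, so it should not assume it is on the main Python thread.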
def print_fut(fut):
    print("future is ready:")
    print(fut.value())
ideally we can use a callback function to manipulate some values so that we can verify the time when it is called rather than just do the printing.
Will update in the stacked PR: #138001
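One possible shape for the callback suggested in this thread is sketched below: instead of printing, it records the result and a timestamp so a test can assert on them. This is only an illustration under stated assumptions; it uses the standard-library `concurrent.futures.Future` as a stand-in for the future returned by `work.get_future_result()`, and `CallbackRecorder` and `run_check` are hypothetical names, not code from this PR or from #138001.

```python
import threading
import time
from concurrent.futures import Future


class CallbackRecorder:
    """Callback object that records what it saw and when it was called."""

    def __init__(self):
        self.called = threading.Event()
        self.value = None
        self.called_at = None

    def __call__(self, fut):
        # Store the result and the call time instead of printing them.
        self.value = fut.result()
        self.called_at = time.monotonic()
        self.called.set()


def run_check():
    fut = Future()  # stand-in for work.get_future_result()
    recorder = CallbackRecorder()
    fut.add_done_callback(recorder)

    # The callback must not fire before the future is completed.
    assert not recorder.called.is_set()

    not_before = time.monotonic()
    fut.set_result("SUCCESS")  # stand-in for the work being marked complete

    # Wait (with a timeout) until the callback has run.
    assert recorder.called.wait(timeout=5)
    return recorder, not_before


recorder, not_before = run_check()
```

A test written this way can check both that the callback received the expected result and that it ran only after completion, which printing alone cannot verify.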
@pytorchbot merge
Merge started
Your change will be merged once all checks pass (ETA 0-4 Hours). Learn more about merging in the wiki. Questions? Feedback? Please reach out to the PyTorch DevX Team
I think this is good. But can you find the right place to update docs? I think our main doc page should include a section on error-handling modes and describe which modes we encourage.
And the doc should include the detail you answered to one of my questions- the semantics of enabling watchdog for teardown, but also registering a callback. -- The user can use the callback for printing/logging, but still rely on watchdog to tear down the process.
Sure, will update. This new API would still need the watchdog thread to call cudaEventDestroy, which risks hanging. I am contemplating some further changes and will update the doc after that.
Summary: Blocking wait mode is not widely used and is probably most useful for debugging. In blockingWait mode, we don't need to enable the watchdog thread to check for timeouts or NCCL errors, because the main thread throws an exception if an error happens; it is obvious to the user which work failed, and it is the user's responsibility to handle the exception. Test Plan: CI Pull Request resolved: #138001 Approved by: https://github.com/fduwjj, https://github.com/c-p-i-o ghstack dependencies: #137799
…ollective and customize error handling (#137799) Summary: This PR lets users know exactly which collective call from the Python thread is failing and lets them supply their own error-handling function, instead of having the watchdog thread crash everything. This is potentially very useful in fault-tolerant training, where we can have in-process restart. E.g., when an NCCL error is detected, users can potentially abort comms, re-init comms, go back to the previous checkpointed step, and try again, instead of crashing the whole job. This allows users to check the status of each collective call, using the ivalue::Future library in PT core. It also allows users to attach their own failure-handling functions via: work.get_future_result().then(error_handling_func) Note that the above call is also non-blocking for the CPU thread. Test Plan: Added a new test, test_get_future_result, to verify that the WorkResult is correctly propagated to users. Tags: Pull Request resolved: #137799 Approved by: https://github.com/fduwjj, https://github.com/wconstab
How does this compare with the |
Shall we use "COMPLETE" instead of "SUCCESS" to align with the |
Stack from ghstack (oldest at bottom):
Summary:
This PR lets users know exactly which collective call from the Python thread is failing and
lets them supply their own error-handling function, instead of having the watchdog thread crash everything.
This is potentially very useful in fault-tolerant training, where we can have in-process restart.
E.g., when an NCCL error is detected, users can potentially abort comms, re-init comms, go back to the previous checkpointed step, and try again, instead of crashing the whole job.
This allows users to check the status of each collective call,
using the ivalue::Future library in PT core. It also allows users to
attach their own failure-handling functions via:
work.get_future_result().then(error_handling_func)
Note that the above call is also non-blocking for the CPU thread.
Test Plan:
Added a new test, test_get_future_result, to verify that the WorkResult is
correctly propagated to users.
Tags:
cc @XilunWu @H-Huang @awgu @kwen2501 @wanchaol @fegin @fduwjj @wz337 @wconstab @d4l3k @c-p-i-o
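As a rough illustration of the error-handling pattern described in the summary, the sketch below simulates a handler that reacts to a failed collective. It makes several assumptions: the standard-library `concurrent.futures.Future` stands in for the future returned by `work.get_future_result()`, the Python `WorkResult` enum only mirrors the values of the C++ enum quoted in the review, and the recovery step names (`abort_comms`, `reinit_comms`, `reload_checkpoint`) are hypothetical placeholders rather than real APIs.

```python
from concurrent.futures import Future
from enum import IntEnum


class WorkResult(IntEnum):
    # Values mirror the C++ enum quoted in the review; this Python class
    # is an illustration, not the binding exposed by PyTorch.
    SUCCESS = 0
    FAILURE = 1
    UNKNOWN = 100


def make_error_handler(recovery_log):
    """Build a callback that records the recovery steps it would take."""

    def error_handling_func(fut):
        result = fut.result()  # with the real API this would be fut.value()
        if result == WorkResult.FAILURE:
            # Hypothetical in-process restart steps from the PR summary.
            recovery_log.extend(
                ["abort_comms", "reinit_comms", "reload_checkpoint"]
            )
        elif result == WorkResult.UNKNOWN:
            recovery_log.append("log_unknown_result")
        # SUCCESS needs no recovery.

    return error_handling_func


def simulate(result):
    """Complete a stand-in future with `result` and return the recovery log."""
    log = []
    fut = Future()  # stand-in for work.get_future_result()
    fut.add_done_callback(make_error_handler(log))
    fut.set_result(result)  # stand-in for the work being marked complete
    return log


print(simulate(WorkResult.SUCCESS))
print(simulate(WorkResult.FAILURE))
```

With the API from this PR, the registration would instead read `work.get_future_result().then(error_handling_func)`, which returns immediately and leaves the CPU thread free to continue.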