[c10d] add an API to get the future result (success or failure) of a collective and customize error handling by shuqiangzhang · Pull Request #137799 · pytorch/pytorch

Conversation

@shuqiangzhang shuqiangzhang (Contributor) commented Oct 11, 2024

Stack from ghstack (oldest at bottom):

Summary:
This PR lets users know exactly which collective call issued from the Python thread is failing, and
customize their own error-handling function, instead of having the watchdog thread crash everything.

This is potentially very useful in fault-tolerant training, where we can have in-process restart.
E.g., when an NCCL error is detected, users can abort the communicators, re-initialize them, go back to the previously checkpointed step, and try again, instead of crashing the whole job.

This allows users to check the status of each collective call,
using the ivalue::future libs in PT core. It also allows users to
attach their own failure-handling functions via:
work.get_future_result().then(error_handling_func)

Note that the above call is also non-blocking for the CPU thread.
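A minimal sketch of the intended usage, assuming an already-initialized NCCL process group; `abort_and_reinit_comms` and `restore_checkpoint` are hypothetical user-defined recovery helpers, not part of this PR:

```python
import torch
import torch.distributed as dist

def error_handling_func(fut):
    # fut.value() holds the WorkResult once the future is marked completed;
    # anything other than a success value means this collective failed.
    print("collective finished with result:", fut.value())
    # Hypothetical in-process restart instead of crashing the whole job:
    # abort_and_reinit_comms()  # abort comms, re-init comms
    # restore_checkpoint()      # roll back to the last checkpointed step

# assumes dist.init_process_group("nccl") has already been called
work = dist.all_reduce(torch.rand(10).cuda(), async_op=True)
# Non-blocking for the CPU thread: the callback fires when the result
# future is marked completed (e.g., when the watchdog detects an error).
work.get_future_result().then(error_handling_func)
```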
Test Plan:
Added a new test, test_get_future_result, to verify that the WorkResult is
correctly propagated to the users.

Tags:

cc @XilunWu @H-Huang @awgu @kwen2501 @wanchaol @fegin @fduwjj @wz337 @wconstab @d4l3k @c-p-i-o

@pytorch-bot pytorch-bot bot commented Oct 11, 2024

🔗 Helpful Links

🧪 See artifacts and rendered test results at hud.pytorch.org/pr/137799

Note: Links to docs will display an error until the docs builds have been completed.

✅ No Failures

As of commit 52468c5 with merge base 8c860ae:
💚 Looks good so far! There are no failures yet. 💚

This comment was automatically generated by Dr. CI and updates every 15 minutes.

@pytorch-bot pytorch-bot bot added oncall: distributed Add this issue/PR to distributed oncall triage queue release notes: distributed (c10d) release notes category labels Oct 11, 2024
shuqiangzhang added a commit that referenced this pull request Oct 11, 2024
ghstack-source-id: a4394b0
Pull Request resolved: #137799
@shuqiangzhang shuqiangzhang requested review from H-Huang, fduwjj, kwen2501 and wconstab and removed request for kwen2501 October 11, 2024 18:28
@shuqiangzhang shuqiangzhang changed the title [c10d] add an API to get the result(success or failure) of a collective [c10d] add an API to get the future result(success or failure) of a collective and customize error handling Oct 11, 2024
numelIn_(w.numelIn_),
numelOut_(w.numelOut_),
store_(w.store_),
futureWorkResult_(w.futureWorkResult_),
Contributor Author

This copy increments the refcount of the same future.

shuqiangzhang added a commit that referenced this pull request Oct 11, 2024
ghstack-source-id: bab8ee4
Pull Request resolved: #137799
if self.rank == 0:
work = process_group.allreduce(torch.rand(10).cuda(self.rank))
work.wait()
result = work.get_future_result().wait()
Contributor

So this line would block the CPU until the collective timed out?

Is this API still useful in the case where the watchdog is enabled? Or is it 'undefined' whether the watchdog would time out first or this future result would return first?

@shuqiangzhang shuqiangzhang (Contributor Author) Oct 11, 2024

Yes, it blocks until the NCCL error happens; users can decide whether they want to wait/block the main CPU thread. This mode is currently useful when TORCH_NCCL_ASYNC_ERROR_HANDLING is 0 (no handling), such that the watchdog only monitors/detects timeouts but does not kill/tear down the process. Even when the watchdog thread does kill the process (TORCH_NCCL_ASYNC_ERROR_HANDLING = teardown), the behavior is well defined: the callback (if registered) is triggered first by markCompleted, and then the watchdog thread throws an exception to crash the process.
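A minimal sketch of that mode, assuming torchrun-style environment variables for the default init and that the variable is set before process-group initialization:

```python
import os
import torch
import torch.distributed as dist

# Must be set before init_process_group so the watchdog only
# monitors/detects errors and does not tear the process down.
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"
dist.init_process_group("nccl")  # assumes torchrun-style env vars
rank = dist.get_rank()

work = dist.all_reduce(torch.rand(10).cuda(rank), async_op=True)
# Blocks this CPU thread until the work succeeds, errors out, or times
# out; the user decides how to react instead of the watchdog crashing.
result = work.get_future_result().wait()
print("work result:", result)
```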

enum class WorkResult : std::uint8_t {
SUCCESS = 0,
FAILURE = 1,
UNKNOWN = 100,
Contributor

nit: why jump from 1 to 100?

Contributor Author

No special meaning; it just follows the convention of the existing enum class OpType at line 34, which also uses 100 for its unknown value.

users can use ``fut.wait()`` to block until the work completes and
get the WorkResult via ``fut.value()``.
Also, users can use ``fut.then(call_back_func)`` to register a callback function to be called
when the work is completed, without blocking the current thread.
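A minimal sketch of both patterns, given a `work` handle returned by an async collective as in the earlier snippets:

```python
fut = work.get_future_result()

# Pattern 1: block the current thread, then inspect the WorkResult.
fut.wait()
print("work result:", fut.value())

# Pattern 2: non-blocking; the callback receives the completed future.
fut.then(lambda f: print("work result:", f.value()))
```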
Contributor

which thread will the callback run on?

@shuqiangzhang shuqiangzhang (Contributor Author) Oct 11, 2024

Usually the callback runs on the thread that marks the future as complete, e.g., (https://www.internalfb.com/code/fbsource/[9d20cc497d9438a63edfc443cdb319936f008d66]/fbcode/caffe2/aten/src/ATen/core/ivalue_inl.h?lines=962). But if the future is already completed when the user registers the callback, the callback runs on the same thread where the registration happens.
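The second case is easy to probe with a plain torch.futures.Future; a minimal sketch:

```python
import threading
import torch

# The future is completed before the callback is registered, so the
# callback should run inline on the registering (main) thread.
fut = torch.futures.Future()
fut.set_result(42)

main_tid = threading.get_ident()
fut.then(lambda f: print(
    "callback ran on the registering thread:",
    threading.get_ident() == main_tid,
))
```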

shuqiangzhang added a commit that referenced this pull request Oct 11, 2024
ghstack-source-id: bfa69dd
Pull Request resolved: #137799
Comment on lines +2655 to +2657
def print_fut(fut):
print("future is ready:")
print(fut.value())
Contributor

Ideally, the callback function would mutate some value so that we can verify when it is called, rather than just printing.

Contributor Author

Will update in the stacked PR: #138001
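For illustration, one hedged way to do that (the real test change is in the stacked PR; `work` is the handle from the collective above):

```python
import threading

cb_done = threading.Event()
seen = {}

def verify_fut(fut):
    # Record the observed WorkResult and signal that the callback ran,
    # so the test can assert on both instead of just printing.
    seen["result"] = fut.value()
    cb_done.set()

work.get_future_result().then(verify_fut)
# The test can now verify when/whether the callback fired.
assert cb_done.wait(timeout=60), "callback was never invoked"
```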

@shuqiangzhang shuqiangzhang (Contributor Author)
@pytorchbot merge

@pytorch-bot pytorch-bot bot added the ciflow/trunk Trigger trunk jobs on your pull request label Oct 15, 2024
@pytorchmergebot (Collaborator)
Merge started

Your change will be merged once all checks pass (ETA 0-4 Hours).

Learn more about merging in the wiki.

Questions? Feedback? Please reach out to the PyTorch DevX Team


@wconstab wconstab (Contributor) left a comment

I think this is good. But can you find the right place to update the docs? I think our main doc page should include a section on error-handling modes and describe which modes we encourage.

And the doc should include the detail you answered in one of my questions: the semantics of enabling the watchdog for teardown while also registering a callback. The user can use the callback for printing/logging, but still rely on the watchdog to tear down the process.

@shuqiangzhang shuqiangzhang (Contributor Author)

> I think this is good. But can you find the right place to update the docs? […]

Sure, will update. This new API still needs the watchdog thread to call cudaEventDestroy, which risks hanging. I am contemplating some further changes and will update the docs after that.

pytorchmergebot pushed a commit that referenced this pull request Oct 16, 2024
Summary:
Blocking-wait mode is not widely used, but it is probably useful for debugging.
In blocking-wait mode, we don't need the watchdog thread to check for
timeouts or NCCL errors, because the main thread throws an exception if an
error happens; it is then obvious to the user which work failed, and it is the
user's responsibility to handle the exception.
Test Plan:
CI

Pull Request resolved: #138001
Approved by: https://github.com/fduwjj, https://github.com/c-p-i-o
ghstack dependencies: #137799
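A minimal sketch of blocking-wait mode as described, assuming TORCH_NCCL_BLOCKING_WAIT is set before process-group init and torchrun-style environment variables are present:

```python
import os
import torch
import torch.distributed as dist

os.environ["TORCH_NCCL_BLOCKING_WAIT"] = "1"  # set before init
dist.init_process_group("nccl")
rank = dist.get_rank()

try:
    work = dist.all_reduce(torch.rand(10).cuda(rank), async_op=True)
    work.wait()  # blocks; raises on NCCL error or timeout in this mode
except Exception as e:
    # User-owned error handling: log, abort comms, restart, etc.
    print("collective failed:", e)
```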
jackzhxng pushed a commit that referenced this pull request Oct 16, 2024

Pull Request resolved: #137799
Approved by: https://github.com/fduwjj, https://github.com/wconstab
jackzhxng pushed a commit that referenced this pull request Oct 16, 2024

Pull Request resolved: #138001
Approved by: https://github.com/fduwjj, https://github.com/c-p-i-o
ghstack dependencies: #137799
@kwen2501 (Contributor)

How does this compare with the onCompletionHook logic?
Is there a chance of consolidation?

@kwen2501 (Contributor)

Shall we use "COMPLETE" instead of "SUCCESS" to align with the isComplete API?
There is another API called isSuccess, but it is being deprecated. (We don't want to attest that a collective is successful.)

@github-actions github-actions bot deleted the gh/shuqiangzhang/48/head branch November 18, 2024 02:10