Signal handling in DataLoader workers; Timeout option by ssnl · Pull Request #3474 · pytorch/pytorch

Conversation

@ssnl ssnl commented Nov 3, 2017 (Collaborator)

  1. Make DataLoader workers a bit more verbose on bus error and segfault, implemented with C++ signal handlers. Partially addresses #1355 (possible deadlock in dataloader).
  2. Add a timeout option to DataLoader, implemented with SIGALRM. Addresses #2474 (timeout option for parallel DataLoader).

Test plan (modified from the script by @fmassa in #1595):

  1. Segfault case:
    script:
import ctypes

import torch
import torch.utils.data

class DS(object):
    def __getitem__(self, idx):
        ctypes.string_at(0)  # dereference NULL: segfaults the worker
        return torch.rand(1000000)

    def __len__(self):
        return 200

ds = DS()
it = torch.utils.data.DataLoader(ds, batch_size=10, num_workers=10)

for i, data in enumerate(it):
    print(i)

output:

[~] python ds.py
ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
[~] # Doesn't hang anymore
  2. Timeout case:
    script:
import time

import torch
import torch.utils.data

class DS(object):
    def __getitem__(self, idx):
        time.sleep(10)  # simulate a worker that takes far longer than the timeout
        return torch.rand(1000000)

    def __len__(self):
        return 200

ds = DS()
it = torch.utils.data.DataLoader(ds, batch_size=10, num_workers=10, timeout=2)

for i, data in enumerate(it):
    print(i)

output:

[~] python ds.py
ERROR: Time out when fetching data in DataLoader.
[~]

Since this goes deep into multiprocessing and low-level signal handling, there may be cases I haven't considered. I'll also leave some comments below.

cc @apaszke @colesbury


@colesbury colesbury left a comment (Member)

This looks pretty good. Can you add an automated test:

  1. That the main process exits if the child segfaults
  2. That the main process exits if the get times out (use a very small timeout)

You can probably trigger a segfault in the child with ctypes.
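A rough sketch of what such a test could look like, assuming (as the merged implementation ends up doing) that a dead worker surfaces as a RuntimeError in the main process; the dataset and test names here are hypothetical:

import ctypes
import unittest

import torch
import torch.utils.data

class SegfaultDataset(object):
    # Hypothetical dataset used only for this sketch.
    def __getitem__(self, idx):
        ctypes.string_at(0)  # dereference NULL to segfault the worker

    def __len__(self):
        return 100

class TestDataLoaderWorkerFailure(unittest.TestCase):
    def test_worker_segfault_is_reported(self):
        loader = torch.utils.data.DataLoader(SegfaultDataset(),
                                             batch_size=2, num_workers=2)
        with self.assertRaises(RuntimeError):
            for _ in loader:
                pass

if __name__ == '__main__':
    unittest.main()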


@vadimkantorov vadimkantorov commented Nov 4, 2017 (Contributor)

Does it address the possible hangs of index_queue.put or data_queue.get? Usually it's data_queue.get that can hang, but in some corner cases (like priming) index_queue.put can hang too (e.g. when all workers are blocked on something, say, on data_queue.put).

@ssnl ssnl commented Nov 4, 2017 (Collaborator, Author)

@vadimkantorov It addresses it in two cases:

  1. In the case where workers die and cause the loader process to hang: the loader process will now exit, since the SIGCHLD handler checks for dead workers. In addition, workers dying from a segfault or bus error (e.g. a shared-memory issue) now print a message. A rough sketch of the idea is below.
  2. In the case where workers hang: there is not much we can do from the PyTorch side, but the new timeout option, if you use it, at least keeps the loader from hanging.
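A minimal Python-level sketch of that SIGCHLD check (the handler in this PR is actually implemented in C++; worker_pids here is just an illustrative placeholder for the loader's worker pids):

import os
import signal

worker_pids = set()  # filled in when the loader spawns its workers

def sigchld_handler(signum, frame):
    for pid in list(worker_pids):
        # WNOHANG polls without blocking; (0, 0) means this child is still running.
        reaped_pid, status = os.waitpid(pid, os.WNOHANG)
        if reaped_pid == pid:
            worker_pids.discard(pid)
            if os.WIFSIGNALED(status) or (os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0):
                raise RuntimeError('DataLoader worker (pid %d) exited unexpectedly' % pid)

signal.signal(signal.SIGCHLD, sigchld_handler)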

@vadimkantorov (Contributor)

@ssnl Regarding (2): There seem to be at least two places where the loader can hang: data_queue.get and index_queue.put (the less obvious one). Does the timeout apply to both?

@apaszke apaszke left a comment (Contributor)

I'm concerned about the safety and interactions of this patch. Signals are a process-wide mechanism and might interact both with the Python runtime and with each other if you have multiple data loaders. Handling SIGCHLD makes sense (as long as it doesn't break multiprocessing), but I'd rather not use SIGALRM.


@peterjc123 (Collaborator)

There are two kinds of possible cross-platform alternatives to SIGALRM. A sketch of the second approach follows the list.

  1. Use a thread-based model (threading/Event): kill or abandon the thread after the timeout. Lightweight, but not safe if the GIL is involved. Examples: 1 and 2
  2. Use a process-based model (multiprocessing/subprocess): kill the process after the timeout. Safe and easy to implement, but heavier. Examples: 1 and 2
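A minimal sketch of the process-based option, assuming it is acceptable to hard-kill a stuck child; the run_with_timeout helper below is hypothetical and not code from this PR or from the linked examples:

import multiprocessing
import queue

def _call_and_ship(target, args, result_queue):
    # Run the work in the child process and ship the result back to the parent.
    result_queue.put(target(*args))

def run_with_timeout(target, args=(), timeout=5.0):
    result_queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=_call_and_ship,
                                   args=(target, args, result_queue))
    proc.start()
    try:
        return result_queue.get(timeout=timeout)
    except queue.Empty:
        proc.terminate()  # the child is stuck or too slow; kill it
        raise RuntimeError('operation timed out after %.1f seconds' % timeout)
    finally:
        proc.join()

if __name__ == '__main__':
    import time
    run_with_timeout(time.sleep, (1,), timeout=5.0)   # returns normally
    run_with_timeout(time.sleep, (60,), timeout=0.5)  # raises RuntimeError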

@ssnl ssnl commented Nov 6, 2017 (Collaborator, Author)

One of the links @peterjc123 mentioned (https://eli.thegreenplace.net/2011/08/22/how-not-to-set-a-timeout-on-a-computation-in-python) makes me wonder if our _pin_memory_loop could live on as a zombie thread in a case such as: a worker hangs, the main thread is interrupted, and the thread then waits forever to get data.

@ssnl ssnl commented Nov 6, 2017 (Collaborator, Author)

@vadimkantorov Sorry, I missed your comment earlier. Why should index_queue.put() hang? To my understanding, only the main process puts to that queue.

@vadimkantorov vadimkantorov commented Nov 6, 2017 (Contributor)

@ssnl Indeed, only the main process puts to that queue, and it may block while doing so. It will block if the DataLoader puts more than 64KB (SimpleQueue is implemented on top of system pipes by default, if I understand correctly). This may happen in the extreme case where the index is some serialized object or a very long Python list. You can easily reproduce this by creating a SimpleQueue and putting range(1000000) in it, as sketched below.
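For illustration, a tiny reproduction of that pipe-capacity behavior (this assumes the usual ~64KB default pipe buffer on Linux and is not code from this PR):

import multiprocessing

# With nobody reading from the other end of the queue, put() blocks as soon as
# the pickled payload no longer fits into the underlying OS pipe buffer.
q = multiprocessing.SimpleQueue()
q.put(list(range(1000000)))  # blocks here; a small payload would return immediately
print('never reached')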

Then, if for some reason the workers are blocked too, no one will read from the other side of index_queue, so index_queue.put would block indefinitely. This can theoretically happen during data loader priming, when the main thread isn't yet consuming from data_queue; so with a small number of workers and large serialized batch objects and indices, data loader priming could hang.

I tried to showcase this scenario here but hit another problem first: #2474 (comment)

Having the timeout option cover this case will be a great safeguard.

@ssnl ssnl commented Nov 6, 2017 (Collaborator, Author)

@vadimkantorov Thanks for the explanation! Unfortunately this PR won't solve that issue. Hopefully such an extreme issue won't come up often :)

@vadimkantorov vadimkantorov commented Nov 6, 2017 (Contributor)

@ssnl This could happen even if the indices object is small, but the workers are for some reason very slow or blocked (it would just take some time to fill the 64KB pipe). Can we detect whether the queue contains a near-critical number of bytes before putting to it, and show a UserWarning the first time it happens?

Though it would sure complicate the logic.

In practice this situation happens when multiprocessing.set_start_method('spawn') is not called and OpenCV (primarily resizing and image decoding) is used in the workers without an explicit cv2.setNumThreads(0) (on Python 2, which doesn't have spawn). The workers then block, but the user sees an exception in index_queue.put. This happened to me, and that's how I became aware of all this.
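For illustration, a hypothetical dataset showing the cv2.setNumThreads(0) workaround mentioned above (the file paths and sizes are made up):

import cv2

class OpenCVDataset(object):
    def __getitem__(self, idx):
        # With the default 'fork' start method, OpenCV's internal thread pool can
        # deadlock inside forked workers; disabling it in the worker avoids that.
        cv2.setNumThreads(0)
        img = cv2.imread('frames/%06d.jpg' % idx)
        return cv2.resize(img, (224, 224))

    def __len__(self):
        return 200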

@ssnl ssnl commented Nov 6, 2017 (Collaborator, Author)

@vadimkantorov I see. But that is not easy to do in this PR, especially now that I have moved away from using SIGALRM due to the concerns raised by @apaszke. I think adding another thread to the data loader would solve it, but I'm not sure it is necessary. :) I'll keep this in mind; thanks for bringing it up!

@ssnl ssnl commented Nov 6, 2017 (Collaborator, Author)

Addressing the comments by @colesbury and @apaszke, I updated the PR:

  1. Removed the SIGALRM logic. The DataLoader now creates a separate thread if either pin_memory is set or timeout > 0. Since that thread communicates with the main thread through a queue.Queue, we apply the timeout there (see the sketch after this list).

  2. The SIGCHLD handler now only calls waitpid on the child processes of interest, so Python can still update is_alive() and exitcode for other processes.

  3. Rewrote the code that removes pids from the collections, so that at any time the array contains only valid pids (some of which may be dead workers) and includes all alive pids.

  4. Added std::atomic on several variables. Did the is_log_free check.

  5. Added basic error handling for the library functions; used HANDLE_TH_ERRORS to wrap the C++ functions.

  6. Moved the new C++ functions to a separate file and added a dummy implementation for Windows.

  7. Added tests. Unfortunately, for test_segfault the print to stderr comes from C++, so I don't know of a way to hide it on the Python side; a passing test run will still show a single-line error message: ERROR: Unexpected segmentation fault encountered in worker.
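A simplified sketch of the idea behind item 1, i.e. how a manager thread plus an in-process queue.Queue lets the main thread time out on get(); this is illustrative and not the actual dataloader.py implementation:

import queue

data_queue = queue.Queue()  # filled by a daemon worker-manager thread

def worker_manager_loop(worker_result_queue, out_queue):
    # Runs in a threading.Thread: drain results coming from the worker
    # processes into the in-process queue that the main thread reads from.
    while True:
        r = worker_result_queue.get()   # blocking read from the workers
        if r is None:                   # sentinel: shut the manager thread down
            break
        out_queue.put(r)

def get_batch(timeout):
    if timeout > 0:
        try:
            return data_queue.get(timeout=timeout)
        except queue.Empty:
            raise RuntimeError('DataLoader timed out after {} seconds'.format(timeout))
    return data_queue.get()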


@vadimkantorov (Contributor)

@ssnl I saw you used queue.Queue (instead of multiprocessing.SimpleQueue) for data_queue if timeout > 0: https://github.com/SsnL/pytorch/blob/55158acd5c8c51e6ebad13f52a53c108580d8caf/torch/utils/data/dataloader.py#L176-L177

Could you use queue.Queue for index_queue in that case as well? Its put method also has an optional timeout argument.

And queue.Queue probably doesn't have the message-size limit of multiprocessing.SimpleQueue, since it isn't pipe-based.
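For reference, queue.Queue's timeout semantics on put look like this (a standalone illustration, not dataloader code):

import queue

q = queue.Queue(maxsize=1)
q.put('first')
try:
    q.put('second', timeout=2.0)  # nobody consumes 'first', so this times out
except queue.Full:
    print('index_queue.put would have hung; with a timeout it raises queue.Full')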

@ssnl ssnl commented Nov 7, 2017 via email (Collaborator, Author)


@ssnl ssnl force-pushed the to branch 2 times, most recently from 6715db9 to afe8bf1, on November 7, 2017 at 21:29
@ssnl ssnl commented Nov 7, 2017 (Collaborator, Author)

Updated the PR for @colesbury's and @apaszke's comments:

  1. Use WNOWAIT with waitid so that Python can still get the status.

  2. Call the overridden (previous) SIGCHLD handler.

  3. Changed the set-handler macro into a helper function.

  4. The pids array and vector no longer share the same underlying storage; instead, a helper function copies the vector contents into the array.

  5. Hide stderr output in tests via sys.stderr.close().

  6. Fixed outdated comments and checks. Addressed other comments on best practices.


@ssnl ssnl force-pushed the to branch 2 times, most recently from aed0c3b to d37c716, on November 21, 2017 at 22:06
@ssnl ssnl commented Nov 21, 2017 (Collaborator, Author)

After thinking about this, I realized that we can safely use Python's signal module to register a pure Python-side SIGCHLD handler, because SIGCHLD is not an error signal, so not handling it in C is not an issue.

In the Python SIGCHLD handler, I call a C-side function which accesses a map of id(DataLoader) -> set_of_worker_pids. The function checks each worker of each loader and throws a runtime error if any of them failed.


@apaszke apaszke commented Nov 21, 2017 (Contributor)

If you're going to move the handler to Python, why can't you move all the data structures to Python too?

@ssnl ssnl commented Nov 21, 2017 (Collaborator, Author)

@apaszke Because waitid is not available in Python 2.7 (os.waitid was only added in Python 3.3).

@ssnl ssnl commented Nov 21, 2017 (Collaborator, Author)

Oh no these gloo changes again D: How did you solve these last time @soumith ?

@apaszke apaszke left a comment (Contributor)

That looks much better. I have a few minor comments, but it should be good to go after that.


(infop.si_code == CLD_KILLED) ||
(infop.si_code == CLD_DUMPED)) {
std::ostringstream oss;
oss << "DataLoader worker (pid " << pid << ") exited unexpectedly.";


try:
    signal.signal(signal.SIGCHLD, handler)
except ValueError as _:
    return  # Windows doesn't support this


def handler(signum, frame):
    _error_if_any_worker_fails()
    if callable(previous_handler):
        previous_handler(signum, frame)




_use_shared_memory = False
_SIGCHLD_handler_set = False


# if all workers hang, no None is sent to worker_manager_thread, we
# put None to let worker_manager_thread exit
# empty check prevents put from hanging
if self.worker_result_queue.empty():


self.done_event.set()
# if worker_manager_thread is waiting to put
if not self.data_queue.empty():
    self.data_queue.get()



def __del__(self):
    if self.num_workers > 0:
        self._remove_worker_pids_information()


r = index_queue.get()
if r is None:
    data_queue.put(None)
    break


@apaszke apaszke left a comment (Contributor)

Looks good. The only thing I thought about now is that I think WNOWAIT is a Linux-only thing. It would be good to verify that this diff doesn't break OS X builds.

@ssnl ssnl commented Nov 29, 2017 (Collaborator, Author)

The set was not properly cleared; this is fixed in the new commit.

@apaszke I tested on OS X and everything works just as on CentOS, except that the main process can't tell which signal killed the worker process. The exit code != 0 case is still fine. I don't think it is a big issue. Here is an output comparison:

OSX:

ERROR: Unexpected segmentation fault encountered in worker.
Traceback (most recent call last):
  File "ds.py", line 17, in <module>
    for i, data in enumerate(it):
  File "/Users/ssnl/anaconda3/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 254, in __next__
    idx, batch = self._get_batch()
  File "/Users/ssnl/anaconda3/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 233, in _get_batch
    return self.data_queue.get()
  File "/Users/ssnl/anaconda3/lib/python3.6/multiprocessing/queues.py", line 342, in get
    res = self._reader.recv_bytes()
  File "/Users/ssnl/anaconda3/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/ssnl/anaconda3/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/Users/ssnl/anaconda3/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/Users/ssnl/anaconda3/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 163, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 16545) is killed by signal: Unknown signal: 0.

CentOS:

ERROR: Unexpected segmentation fault encountered in worker.
Traceback (most recent call last):
  File "ds.py", line 17, in <module>
    for i, data in enumerate(it):
  File "/home/ssnl/sftp/pytorch/torch/utils/data/dataloader.py", line 254, in __next__
    idx, batch = self._get_batch()
  File "/home/ssnl/sftp/pytorch/torch/utils/data/dataloader.py", line 233, in _get_batch
    return self.data_queue.get()
  File "/home/ssnl/anaconda3/lib/python3.6/multiprocessing/queues.py", line 343, in get
    res = self._reader.recv_bytes()
  File "/home/ssnl/anaconda3/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/ssnl/anaconda3/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/ssnl/anaconda3/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/home/ssnl/sftp/pytorch/torch/utils/data/dataloader.py", line 163, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 1864508) is killed by signal: Segmentation fault.

@ssnl ssnl commented Nov 29, 2017 (Collaborator, Author)

Sorry, I forgot error checking when resetting the signal handler in the worker handler. I also shouldn't use the non-portable signal(2). Fixed in the new commit.

if (sigemptyset(&sa.sa_mask) != 0 || sigaction(SIGNAL, &sa, NULL) != 0) { \
  _exit(EXIT_FAILURE); \
} else { \
  raise(SIGNAL); \


sa.sa_flags = SA_RESTART|SA_SIGINFO|SA_NOCLDSTOP;
sigemptyset(&sa.sa_mask);
if (sigaction(signal, &sa, old_sa_ptr) != 0) {
sa.sa_flags = SA_RESTART|SA_SIGINFO|SA_NOCLDSTOP|SA_NODEFER;


@apaszke apaszke merged commit 1661370 into pytorch:master Nov 29, 2017
@ssnl ssnl deleted the to branch November 29, 2017 22:55
@soumith soumith added the 0.3.1 label Feb 7, 2018