gh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock by elfstrom · Pull Request #108513 · python/cpython · GitHub

@elfstrom
Contributor

@elfstrom elfstrom commented Aug 26, 2023

This fixes issue #105829

The _ExecutorManagerThread wake-up code could deadlock if the wake-up pipe filled up and blocked.

Please review this very carefully, I'm not 100% convinced I understand all the details here. I'm mostly submitting this as a proof of concept test/fix due to the slow progress on the issue. Hopefully this can serve as a starting point at least.

If the management thread does not clear the wakeup pipe fast enough the
wakeup code will block holding the shutdown lock causing deadlock.

python#105829
This fixes issue python#105829, python#105829

The _ExecutorManagerThread wake-up code could deadlock if the wake-up
pipe filled up and blocked. The relevant code looked like this:

class _ThreadWakeup:

    def wakeup(self):
        if not self._closed:
            self._writer.send_bytes(b"")

    def clear(self):
        if not self._closed:
            while self._reader.poll():
                self._reader.recv_bytes()

class ProcessPoolExecutor(_base.Executor):

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            ...
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

class _ExecutorManagerThread(threading.Thread):

    def wait_result_broken_or_wakeup(self):
        ...
        with self.shutdown_lock:
            self.thread_wakeup.clear()

The shutdown_lock must be taken for both reads and writes of the
wake-up pipe. If a read or a write of the pipe blocks, the code will
deadlock. It looks like reads can't block (a poll() is done before
doing any reads) but writes have no protection against blocking.

If the _ExecutorManagerThread cannot keep up and clear the wake-up
pipe, the pipe will fill up and writes to it will block. This seems to
have been rather easy to trigger in the real world once the number of
tasks exceeds 100000 or so.

With this change we make the writes to the wake-up pipe
non-blocking. If a write would block, we simply skip it. This
should be OK because the root of the problem is that both reader and
writer must hold the shutdown_lock when accessing the pipe. That
should imply that we don't need to send anything if the pipe is full:
the reader can't be reading it concurrently, and it will eventually
wake up due to the data already in the pipe.
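
A minimal sketch of the non-blocking idea described above, assuming a Unix-like platform. It uses a raw os.pipe() rather than the multiprocessing Connection objects that the real _ThreadWakeup wraps, and the class name NonBlockingWakeup, its skipped counter and the demo() helper are hypothetical, introduced only for illustration:

```python
import os


class NonBlockingWakeup:
    """Hypothetical stand-in for _ThreadWakeup built on a raw os.pipe()."""

    def __init__(self):
        self._reader, self._writer = os.pipe()
        # Neither end may block: a blocked call made while holding a shared
        # lock is what produced the deadlock.
        os.set_blocking(self._reader, False)
        os.set_blocking(self._writer, False)
        self.skipped = 0  # illustration only: counts dropped wake-ups

    def wakeup(self):
        try:
            os.write(self._writer, b"\0")
        except BlockingIOError:
            # Pipe is full, so a wake-up is already pending: skip this one.
            self.skipped += 1

    def clear(self):
        """Drain everything currently in the pipe; return the byte count."""
        drained = 0
        try:
            while chunk := os.read(self._reader, 65536):
                drained += len(chunk)
        except BlockingIOError:
            pass  # pipe is empty
        return drained

    def close(self):
        os.close(self._reader)
        os.close(self._writer)


def demo(n_wakeups=200_000):
    """Send more wake-ups than a default pipe holds, without ever blocking."""
    wakeup = NonBlockingWakeup()
    try:
        for _ in range(n_wakeups):
            wakeup.wakeup()
        delivered = wakeup.clear()
        return delivered, wakeup.skipped
    finally:
        wakeup.close()
```

Every call to wakeup() returns immediately: it either lands in the pipe or is skipped because the pipe already holds pending wake-ups, so delivered plus skipped always equals the number of calls.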
@ghost

ghost commented Aug 26, 2023

All commit authors signed the Contributor License Agreement.

@bedevere-bot

Most changes to Python require a NEWS entry.

Please add it using the blurb_it web app or the blurb command-line tool.

@cjw296 cjw296 self-requested a review August 30, 2023 08:09
Try to reduce the size of the pipe to make the test faster.
This only works on Unix-like platforms; we fall back
on a hardcoded size for other platforms.
@elfstrom
Contributor Author

elfstrom commented Sep 3, 2023

@cjw296 Should we switch to the fix proposed by @pitrou? If we are confident it is correct it seems like a better solution, removing the lock contention completely for the wakeup pipe.

@cjw296
Contributor

cjw296 commented Sep 4, 2023

@elfstrom - does the test still not hang with @pitrou's patch in place of your original one?

@elfstrom
Contributor Author

elfstrom commented Sep 4, 2023

@elfstrom - does the test still not hang with @pitrou's patch in place of your original one?

Yes, the test confirms that deadlocks do not happen with either solution. The benefit of pitrou's version is that we also remove the lock contention for reads and writes of the wakeup pipe. The submitting main thread can still potentially block in that version, but as soon as the manager thread wakes up it will clear the wakeup pipe, and the writer should be able to submit several thousand more tasks before it could potentially block again (16k tasks for the default Linux 64k pipe). Since there are already other cases in the submit code that could block, I don't think this is an issue. The important thing is that the blocking won't cause a deadlock in this case.
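
The 16k figure can be reproduced with a little arithmetic. This is a sketch under one assumption that is not stated in the thread: that each wake-up, sent with send_bytes(b""), puts a 4-byte length header and no payload on the pipe. The names below are illustrative only:

```python
import struct

PIPE_CAPACITY = 64 * 1024            # default Linux pipe size mentioned above
HEADER_SIZE = struct.calcsize("!i")  # assumed 4-byte length prefix per message
PAYLOAD_SIZE = len(b"")              # the wake-up message itself is empty


def wakeups_until_full(capacity=PIPE_CAPACITY):
    """Number of empty wake-up messages that fit before a write would block."""
    return capacity // (HEADER_SIZE + PAYLOAD_SIZE)


print(wakeups_until_full())  # 16384
```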

The bigger issue might be if we can't say for sure that removing the lock on the reader is safe. Both @pitrou and I have looked at the code and we believe it is safe to do but that is no guarantee. It would be really nice if there was some sort of stress test that could be used to really hammer this code for a while to increase confidence in this type of change but I haven't found anything like that in the tests. Is there something like that available somewhere?

@gciaberi

gciaberi commented Sep 7, 2023

For those looking for a temporary fix, here is a drop-in replacement of concurrent.futures.ProcessPoolExecutor that I have successfully used to overcome the deadlock problem. It simply limits the number of submitted tasks using a bounded semaphore (credits to https://github.com/mowshon/bounded_pool_executor/tree/master for the idea).

from concurrent.futures import ProcessPoolExecutor
from threading import BoundedSemaphore

class BoundedProcessPoolExecutor(ProcessPoolExecutor):
    def __init__(self, max_workers):
        super().__init__(max_workers=max_workers)
        # Allow at most 2 * max_workers tasks to be pending at any time.
        self.semaphore = BoundedSemaphore(value=2*max_workers)

    def submit(self, fn, *args, **kwargs):
        # Block here until a queue slot is free.
        self.semaphore.acquire()
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            # Give the slot back if the task was never queued.
            self.semaphore.release()
            raise
        future.add_done_callback(self.release)

        return future

    def release(self, _):
        """Called once a task is done, releases one queue slot."""
        self.semaphore.release()
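
To observe the backpressure this workaround relies on without spawning processes, the same pattern can be applied to ThreadPoolExecutor. This is a sketch for illustration only: BoundedThreadPoolExecutor, its bound parameter, the in_flight counters and run_demo() are hypothetical, and the sketch does not exercise the ProcessPoolExecutor wake-up pipe at all.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class BoundedThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical thread-based analogue that records the pending-task peak."""

    def __init__(self, max_workers, bound):
        super().__init__(max_workers=max_workers)
        self._slots = threading.BoundedSemaphore(bound)
        self._count_lock = threading.Lock()
        self.in_flight = 0
        self.max_in_flight = 0

    def submit(self, fn, /, *args, **kwargs):
        self._slots.acquire()  # blocks while `bound` tasks are pending
        with self._count_lock:
            self.in_flight += 1
            self.max_in_flight = max(self.max_in_flight, self.in_flight)
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            self._task_done(None)  # the task was never queued
            raise
        future.add_done_callback(self._task_done)
        return future

    def _task_done(self, _future):
        # Update the counter first, then free the slot.
        with self._count_lock:
            self.in_flight -= 1
        self._slots.release()


def run_demo(n_tasks=200, max_workers=4, bound=8):
    """Submit many short tasks and report (completed, peak pending)."""
    with BoundedThreadPoolExecutor(max_workers, bound) as pool:
        futures = [pool.submit(time.sleep, 0.001) for _ in range(n_tasks)]
        completed = sum(1 for f in futures if f.result() is None)
    return completed, pool.max_in_flight
```

However many tasks are submitted, the number pending at once never exceeds the bound, which is what keeps the submitter from getting far ahead of the workers.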

@cjw296
Contributor

cjw296 commented Sep 9, 2023

@gciaberi - yes, I'd wondered why a Queue was used for this; pumping empty strings down feels hacky compared to a bounded semaphore. @pitrou / @elfstrom - how do you both feel about switching ProcessPoolExecutor to use a BoundedSemaphore instead of a queue? @sharvil: is there any chance you could try this BoundedSemaphore approach instead and see if it fixes your issues?

@pitrou
Member

pitrou commented Sep 9, 2023 via email

@elfstrom
Contributor Author

elfstrom commented Sep 9, 2023

@gciaberi - yes, I'd wondered why a Queue was used for this; pumping empty strings down feels hacky compared to a bounded semaphore. @pitrou / @elfstrom - how do you both feel about switching ProcessPoolExecutor to use a BoundedSemaphore instead of a queue?

My assumption about the current design is that a pipe was chosen for the wakeup mechanism because it allows the management thread to use select (connection.wait) to wait on the FDs of the result queue, the worker process termination sentinels and the wakeup events at the same time, and to wake when any one of these events occurs.

A semaphore is not compatible with this type of wait mechanism as far as I know. It might be possible to use a semaphore anyway by converting the other events to trigger the semaphore as well but that seems like it could be complicated and lead to unintended side-effects and problematic corner-case behaviour.

I would be very hesitant to make such a large change in the implementation strategy of this module given the inherent complexity of the problem and the relatively low amount of testing available.
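
The wait mechanism described above can be sketched with two one-way pipes and multiprocessing.connection.wait. This is an illustration under assumptions, not the executor's actual code: ready_sources() and the "result"/"wakeup" labels are hypothetical, and worker sentinels are left out to keep the example single-process.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait


def ready_sources(send_result, send_wakeup, timeout=1.0):
    """Return the names of the sources that one wait() call reports as ready."""
    result_reader, result_writer = Pipe(duplex=False)
    wakeup_reader, wakeup_writer = Pipe(duplex=False)
    try:
        if send_result:
            result_writer.send("task finished")
        if send_wakeup:
            # An empty message, like the one sent by _ThreadWakeup.wakeup().
            wakeup_writer.send_bytes(b"")
        # A single call blocks until any of the readers has data (or timeout).
        ready = wait([result_reader, wakeup_reader], timeout=timeout)
        names = []
        if result_reader in ready:
            names.append("result")
        if wakeup_reader in ready:
            names.append("wakeup")
        return names
    finally:
        for conn in (result_reader, result_writer, wakeup_reader, wakeup_writer):
            conn.close()
```

A threading semaphore has no file descriptor or handle that could be added to that list, which is the incompatibility mentioned above.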

@pitrou
Member

pitrou commented Sep 9, 2023 via email

@cjw296
Contributor

cjw296 commented Sep 10, 2023

@elfstrom - makes sense! Let's stick with @pitrou's approach then :-)

…adlock

This reverts the previous fix and instead opts to remove the locking
completely when clearing the wakeup pipe.

We can do this because clear() and close() are both called from the
same thread and nowhere else.

In this version of the fix, the call to ProcessPoolExecutor.submit
can still block on the wakeup pipe if it happens to fill up. This
should not be an issue, as there are already other cases where the
submit call can block, and a full wakeup pipe implies that there
are already a lot of work items queued up.

Co-authored-by: Antoine Pitrou <antoine@python.org>
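
The reasoning in this commit message can be sketched with plain threads and os.pipe(), assuming a Unix-like platform. All names here (WAKEUP, run_demo, submitter, clear) are hypothetical stand-ins, not the executor's real code: the submitter performs blocking writes while holding a lock, as submit() does, and the manager side drains the pipe without taking that lock.

```python
import os
import select
import threading

WAKEUP = b"\0\0\0\0"  # illustrative 4-byte wake-up message


def run_demo(n_wakeups=50_000):
    """Return the number of bytes drained by a reader that takes no lock."""
    shutdown_lock = threading.Lock()
    read_fd, write_fd = os.pipe()
    os.set_blocking(read_fd, False)  # the reader itself must never block
    finished = threading.Event()

    def submitter():
        # Stand-in for submit(): the (blocking) write happens under the lock.
        for _ in range(n_wakeups):
            with shutdown_lock:
                os.write(write_fd, WAKEUP)
        finished.set()

    def clear():
        # Stand-in for clear() after the fix: no lock is taken here.
        drained = 0
        try:
            while chunk := os.read(read_fd, 65536):
                drained += len(chunk)
        except BlockingIOError:
            pass  # pipe is empty
        return drained

    writer = threading.Thread(target=submitter)
    writer.start()
    total = 0
    while not finished.is_set():
        # Sleep until the pipe is readable (or 50 ms pass), then drain it.
        select.select([read_fd], [], [], 0.05)
        total += clear()
    writer.join()
    total += clear()  # pick up anything written just before `finished` was set
    os.close(read_fd)
    os.close(write_fd)
    return total
```

With the default arguments the submitter writes about 200 kB, more than a default 64 KiB pipe holds, so it may block in os.write() while holding the lock. Because clear() never needs that lock, the pipe is always drained and the writer resumes. If clear() had to acquire shutdown_lock first, that blocked write would never be released.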
@elfstrom
Contributor Author

@elfstrom - makes sense! Let's stick with @pitrou's approach then :-)

I pushed a fixup for this. I'm not sure what the policy is for commit authorship attribution. I snuck a Co-authored-by: Antoine Pitrou <antoine@python.org> in there. Please let me know if that's wrong @pitrou.

@pitrou
Member

pitrou commented Sep 11, 2023

@elfstrom It's entirely fine :-)

However, you will also need to merge from main and fix the conflicts now.

Co-authored-by: Thomas Moreau <thomas.moreau.2010@gmail.com>
@cjw296 cjw296 added 3.11 only security fixes 3.12 only security fixes 3.13 bugs and security fixes labels Sep 15, 2023
@cjw296 cjw296 removed 3.12 only security fixes 3.13 bugs and security fixes labels Sep 15, 2023
Change test strategy. We now force the main thread to block during the
wake-up call by mocking the wake-up object and artificially limiting
to a single wake-up before blocking.

This allows us to reduce some timeouts, number of tasks and lower the
total runtime of the test. It should also guarantee a blocking main
thread on all platforms, regardless of any pipe buffer sizes.

The drawback is that the test is now a bit opinionated on how we fix
this issue (i.e. just making the wake-up pipe non-blocking would not
satisfy this test even though it is a valid fix for the issue).
@elfstrom
Contributor Author

@cjw296 Is there something more that I need to do to get this ready for merge?

@cjw296 cjw296 merged commit 405b063 into python:main Sep 22, 2023
@miss-islington
Contributor

Thanks @elfstrom for the PR, and @cjw296 for merging it 🌮🎉.. I'm working now to backport this PR to: 3.11, 3.12.
🐍🍒⛏🤖

@miss-islington
Contributor

Sorry, @elfstrom and @cjw296, I could not cleanly backport this to 3.12 due to a conflict.
Please backport using cherry_picker on command line.
cherry_picker 405b06375a8a4cdb08ff53afade09a8b66ec23d5 3.12

@miss-islington
Contributor

Sorry, @elfstrom and @cjw296, I could not cleanly backport this to 3.11 due to a conflict.
Please backport using cherry_picker on command line.
cherry_picker 405b06375a8a4cdb08ff53afade09a8b66ec23d5 3.11

@cjw296
Contributor

cjw296 commented Sep 22, 2023

@elfstrom - are you able to do the backport PRs? (3.11 being the most pressing! :-) )

@cjw296
Contributor

cjw296 commented Sep 22, 2023

@elfstrom - also, could you have a look at this failure on main after this PR landed:
https://github.com/python/cpython/actions/runs/6274544958/job/17040156826

@elfstrom
Contributor Author

@elfstrom - also, could you have a look at this failure on main after this PR landed: https://github.com/python/cpython/actions/runs/6274544958/job/17040156826

It looks like flakiness to me:

    def test_future_times_out(self):
        """Test ``futures.as_completed`` timing out before
        completing it's final future."""
        already_completed = {CANCELLED_AND_NOTIFIED_FUTURE,
                             EXCEPTION_FUTURE,
                             SUCCESSFUL_FUTURE}

        for timeout in (0, 0.01):
            with self.subTest(timeout):

                future = self.executor.submit(time.sleep, 0.1)
                completed_futures = set()
                try:
                    for f in futures.as_completed(
                        already_completed | {future},
                        timeout
                    ):
                        completed_futures.add(f)
                except futures.TimeoutError:
                    pass

                # Check that ``future`` wasn't completed.
                self.assertEqual(completed_futures, already_completed)
======================================================================
FAIL: test_future_times_out (test.test_concurrent_futures.test_as_completed.ThreadPoolAsCompletedTest.test_future_times_out) [0.01]
Test ``futures.as_completed`` timing out before
----------------------------------------------------------------------
Traceback (most recent call last):
  File "D:\a\cpython\cpython\Lib\test\test_concurrent_futures\test_as_completed.py", line 60, in test_future_times_out
    self.assertEqual(completed_futures, already_completed)
AssertionError: Items in the first set but not the second:
<Future at 0x1b19820b850 state=finished returned NoneType>

It looks like the future completes when the test expects it not to. The future is just time.sleep(0.1) and the timeout is 0.01. I'm not sure why one would expect such a short sleep to be sufficient to always trigger the timeout before completion. The test passed on the second attempt.

@elfstrom
Contributor Author

@elfstrom - are you able to do the backport PRs? (3.11 being the most pressing! :-) )

I will look into the backports tomorrow if I get some spare time.

@bedevere-app

bedevere-app bot commented Sep 23, 2023

GH-109783 is a backport of this pull request to the 3.11 branch.

@bedevere-app bedevere-app bot removed the needs backport to 3.11 only security fixes label Sep 23, 2023
@bedevere-app

bedevere-app bot commented Sep 23, 2023

GH-109784 is a backport of this pull request to the 3.12 branch.

@bedevere-app bedevere-app bot removed the needs backport to 3.12 only security fixes label Sep 23, 2023
elfstrom added a commit to elfstrom/cpython that referenced this pull request Sep 23, 2023
…adlock (pythonGH-108513)

This fixes issue pythonGH-105829, python#105829

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: Chris Withers <chris@withers.org>
Co-authored-by: Thomas Moreau <thomas.moreau.2010@gmail.com>.
(cherry picked from commit 405b063)

Co-authored-by: elfstrom <elfstrom@users.noreply.github.com>
cjw296 pushed a commit that referenced this pull request Sep 24, 2023
@cjw296
Contributor

cjw296 commented Sep 24, 2023

@elfstrom - thanks for the backports!

It looks like the future completes when the test expects it not to. The future is just time.sleep(0.1) and the timeout is 0.01. I'm not sure why one would expect such a short sleep to be sufficient to always trigger the timeout before completion. The test passed on the second attempt.

Well, you've done an amazing job with this issue, if you're feeling super keen, you could always make these tests more reliable 😃

@vstinner
Member

The added test is unstable: it fails randomly. See issue gh-109917.

It looks like flakiness to me:
FAIL: test_future_times_out (test.test_concurrent_futures.test_as_completed.ThreadPoolAsCompletedTest.test_future_times_out) [0.01]

I confirm that the test is unstable: issue gh-109565. Can someone have a look?

@cjw296
Contributor

cjw296 commented Sep 27, 2023

@vstinner - that test wasn't added in this PR, though? The only test added here was test_gh105829_should_not_deadlock_if_wakeup_pipe_full

csm10495 pushed a commit to csm10495/cpython that referenced this pull request Sep 28, 2023
…ython#108513)

This fixes issue python#105829, python#105829

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: Chris Withers <chris@withers.org>
Co-authored-by: Thomas Moreau <thomas.moreau.2010@gmail.com>
Yhg1s pushed a commit that referenced this pull request Oct 2, 2023
Glyphack pushed a commit to Glyphack/cpython that referenced this pull request Sep 2, 2024
…ython#108513)

This fixes issue python#105829, python#105829

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: Chris Withers <chris@withers.org>
Co-authored-by: Thomas Moreau <thomas.moreau.2010@gmail.com>