-
-
Notifications
You must be signed in to change notification settings - Fork 33.2k
gh-109917, gh-105829: Fix concurrent.futures _ThreadWakeup.wakeup() #110129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The previous fix also removed a lock: commit 405b063 |
Lib/concurrent/futures/process.py
Outdated
# wait_result_broken_or_wakeup() does not even read the written | ||
# byte. It only check if something was written to the pipe. | ||
return | ||
self._writer.send_bytes(b"x") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get how it was possible to wake up the pipe before, if no data is written.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An empty bytes object is still a thing. It will cause 4 bytes to be sent on the pipe (A 4 byte zero encoding the length of the byte string followed by zero bytes of data). b"x"
will be sent as 5 bytes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh ok. I suppose that writing 4 or 5 bytes doesn't make any big difference, as soon as we only send 5 bytes and don't fill the pipe.
self.assertEqual(job_num, len(list(executor.map(int, job_data)))) | ||
finally: | ||
signal.alarm(0) | ||
signal.signal(signal.SIGALRM, old_handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test should be kept in some form regardless of the implementation strategy used to solve the deadlock as we want to test that we have actually removed this bug. See 995a14c for a version that could be used to test for the issue if we use an implementation that relies on making sure we never block on the pipe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the purpose of the test. The test is supposed to check wakeup() implementation, but instead, it overrides wakeup() with a different implementation which blocks if it's called more than once. My change makes sure that calling wakeup() twice or more is safe.
The test also overrides _ExecutorManagerThread.run()
whereas this method is responsible to check for the wakeup pipe and stop the loop when the wakeup pipe becomes non-empty. The test is supposed to check that concurrent.futures doesn't hang when it gets a lot of jobs, but instead it mocks parts of concurrent.futures which caused the bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrote a new functional test instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of the test is to test that we don't deadlock when the pipe blocks. In this version the test is a bit opinionated about what type of implementation it accepts. This was noted in the commit message but should probably have been documented in a comment in the test as well to be extra clear.
The point of this test version is to force wakeup to block and see that we still don't deadlock. This is done by augmenting the normal wakeup operation with an additional .put
to a size 1 queue that will block on the second wakeup, which is more or less guaranteed since we also delay the startup of the _ExecutorManagerThread.
The initial test was less opinionated and only made sure to delay the _ExecutorManagerThread startup enough to allow the wakeup pipe to fill. The drawback with that version was runtime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, to be extra clear, as I noted in #109917 (comment), I believe the instability issue we are having in the test right now is due to the ordering of the dummy queue operation and the real wakeup pipe operations. Both primitives are thread safe but not done atomically as a single update and may interleave arbitrarily. With the current order of operations this can lead to an incorrect state where the dummy queue is full but the wakeup pipe is empty. By doing the swap suggested I think this can no longer happen in any possible operation interleaving. There is no indication that the actual implementation is incorrect, this looks like purely a test issue.
…p.wakeup() Replace test_gh105829_should_not_deadlock_if_wakeup_pipe_full() test which was mocking too many concurrent.futures internals with a new test_wakeup() functional test. Co-Authored-By: elfstrom <elfstrom@users.noreply.github.com>
@elfstrom: I wrote a new test and put you as a co-author, thanks for your help :-) Would you mind to review the updated PR? It's now ready for a review. You can validate that the test works as expected by reintroducing the bug on purpose: diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 51fd953d65..2f429e50db 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -94,7 +94,6 @@ def wakeup(self):
# full. wait_result_broken_or_wakeup() ignores the message anyway,
# it just calls clear().
return
- self._awaken = True
self._writer.send_bytes(self._wakeup_msg)
def clear(self): |
Hum, wait, you need to introduced two bugs to reproduce the hang: diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 51fd953d65..4d28817baf 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -94,7 +94,7 @@ def wakeup(self):
# full. wait_result_broken_or_wakeup() ignores the message anyway,
# it just calls clear().
return
- self._awaken = True
+ #self._awaken = True
self._writer.send_bytes(self._wakeup_msg)
def clear(self):
@@ -457,7 +457,8 @@ def wait_result_broken_or_wakeup(self):
# 2. we're also the only thread to call thread_wakeup.close()
# 3. we want to avoid a possible deadlock when both reader and writer
# would block (gh-105829)
- self.thread_wakeup.clear()
+ with self.shutdown_lock:
+ self.thread_wakeup.clear()
return result_item, is_broken, cause
|
|
||
class _ThreadWakeup: | ||
# Constant overriden by tests to make them faster | ||
_wakeup_msg = b'x' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see no reason to not define this as b""
. Giving it a value makes it seem like it matters in some way and it is also more bytes to send.
return | ||
self._closed = True | ||
self._writer.close() | ||
self._reader.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change seems to serve no purpose
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, it's just that for me, it's more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's distracting in this PR though...
if self._awaken: | ||
# gh-105829: Send a single message to not block if the pipe is | ||
# full. wait_result_broken_or_wakeup() ignores the message anyway, | ||
# it just calls clear(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is misleading as this change has no impact on the gh-105829 issue in the current state. As long as the shutdown lock is not taken by both wakeup
and clear
it does not matter if we block on the pipe here. It might in some sense actually be desirable as it could perhaps give the manager thread a better chance of running and moving some objects to the worker queue.
At this state, this is just a potential optimization to avoid sending unnecessary wakeup messages. Also, as mentioned before, it is not entirely clear to me that this does not introduce some sort of bug.
self._writer.send_bytes(b"") | ||
if self._closed: | ||
return | ||
if self._awaken: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By adding reads and writes of self._awaken
we now have a race in the code. Since it is only assignment and reading it should be fine if one is happy with relying on the atomicity of these bytecode primitives as implemented in CPython currently. Opinions differ on that point I think, and I'm no expert, but it should probably be considered somewhat carefully.
self.assertEqual(job_num, len(list(executor.map(int, job_data)))) | ||
finally: | ||
signal.alarm(0) | ||
signal.signal(signal.SIGALRM, old_handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of the test is to test that we don't deadlock when the pipe blocks. In this version the test is a bit opinionated about what type of implementation it accepts. This was noted in the commit message but should probably have been documented in a comment in the test as well to be extra clear.
The point of this test version is to force wakeup to block and see that we still don't deadlock. This is done by augmenting the normal wakeup operation with an additional .put
to a size 1 queue that will block on the second wakeup, which is more or less guaranteed since we also delay the startup of the _ExecutorManagerThread.
The initial test was less opinionated and only made sure to delay the _ExecutorManagerThread startup enough to allow the wakeup pipe to fill. The drawback with that version was runtime.
nonlocal nwakeup | ||
nwakeup += 1 | ||
orig_wakeup() | ||
thread_wakeup.wakeup = wrap_wakeup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This substitution is now done after the executor has started. Is this safe, and if so, under what assumptions?
def get_pipe_size(connection): | ||
try: | ||
import fcntl | ||
return fcntl.fcntl(connection.fileno(), fcntl.F_GETPIPE_SZ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not all platforms support F_GETPIPE_SZ so this will fail on macos, etc. Fixed by:
import fcntl
from fcntl import F_GETPIPE_SZ # Does not exist in all fcntl implementations
return fcntl.fcntl(connection.fileno(), F_GETPIPE_SZ)
thread_wakeup._wakeup_msg = b'x' * msg_len | ||
msg_size = 4 + len(thread_wakeup._wakeup_msg) | ||
|
||
njob = pipe_size // msg_size + 10 # Add some margin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems quite confusing to me. First we specify the number of jobs we want and compute a message size based on that. Then we loop back around and recompute the number of jobs based on the message size. This feels circular and redundant. Personally, if we are going to set the message size ourselves, I would stop messing with the pipe and just make sure the data we send is overkill.
# Use longer "wakeup message" to make the hang more likely
# and to speed up the test. The true size will be slightly
# larger than this due to overhead but that is irrelevant
# for our purposes.
msg_size = 2048
thread_wakeup._wakeup_msg = b'x' * msg_size
njob = 2 ** 20 // msg_size # A megabyte should fill up most pipes
job_data = range(njob)
With this, the test still takes just 0.2s and works the same on all platforms.
if support.verbose: | ||
print(f"run {njob:,} jobs") | ||
|
||
self.assertEqual(len(list(executor.map(int, job_data))), njob) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test wont fail gracefully without using sigalarm or some other mechanism to trigger a timeout in case of deadlock. Right now you have to wait a really long time (15 min?) for the test to fail.
print(f"run {njob:,} jobs") | ||
|
||
self.assertEqual(len(list(executor.map(int, job_data))), njob) | ||
self.assertGreaterEqual(nwakeup, njob) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another issue here is that the test relies on the startup time of the ExecutorManagerThread but we dont do anything here to make sure that this is sufficiently slow. I tested what happens if the thread had already started when we reach the test point and the test passed even with a broken implementation.
So either we could make sure the startup is slow or that the test fails regardless. I tried it with my suggested size/jobs setting above and that was sufficient to trigger failure even if the thread was started. As always, it is hard to judge how reliable that will be on other systems but I suspect it is overkill enough to fail in most cases.
In summary, I would suggest dropping all changes to the implementation except for the extraction of Also, someone else should definitely review this as well, maybe @pitrou, @cjw296 or @tomMoral could also take a look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@elfstrom raises so many valid and concerning points about this PR that I really don't want it to land in its current form.
This whole area is fiendishly complicated and painful to test. If the unstable test is the problem, I'd like to see that fixed in isolation, be that moving to a functional test as you describe (but such that @elfstrom and others are happy) or just making the current test more stable.
I close my PR. If someone wants to propose a better fix for issue gh-109917, please go ahead :-) |
Remove test_gh105829_should_not_deadlock_if_wakeup_pipe_full() of test_concurrent_futures.test_deadlock. The test is no longer relevant.
concurrent.futures.ProcessPoolExecutor
pool deadlocks when submitting many tasks #105829