gh-96471: Add multiprocessing queue shutdown #138828
Open
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Add multiprocessing queue shutdown
This feature is already implemented in
asyncio(#104228) andthreading(#104750) modules.Reminders of new behaviors
When queue shuts down:
putmethod raises an exception. All the pending put calls are unblocked and raise an exception.getmethod returns available items until queue is empty, then raises Shutdown exception. All the pending get calls are unblocked and raise an exception.Updates to the main code.
There are 2 constraints here:
multiprocessing.Valueobject to store the 3 shutdown states because_ctypesmodule is not available on all platforms.get_valueSemaphore method because of not implemented on MacOSX, but we can just check if the semaphore is locked or not (internal counter is zero or not)The additional private variables are as follows:
Lockto prevent concurrent updates when shutting down,Semaphoreto store the 3 shutdown states,Semaphoreused to count pending getter and putter processes.It would also have been possible to use an array of integers from
mulitprocessing.heap.BufferWrapperbut I find simpler and more explicit to useSemaphore.The main additional codes are as follows: