gh-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot #134174

Merged
This PR significantly improves performance when transferring future state from `concurrent.futures.Future` to `asyncio.Future`, a common operation when dispatching executor jobs in asyncio applications.
The current `_copy_future_state` implementation requires multiple method calls and lock acquisitions to retrieve the source future's state:
1. `done()` - acquires lock to check state
2. `cancelled()` - acquires lock again
3. `exception()` - acquires lock to get exception
4. `result()` - acquires lock to get result
Each method call involves thread synchronization overhead, making this operation a bottleneck for high-frequency executor dispatches.
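As a quick illustration of that overhead, the following sketch (hypothetical, not part of the PR) mimics the accessor sequence the original code path performs on an already-finished future; each of these calls acquires the future's internal condition lock:

```python
import concurrent.futures
import timeit

fut = concurrent.futures.Future()
fut.set_result(42)

def four_accessor_calls():
    # Mirrors the original _copy_future_state: each accessor below
    # acquires the future's condition lock internally.
    assert fut.done()
    assert not fut.cancelled()
    if fut.exception() is None:
        return fut.result()

# Time 100k iterations of the four-call sequence.
print(timeit.timeit(four_accessor_calls, number=100_000))
```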
Our use case involves dispatching a large number of small executor jobs from `asyncio` to a thread pool. These jobs typically involve `open` or `stat` on files that are already cached by the OS, so the actual I/O returns almost instantly. However, we still have to offload them to avoid blocking the event loop, since there's no reliable way to determine in advance whether a read will hit the cache.
As a result, the majority of the overhead isn't from the I/O itself, but from the cost of scheduling. Most of the time is spent copying future state, which involves locking. This PR reduces that overhead, which has a meaningful impact at scale.
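A minimal sketch of this workload pattern (names and the 100-call batch size are illustrative, not from the PR): every `run_in_executor` call produces a `concurrent.futures.Future` whose state must be copied into an asyncio future on completion, so the copy runs once per job.

```python
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor

async def stat_many(paths):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # Each run_in_executor call round-trips through
        # _copy_future_state when its worker-side future completes.
        return await asyncio.gather(
            *(loop.run_in_executor(pool, os.stat, p) for p in paths)
        )

results = asyncio.run(stat_many(["."] * 100))
print(len(results))  # 100
```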
Add a new `_get_snapshot()` method to `concurrent.futures.Future` that atomically retrieves all state information in a single lock acquisition:
- Returns tuple: `(done, cancelled, result, exception)`
- Uses optimized fast path for already-finished futures (no lock needed)
- Provides atomic state capture for other states
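As a sketch of how the snapshot can be consumed (the method is private and only present on CPython builds that include this change, hence the `hasattr` probe and fallback):

```python
import concurrent.futures

fut = concurrent.futures.Future()
fut.set_result(42)

# _get_snapshot exists only on interpreters with this change applied,
# so probe for it and fall back to the individual accessors otherwise.
if hasattr(fut, "_get_snapshot"):
    done, cancelled, result, exception = fut._get_snapshot()
else:
    done = fut.done()
    cancelled = fut.cancelled()
    exception = fut.exception()
    result = fut.result() if exception is None else None

print(done, cancelled, result, exception)  # True False 42 None
```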
The `_copy_future_state` function in `asyncio` now uses this snapshot method when available, falling back to the traditional approach for backwards compatibility.
Benchmark results show dramatic improvements for the common case:
- **concurrent.futures→asyncio transfer: 4.12x faster**
- asyncio→asyncio transfer: slightly slower (1.05x) due to the `hasattr` check. In practice this path may never be exercised: `_chain_future` appears to be the only entry point to `_copy_future_state`, and it is always called with a `concurrent.futures.Future` as the source.
This optimization particularly benefits applications that:
- Dispatch many small executor jobs (e.g., filesystem operations, DNS lookups)
- Use thread pools for I/O-bound operations in asyncio
- Have high frequency of executor task completion
- Adds `_get_snapshot()` to `concurrent.futures.Future` for atomic state retrieval
- Updates `_copy_future_state()` to prefer snapshot method when available
- Maintains full backwards compatibility with existing code
- Minimal code changes with focused optimization
These show consistent 4x+ speedup for the critical concurrent.futures→asyncio path.
```
=== 1. Benchmarking concurrent.futures -> asyncio ===
Running original...
concurrent_to_asyncio: Mean +- std dev: 986 ns +- 16 ns
Running optimized...
concurrent_to_asyncio: Mean +- std dev: 239 ns +- 4 ns
Comparison:
Mean +- std dev: [concurrent_original] 986 ns +- 16 ns -> [concurrent_optimized] 239 ns +- 4 ns: 4.12x faster
=== 2. Benchmarking asyncio -> asyncio ===
Running original...
asyncio_to_asyncio: Mean +- std dev: 221 ns +- 4 ns
Running optimized...
asyncio_to_asyncio: Mean +- std dev: 232 ns +- 4 ns
Comparison:
Mean +- std dev: [asyncio_original] 221 ns +- 4 ns -> [asyncio_optimized] 232 ns +- 4 ns: 1.05x slower
Cleaning up...
```
```python
import pyperf
import concurrent.futures
import asyncio
import subprocess
import os
import sys


def write_benchmark_scripts():
    """Write individual benchmark scripts for each scenario."""
    # Common helper code
    common_imports = '''
import pyperf
import concurrent.futures
import asyncio


def _convert_future_exc(exc):
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return asyncio.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return asyncio.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return asyncio.InvalidStateError(*exc.args)
    else:
        return exc
'''

    # Optimization patch code
    optimization_patch = '''
FINISHED = concurrent.futures._base.FINISHED
CANCELLED = concurrent.futures._base.CANCELLED
CANCELLED_AND_NOTIFIED = concurrent.futures._base.CANCELLED_AND_NOTIFIED


def _get_snapshot_implementation(self):
    """Get a snapshot of the future's current state."""
    # Fast path: check if already finished without lock
    if self._state == FINISHED:
        return True, False, self._result, self._exception

    # Need lock for other states since they can change
    with self._condition:
        if self._state == FINISHED:
            return True, False, self._result, self._exception
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
            return True, True, None, None
        return False, False, None, None


concurrent.futures.Future._get_snapshot = _get_snapshot_implementation
'''

    # Original copy implementation
    original_copy = '''
def copy_future_original(source, dest):
    """Original implementation using individual method calls."""
    if dest.cancelled():
        return
    if hasattr(source, 'done'):
        assert source.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)
'''

    # Optimized copy implementation
    optimized_copy = '''
def copy_future_optimized(source, dest):
    """Optimized implementation using _get_snapshot when available."""
    if dest.cancelled():
        return
    # Use _get_snapshot for futures that support it
    if hasattr(source, '_get_snapshot'):
        done, cancelled, result, exception = source._get_snapshot()
        assert done
        if cancelled:
            dest.cancel()
        elif exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            dest.set_result(result)
        return
    # Traditional fallback for asyncio.Future
    if hasattr(source, 'done'):
        assert source.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)
'''

    # 1. concurrent.futures -> asyncio (original)
    with open('bench_concurrent_to_asyncio_original.py', 'w') as f:
        f.write(common_imports + original_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()


def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_original(source, dest)
    dest.cancel()


runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')

    # 2. concurrent.futures -> asyncio (optimized)
    with open('bench_concurrent_to_asyncio_optimized.py', 'w') as f:
        f.write(common_imports + optimization_patch + optimized_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()


def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_optimized(source, dest)
    dest.cancel()


runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')

    # 3. asyncio -> asyncio (original)
    with open('bench_asyncio_to_asyncio_original.py', 'w') as f:
        f.write(common_imports + original_copy + '''
loop = asyncio.new_event_loop()
source = asyncio.Future(loop=loop)
source.set_result(42)


def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_original(source, dest)
    dest.cancel()


runner = pyperf.Runner()
runner.bench_func('asyncio_to_asyncio', task)
''')

    # 4. asyncio -> asyncio (optimized - should use fallback)
    with open('bench_asyncio_to_asyncio_optimized.py', 'w') as f:
        f.write(common_imports + optimization_patch + optimized_copy + '''
loop = asyncio.new_event_loop()
source = asyncio.Future(loop=loop)
source.set_result(42)


def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_optimized(source, dest)
    dest.cancel()


runner = pyperf.Runner()
runner.bench_func('asyncio_to_asyncio', task)
''')


def run_benchmarks():
    """Run all benchmarks and compare results."""
    print("Writing benchmark scripts...")
    write_benchmark_scripts()

    # Clean up old results
    for f in ['concurrent_original.json', 'concurrent_optimized.json',
              'asyncio_original.json', 'asyncio_optimized.json']:
        if os.path.exists(f):
            os.remove(f)

    print("\n=== 1. Benchmarking concurrent.futures -> asyncio ===")
    print("Running original...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_original.py',
                    '-o', 'concurrent_original.json', '--quiet'])
    print("Running optimized...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_optimized.py',
                    '-o', 'concurrent_optimized.json', '--quiet'])
    print("\nComparison:")
    subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to',
                    'concurrent_original.json', 'concurrent_optimized.json'])

    print("\n=== 2. Benchmarking asyncio -> asyncio ===")
    print("Running original...")
    subprocess.run([sys.executable, 'bench_asyncio_to_asyncio_original.py',
                    '-o', 'asyncio_original.json', '--quiet'])
    print("Running optimized...")
    subprocess.run([sys.executable, 'bench_asyncio_to_asyncio_optimized.py',
                    '-o', 'asyncio_optimized.json', '--quiet'])
    print("\nComparison:")
    subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to',
                    'asyncio_original.json', 'asyncio_optimized.json'])

    # Clean up
    print("\nCleaning up...")
    for f in ['bench_concurrent_to_asyncio_original.py',
              'bench_concurrent_to_asyncio_optimized.py',
              'bench_asyncio_to_asyncio_original.py',
              'bench_asyncio_to_asyncio_optimized.py']:
        if os.path.exists(f):
            os.remove(f)

    print("\n=== Summary ===")
    print("concurrent.futures -> asyncio: Should show significant speedup")
    print("asyncio -> asyncio: Should show no regression (fallback path)")


if __name__ == "__main__":
    run_benchmarks()
```
Misc/NEWS.d/next/Library/2025-05-18-07-25-15.gh-issue-134173.53oOoF.rst
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
kumaraditya303 approved these changes on May 18, 2025: "Thanks!"
bdraco added a commit to home-assistant/docker-base referencing this pull request on May 18, 2025: "…s.Future and asyncio.Future. This is a backport of python/cpython#134174"
Pranjal095 pushed a commit to Pranjal095/cpython referencing this pull request on Jul 12, 2025: "…Future` and `asyncio.Future` (python#134174). Co-authored-by: Kumar Aditya <kumaraditya@python.org>"
taegyunkim pushed a commit to taegyunkim/cpython referencing this pull request on Aug 4, 2025: "…Future` and `asyncio.Future` (python#134174). Co-authored-by: Kumar Aditya <kumaraditya@python.org>"
👋 from PyCon @ Pittsburgh