An unresponsive service can be worse than a down one. It can tie up your entire system if not handled properly. All network requests should have a timeout.
Here’s how to add timeouts for popular Python packages. All have been tested. The default is no timeout, unless otherwise specified. Enjoy!
Also available for Ruby, Node, Go, PHP, and Rust
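Each entry below shows where the timeout goes and what it raises. The caller still has to handle that exception; as a minimal illustrative sketch (not from the list below), using requests with a placeholder URL and a 1 second limit:

import requests

try:
    response = requests.get('https://example.com', timeout=1)
except requests.exceptions.Timeout:
    # requests.exceptions.Timeout is the parent of both ConnectTimeout and ReadTimeout
    response = None  # fail fast, fall back, or retry with backoff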
Standard library

- ftplib
- http.client
- imaplib
- nntplib
- poplib
- smtplib
- socket
- subprocess
- telnetlib
- urllib.request

PyPI

- aiohttp
- asyncpg
- boto3
- cassandra-driver
- elasticsearch
- httpx
- influxdb
- mongoengine
- mysqlclient
- opensearch-py
- psycopg
- psycopg2
- pymemcache
- pymongo
- redis
- requests
- scs
- SQLAlchemy
- trino
- typesense
- urllib3
ftplib

FTP(host, timeout=1)

Raises socket.timeout
http.client

HTTPConnection(host, port, timeout=1)

Raises socket.timeout
imaplib

IMAP4(host, timeout=1)

Raises socket.timeout

Note: Requires Python 3.9+
nntplib

NNTP(host, timeout=1)

Raises socket.timeout
poplib

POP3(host, timeout=1)

Raises socket.timeout
smtplib

SMTP(host, timeout=1)

Raises

- socket.timeout on connect timeout
- smtplib.SMTPServerDisconnected on read timeout
socket

sock.settimeout(1)

Raises socket.timeout
subprocess

subprocess.run(cmd, timeout=1)

Raises subprocess.TimeoutExpired
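A minimal sketch of handling the timeout; the sleep command is only illustrative:

import subprocess

try:
    subprocess.run(['sleep', '5'], timeout=1)
except subprocess.TimeoutExpired:
    # the child process is killed and waited for before this exception is raised
    pass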
telnetlib

Telnet(host, timeout=1)

Raises socket.timeout
urllib.request

urlopen(url, timeout=1)

Raises

- urllib.error.URLError on connect timeout
- socket.timeout on read timeout
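A minimal sketch that separates the two cases, assuming a placeholder URL:

import socket
from urllib.error import URLError
from urllib.request import urlopen

try:
    with urlopen('https://example.com', timeout=1) as response:
        body = response.read()
except URLError:
    pass  # connect timeout (the underlying socket.timeout is wrapped in URLError)
except socket.timeout:
    pass  # read timeout while the response body was being read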
aiohttp

timeout = aiohttp.ClientTimeout(total=1)
async with aiohttp.ClientSession(timeout=timeout) as session:
    # ...

Raises asyncio.exceptions.TimeoutError
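A minimal usage sketch around the snippet above; the url parameter and the 1 second total budget are illustrative:

import asyncio
import aiohttp

async def fetch(url):
    timeout = aiohttp.ClientTimeout(total=1)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get(url) as response:
                return await response.text()
        except asyncio.TimeoutError:
            return None  # the request exceeded the total budget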
asyncpg

asyncpg.connect(timeout=1)

Default: 60s

Raises asyncio.exceptions.TimeoutError
boto3

boto3.client('s3', config=Config(connect_timeout=1, read_timeout=1))

Raises

- botocore.exceptions.ConnectTimeoutError on connect timeout
- botocore.exceptions.ReadTimeoutError on read timeout
cassandra-driver

Cluster([host], connect_timeout=1)

Raises cassandra.cluster.NoHostAvailable on connect timeout
elasticsearch

Elasticsearch(request_timeout=1)

Raises elastic_transport.ConnectionTimeout
httpx

httpx.get(url, timeout=1)
# or
httpx.Client(timeout=1)

Raises

- httpx.ConnectTimeout on connect timeout
- httpx.ReadTimeout on read timeout
influxdb

InfluxDBClient(timeout=1)

Raises

- requests.exceptions.ConnectTimeout on connect timeout
- requests.exceptions.ReadTimeout on read timeout
mongoengine

connect(connectTimeoutMS=1000, socketTimeoutMS=1000, serverSelectionTimeoutMS=1000)

Raises pymongo.errors.ServerSelectionTimeoutError
mysqlclient

MySQLdb.connect(connect_timeout=1)

Raises MySQLdb._exceptions.OperationalError
opensearch-py

OpenSearch(timeout=1)

Raises opensearchpy.exceptions.ConnectionError
psycopg

psycopg.connect(connect_timeout=1)

Raises psycopg.OperationalError
psycopg2

psycopg2.connect(connect_timeout=1)

Raises psycopg2.OperationalError
pymemcache

Client(host, connect_timeout=1, timeout=1)

Raises socket.timeout
pymongo

MongoClient(connectTimeoutMS=1000, socketTimeoutMS=1000, serverSelectionTimeoutMS=1000)

Default: 20s connect timeout, 30s server selection timeout

Raises pymongo.errors.ServerSelectionTimeoutError
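The values are in milliseconds (1000 ms = 1 s). A minimal sketch that forces server selection and handles the timeout, assuming a placeholder connection string:

from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError

client = MongoClient('mongodb://localhost:27017', connectTimeoutMS=1000, socketTimeoutMS=1000, serverSelectionTimeoutMS=1000)
try:
    client.admin.command('ping')  # any operation triggers server selection
except ServerSelectionTimeoutError:
    pass  # no suitable server responded within serverSelectionTimeoutMS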
redis

Redis(socket_connect_timeout=1, socket_timeout=1)

Raises redis.exceptions.TimeoutError
requests

requests.get(url, timeout=1)

Raises

- requests.exceptions.ConnectTimeout on connect timeout
- requests.exceptions.ReadTimeout on read timeout
scs

sol = scs.solve(data, cone, time_limit_secs=1)

Check sol['info']['status'] for a value of "solved (inaccurate - reached time_limit_secs)" to detect a timeout
SQLAlchemy

create_engine(url, connect_args={'connect_timeout': 1})

Raises sqlalchemy.exc.OperationalError
trino

trino.dbapi.connect(request_timeout=1)
# or
trino.dbapi.connect(request_timeout=(1, 1))  # (connect, read)

Raises trino.exceptions.TrinoConnectionError
typesense

Client({'connection_timeout_seconds': 1})

Raises

- requests.exceptions.ConnectTimeout on connect timeout
- requests.exceptions.ReadTimeout on read timeout
urllib3

http = urllib3.PoolManager(timeout=urllib3.Timeout(connect=1, read=1))
# or
http.request('GET', url, timeout=urllib3.Timeout(connect=1, read=1))

Raises urllib3.exceptions.MaxRetryError
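A minimal sketch with a placeholder URL; with retries enabled (the default), a timeout surfaces as the MaxRetryError noted above once the retries are exhausted:

import urllib3

http = urllib3.PoolManager(timeout=urllib3.Timeout(connect=1, read=1))
try:
    response = http.request('GET', 'https://example.com')
except urllib3.exceptions.MaxRetryError:
    pass  # the request (including any retries) timed out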
Don’t see a library you use here? Let us know. Even better, create a pull request for it.

To get started with development:
git clone https://github.com/ankane/python-timeouts.git
cd python-timeouts
pip install -r requirements.txt

To run all tests, use:

pytest

To run individual tests, use:

pytest tests/test_redis.py