Ethereum: Python ThreadPoolExecutor closes connection before task completes
When using the ThreadPoolExecutor class in Python, it is important to properly manage connections to avoid concurrency issues. In this article, we will look at a common problem that can occur when retrieving historical cryptocurrency data from the Binance API and saving it to a PostgreSQL database using ThreadPoolExecutor.
Background: why the connection is closed
The ThreadPoolExecutor class runs each task on a worker thread from a pool, and connections can be closed prematurely if they are not managed correctly. This is because:
- Each task (for example, retrieving historical price data) can open a connection to the Binance API.
- The thread responsible for closing a connection may finish its own task and shut the connection down before all other tasks have completed (see the sketch after this list).
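To see that failure mode in isolation, here is a minimal sketch; the connection parameters are placeholders and are not taken from the original script. Two workers share a single psycopg2 connection, the first worker closes it when its own task is done, and the second worker then fails with "connection already closed":

    import threading
    import psycopg2

    # Placeholder connection parameters, for illustration only.
    shared_conn = psycopg2.connect(host="localhost", dbname="demo",
                                   user="demo", password="demo")

    def worker(close_after):
        cur = shared_conn.cursor()   # raises InterfaceError if the connection is already closed
        cur.execute("SELECT 1")
        cur.close()
        if close_after:
            shared_conn.close()      # tears the connection down for every other worker

    first = threading.Thread(target=worker, args=(True,))
    second = threading.Thread(target=worker, args=(False,))
    first.start()
    first.join()                     # first worker finishes and closes the shared connection
    second.start()                   # second worker now finds the connection closed
    second.join()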
Problem: Closing the connection before completing the task
In your script, you retrieve historical cryptocurrency data and store it in a PostgreSQL database roughly as follows:
import threading

import config
from binance.client import Client
import psycopg2

def fetch_data():
    # Request order-book data from the Binance API
    client = Client(config.BINANCE_API_KEY, config.BINANCE_API_SECRET)
    symbol = "BTCUSDT"  # illustrative trading pair; the original snippet does not specify one
    response = client.get_order_book(symbol=symbol)

    # Save the data to the PostgreSQL database
    conn = psycopg2.connect(
        host=config.DB_HOST,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
        database=config.DB_NAME,
    )
    cur = conn.cursor()
    for price, _quantity in response['bids']:  # bids are (price, quantity) pairs
        cur.execute(
            "INSERT INTO historical_data (symbol, price) VALUES (%s, %s)",
            (symbol, price),
        )
    conn.commit()
    conn.close()

def main():
    threads = []
    for _ in range(config.NUM_THREADS):
        thread = threading.Thread(target=fetch_data)
        threads.append(thread)
        thread.start()

if __name__ == '__main__':
    main()
In this example, the fetch_data function opens a connection to the PostgreSQL database and writes the data into it. However, the script spawns multiple threads to retrieve historical price data from the Binance API and never waits for them to finish: each thread closes the connection as soon as its own task is done, so a later task can find the connection already closed before the earlier one has completed its work.
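One straightforward remedy for the raw-thread version is simply to wait for every worker before the program moves on. This is a minimal sketch, reusing the fetch_data function and config.NUM_THREADS setting from the snippet above:

    import threading

    def main():
        threads = []
        for _ in range(config.NUM_THREADS):
            thread = threading.Thread(target=fetch_data)
            threads.append(thread)
            thread.start()
        # Block until every worker has finished, so no task is cut off
        # while its database connection is still in use.
        for thread in threads:
            thread.join()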
Solution: using concurrent.futures
To solve this problem, you can use the concurrent.futures module, which provides a high-level interface for executing callable objects asynchronously. Here is the updated code snippet:
import config
from binance.client import Client
import psycopg2
from concurrent.futures import ThreadPoolExecutor

def fetch_data(index):
    # index comes from executor.map; it is not used inside the task
    # Request order-book data from the Binance API
    client = Client(config.BINANCE_API_KEY, config.BINANCE_API_SECRET)
    symbol = "BTCUSDT"  # illustrative trading pair; the original snippet does not specify one
    response = client.get_order_book(symbol=symbol)

    # Save the data to the PostgreSQL database
    conn = psycopg2.connect(
        host=config.DB_HOST,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
        database=config.DB_NAME,
    )
    cur = conn.cursor()
    for price, _quantity in response['bids']:  # bids are (price, quantity) pairs
        try:
            cur.execute(
                "INSERT INTO historical_data (symbol, price) VALUES (%s, %s)",
                (symbol, price),
            )
        except psycopg2.Error as e:
            print(f"An error occurred: {e}")
    conn.commit()
    conn.close()

def main():
    with ThreadPoolExecutor(max_workers=config.NUM_THREADS) as executor:
        executor.map(fetch_data, range(config.NUM_THREADS))

if __name__ == '__main__':
    main()
In this updated snippet, ThreadPoolExecutor manages the worker threads and runs the tasks concurrently. executor.map calls fetch_data once for each index from 0 to config.NUM_THREADS - 1, and the with block only exits after every submitted task has finished, so no connection is torn down while a task is still running.
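One caveat worth noting: executor.map only surfaces a worker's exception when its results are iterated, so a failing task can go unnoticed. A minimal sketch using submit and as_completed, reusing the fetch_data function and config names above, makes errors visible:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def main():
        with ThreadPoolExecutor(max_workers=config.NUM_THREADS) as executor:
            futures = [executor.submit(fetch_data, i) for i in range(config.NUM_THREADS)]
            for future in as_completed(futures):
                try:
                    future.result()  # re-raises any exception from the worker thread
                except Exception as exc:
                    print(f"Task failed: {exc}")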
Conclusion
Using `concurrent.futures` and its ThreadPoolExecutor to manage worker threads ensures that every task runs to completion and that each task opens and closes its own database connection, so connections are no longer closed before the work that needs them is done.