
postgresql - Python Postgres psycopg2 ThreadedConnectionPool exhausted

I have looked into several 'too many clients' related topics here but still can't solve my problem, so I have to ask again about my specific case.

Basically, I set up my local Postgres server and need to run tens of thousands of queries, so I used the Python psycopg2 package. Here is my code:

import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor

df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000) # repeat df 10000 times

DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)

def do_one_query(inputS, inputT):
    conn = tcp.getconn()
    c = conn.cursor()

    q = r"SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;"   

    c.execute(q)
    all_results = c.fetchall()
    for row in all_results:
        return row
    tcp.putconn(conn, close=True)

cnt=0
for idx, row in df.iterrows():

    cnt+=1
    with ThreadPoolExecutor(max_workers=1) as pool:
        ret = pool.submit(do_one_query,  row["S"], row["T"])
        print(ret.result())
    print(cnt)

The code runs well with a small df. If I repeat df 10000 times, I get an error message saying the connection pool is exhausted. I thought the connections I used were closed by this line:

tcp.putconn(conn, close=True)

But apparently they are not actually closed. How can I get around this issue?

Question&Answers:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer


I've struggled to find really detailed information on how ThreadedConnectionPool works. https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html isn't bad, but it turns out its claim that getconn blocks until a connection becomes available is incorrect. Checking the code, all ThreadedConnectionPool adds is a lock around the AbstractConnectionPool methods to prevent race conditions. If more than maxconn connections are requested at any point, the "connection pool exhausted" PoolError is raised.
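
As a quick illustration of that behaviour (a sketch, not from the original code; it assumes a reachable Postgres at the DSN from the question): with maxconn=2, a third getconn() raises PoolError immediately instead of blocking.

from psycopg2.pool import ThreadedConnectionPool, PoolError

DSN = "postgresql://User:password@localhost/db"  # DSN from the question

pool = ThreadedConnectionPool(1, 2, DSN)  # maxconn=2 just for demonstration
c1 = pool.getconn()
c2 = pool.getconn()
try:
    pool.getconn()          # third request exceeds maxconn
except PoolError as e:
    print(e)                # "connection pool exhausted"
finally:
    pool.putconn(c1)
    pool.putconn(c2)
    pool.closeall()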

If you want something a bit simpler than the accepted answer, wrapping those methods in a Semaphore that blocks until a connection becomes available should do the trick:

from psycopg2.pool import ThreadedConnectionPool
from threading import Semaphore

class ReallyThreadedConnectionPool(ThreadedConnectionPool):
    def __init__(self, minconn, maxconn, *args, **kwargs):
        # Counting semaphore caps concurrent checkouts at maxconn
        self._semaphore = Semaphore(maxconn)
        super().__init__(minconn, maxconn, *args, **kwargs)

    def getconn(self, *args, **kwargs):
        # Block until another thread returns a connection,
        # instead of letting the pool raise PoolError
        self._semaphore.acquire()
        return super().getconn(*args, **kwargs)

    def putconn(self, *args, **kwargs):
        super().putconn(*args, **kwargs)
        self._semaphore.release()
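
For the asker's workload, usage would look roughly like this (a sketch only; it reuses the DSN, DataFrame and query from the question, parameterizes the query with the function's arguments, and returns the connection in a finally block so it always goes back to the pool):

from concurrent.futures import ThreadPoolExecutor

pool = ReallyThreadedConnectionPool(1, 32, DSN)   # 32 workers is an arbitrary choice

def do_one_query(inputS, inputT):
    conn = pool.getconn()              # blocks until a connection is free
    try:
        with conn.cursor() as c:
            c.execute(
                'SELECT * FROM eridata WHERE "State" = %s AND "Title" = %s LIMIT 1;',
                (inputS, inputT),
            )
            return c.fetchone()
    finally:
        pool.putconn(conn)             # always return the connection to the pool

with ThreadPoolExecutor(max_workers=32) as executor:
    futures = [executor.submit(do_one_query, row["S"], row["T"])
               for _, row in df.iterrows()]
    for f in futures:
        print(f.result())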
