mymarilyn/aioch

Error on socket shutdown: [Errno 107] Transport endpoint is not connected

pochenok opened this issue · 2 comments

GCP - 24 CPU, 32 RAM.
Semaphore - just 4
Query is fast enough

The script starts out fine, but after some time I start getting this error:
Error on socket shutdown: [Errno 107] Transport endpoint is not connected

Before that I was getting a different error: OSError: [Errno 24] Too many open files
I fixed that one by raising the limit: ulimit -n 100000

async def get_result(comb):
    sem = asyncio.Semaphore(4)
    async with sem:
        client = Client('localhost', database='sna_gandalf')
        num = await client.execute(
            'select count(distinct id_follower)*20 from followers_women sample 0.05 where arrayExists(x -> x = id_blogger, ' + str(comb) + ') = 1  select sum(followers) from bloggers_tmp_price where arrayExists(x -> x = id, ' + str(comb) + ') = 1 select sum(money) from bloggers_tmp_price where arrayExists(x -> x = id, ' + str(comb) + ') = 1')
        combinations_dict[str(comb)] = str(num[0][0]) + ', ' + str(num[1][0]) + ', ' + str(num[2][0])


combinations_dict = {}
for i in range(1, 8):
    print('range ', i, ' done')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(
        [get_result(list(comb)) for comb in itertools.combinations(list(followers_df.id_blogger.unique()), i)]))

Google does not help me =(

Hi @pochenok
I think you are not using semaphores correctly: you create the semaphore inside get_result, so each coroutine gets its own semaphore instance and nothing is actually limited. As a result, you try to open a lot of connections to the ClickHouse server. To do it right, you must share one semaphore instance between all your coroutines.

async def get_result(semaphore, combination):
    async with semaphore:
        ...

semaphore = asyncio.Semaphore(4)
combinations_dict = {}

for i in range(1, 8):
    print('range ', i, ' done')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(
        [get_result(semaphore, list(comb)) for comb in itertools.combinations(list(followers_df.id_blogger.unique()), i)]))
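
To see the difference without a ClickHouse server, here is a minimal sketch that counts how many "queries" are in flight at once. It rests on assumptions: fake_query, run_batch and peak_concurrency are hypothetical names, and asyncio.sleep stands in for client.execute(...).

```python
import asyncio


async def run_batch(shared, n_tasks=20, limit=4):
    """Return the peak number of fake queries in flight at the same time."""
    state = {"active": 0, "peak": 0}
    common = asyncio.Semaphore(limit)  # the one instance every task can share

    async def fake_query(sem):
        async with sem:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
            await asyncio.sleep(0.01)  # assumption: stand-in for client.execute(...)
            state["active"] -= 1

    # shared=True: every task waits on the same semaphore.
    # shared=False: every task gets a fresh semaphore, as in the original script.
    await asyncio.gather(*(
        fake_query(common if shared else asyncio.Semaphore(limit))
        for _ in range(n_tasks)
    ))
    return state["peak"]


def peak_concurrency(shared):
    return asyncio.run(run_batch(shared))


if __name__ == "__main__":
    print("shared semaphore:  ", peak_concurrency(True))
    print("per-call semaphore:", peak_concurrency(False))
```

With the shared semaphore the peak stays at the limit of 4. With a semaphore per call, all 20 tasks run at once, which in the real script means one open connection per combination.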

Hi @pochenok, is this issue still relevant?