dragonfly/tests/integration/async.py
Vladislav 072cb2e8d9
refactor(tests): Refactor pytest (#449)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
2022-10-31 16:39:20 +02:00

59 lines
1.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""
This script helped to reproduce https://github.com/dragonflydb/dragonfly/issues/150
The outcome: the script stalls with all of its connections deadlocked.
Reproduced only with dragonfly built in release mode on a multi-core machine.
"""
import asyncio
import aioredis
from loguru import logger as log
import sys
import random
# Single pool shared by every client created in this script; it is capped at
# 16 connections and targets logical database 1 on localhost:6379.
connection_pool = aioredis.ConnectionPool(
    host="localhost",
    port=6379,
    db=1,
    decode_responses=True,
    max_connections=16,
)

# Running counter used by post_to_redis to build the hash names it writes;
# it is bumped once per queued command.
key_index = 1
async def post_to_redis(sem, db_name, index):
    """Queue 28 HSETNX commands on one transactional pipeline and execute it.

    While holding ``sem``, a client backed by the module-level
    ``connection_pool`` queues 14 ``key_<n>`` writes followed by 14
    ``bla_<n>`` writes, where ``<n>`` comes from the global ``key_index``
    counter (incremented after every queued command), and then executes the
    whole pipeline. The client is closed afterwards, whether or not the
    pipeline succeeded.

    Args:
        sem: Async context manager (the caller passes an
            ``asyncio.Semaphore``) limiting how many invocations run at once.
        db_name: Not used by this function; kept for call compatibility.
        index: Not used by this function; kept for call compatibility.
    """
    global key_index
    async with sem:
        # Build the client *before* entering ``try``: if construction raised
        # inside the ``try``, the ``finally`` clause would hit an unbound
        # ``redis_client`` and mask the real error with UnboundLocalError.
        redis_client = aioredis.Redis(connection_pool=connection_pool)
        try:
            async with redis_client.pipeline(transaction=True) as pipe:
                for _ in range(1, 15):
                    pipe.hsetnx(name=f"key_{key_index}", key="name", value="bla")
                    key_index += 1
                for _ in range(1, 15):
                    pipe.hsetnx(name=f"bla_{key_index}", key="name2", value="bla")
                    key_index += 1
                # Sanity check: the commands must be buffered, not sent yet.
                assert len(pipe.command_stack) > 0
                log.info(f"before pipe.execute {key_index}")
                await pipe.execute()
                log.info(f"after pipe.execute {key_index}")
        finally:
            await redis_client.close()
async def do_concurrent(db_name):
    """Run 2999 ``post_to_redis`` calls, at most 10 of them at a time.

    One ``asyncio.Semaphore(10)`` is shared by all calls; each call receives
    ``db_name`` and its own index from ``range(1, 3000)``. The function waits
    for every call to finish and returns nothing.
    """
    limiter = asyncio.Semaphore(10)
    pending = [post_to_redis(limiter, db_name, n) for n in range(1, 3000)]
    await asyncio.gather(*pending)
if __name__ == '__main__':
    # Drop loguru's pre-configured handler and log INFO and above to stdout
    # through an enqueued sink instead.
    log.remove()
    log.add(sys.stdout, level='INFO', enqueue=True)
    # NOTE(review): get_event_loop() is kept on purpose instead of
    # asyncio.run(); connection_pool is created at import time, before any
    # loop is running -- confirm how the pool binds to a loop before changing.
    asyncio.get_event_loop().run_until_complete(do_concurrent("my_db"))