dragonfly/tools/defrag_mem_test.py

203 lines
7.1 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import argparse
import asyncio
import sys
from typing import Iterator, List

import aioredis
import async_timeout
"""
To install: pip install -r requirements.txt
Run
dragonfly --mem_defrag_threshold=0.01 --mem_defrag_waste_threshold=0.01
defrag_mem_test.py -k 8000000 -v 645
This program would try to re-create the issue with memory defragmentation.
See issue number 448 for more details.
To run this:
You can just execute this from the command line without any arguments.
Or you can run with --help to see the options.
The defaults are:
number of keys: 800,000
value size: 645 bytes
key name pattern: key-for-testing
host: localhost
port: default redis port
Please note that this would create 4 * number of keys entries
You can see the memory usage/defrag state with the monitoring task that
prints the current state
NOTE:
If this seems to get stuck please kill it with ctrl+c
This can happen in case we don't have "defrag_realloc_total > 0"
"""
class TaskCancel:
    """Cooperative stop flag that long-running tasks poll to know when to finish."""

    def __init__(self) -> None:
        # True while the tasks polling this flag should keep working.
        self.run: bool = True

    def dont_stop(self) -> bool:
        """Return True as long as ``stop()`` has not been called."""
        return self.run

    def stop(self) -> None:
        """Tell every task polling this flag to finish."""
        self.run = False
async def run_cmd(connection, cmd, sub_val):
    """Execute ``cmd`` with the single argument ``sub_val`` and return the reply."""
    return await connection.execute_command(cmd, sub_val)
async def handle_defrag_stats(connection, prev):
    """Fetch ``info stats`` and report whether defragmentation reallocated memory.

    The defrag counters are printed whenever ``defrag_task_invocation_total``
    differs from ``prev``.

    Returns a ``(done, invocations)`` tuple:
      * ``(True, None)``   - the counter changed and ``defrag_realloc_total`` is positive.
      * ``(False, total)`` - otherwise; ``total`` is the current invocation counter.
      * ``(False, None)``  - the command returned ``None``.
    """
    stats = await run_cmd(connection, "info", "stats")
    if stats is None:
        return False, None

    invocations = stats["defrag_task_invocation_total"]
    if invocations != prev:
        separator = "--------------------------------------------------------------"
        print(separator)
        print(f"defrag_task_invocation_total: {invocations:,}")
        print(f"defrag_realloc_total: {stats['defrag_realloc_total']:,}")
        print(f"defrag_attempt_total: {stats['defrag_attempt_total']:,}")
        print(separator)
        # NOTE(review): nesting reconstructed from a listing that lost its
        # indentation; the realloc check is assumed to belong to this branch - confirm.
        if stats["defrag_realloc_total"] > 0:
            return True, None
    return False, invocations
async def memory_stats(connection):
    """Print the ``used_memory`` value from ``info memory`` between separator lines."""
    separator = "--------------------------------------------------------------"
    print(separator)
    memory = await run_cmd(connection, "info", "memory")
    # Committed-memory output is currently disabled:
    # print(f"memory commited: {info['comitted_memory']:,}")
    # print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}")
    print(f"memory used: {memory['used_memory']:,}")
    print(separator)
async def stats_check(connection, condition):
    """Poll defrag statistics until ``condition`` is stopped, then do a final check.

    While ``condition.dont_stop()`` holds, the defrag counters are polled every
    0.3 seconds and memory usage is printed on every third poll. As soon as a
    reallocation is reported, ``condition`` is stopped. Afterwards up to five
    more polls, two seconds apart, look for a reallocation.

    Returns True unless an exception was raised while monitoring, in which
    case the error is printed and False is returned.
    """
    try:
        last_invocations = 0
        polls = 0
        # NOTE(review): nesting reconstructed from a listing that lost its
        # indentation; the poll counter is assumed to advance on every iteration - confirm.
        while condition.dont_stop():
            await asyncio.sleep(0.3)
            found, current = await handle_defrag_stats(connection, last_invocations)
            if found:
                print("defrag task successfully found memory locations to reallocate")
                condition.stop()
            elif current is not None:
                last_invocations = current
            polls += 1
            if polls % 3 == 0:
                await memory_stats(connection)

        # prev=-1 presumably never equals a real counter, so each final poll prints the stats.
        for _ in range(5):
            found, _ignored = await handle_defrag_stats(connection, -1)
            if found:
                print("defrag task successfully found memory locations to reallocate")
                return True
            await asyncio.sleep(2)
        return True
    except Exception as err:
        print(f"failed to run monitor task: {err}")
        return False
async def delete_keys(connection, keys):
    """Delete ``keys`` with one ``delete`` call and return the server's reply.

    An empty ``keys`` collection returns 0 without contacting the server,
    because a delete command with no key arguments is rejected.
    """
    if not keys:
        return 0
    return await connection.delete(*keys)
def generate_keys(pattern: str, count: int, batch_size: int) -> Iterator[List[str]]:
    """Yield batches of key names of the form ``{pattern}{index}``.

    Indices start at 1 and are walked in windows of ``batch_size``; within each
    window every third index (starting at the window's first index) is used.

    Args:
        pattern: prefix placed before each numeric index.
        count: exclusive upper bound for generated indices.
        batch_size: width of each index window.

    Yields:
        One list of key names per window; no name has an index >= ``count``.
    """
    for start in range(1, count, batch_size):
        # Clamp the window so the last batch never names indices past ``count``.
        stop = min(start + batch_size, count)
        yield [f"{pattern}{index}" for index in range(start, stop, 3)]
async def mem_cleanup(connection, pattern, num, cond, keys_count):
    """Delete batches of ``pattern`` keys until done or until ``cond`` is stopped.

    Args:
        connection: client used to issue the deletes.
        pattern: key-name prefix handed to ``generate_keys``.
        num: task number, used only in the completion message.
        cond: stop flag; checked before every batch.
        keys_count: upper bound on the key indices to generate.

    Returns:
        The sum of the replies returned by ``delete_keys`` for the processed batches.
    """
    deleted = 0
    for batch in generate_keys(pattern=pattern, count=keys_count, batch_size=950):
        if not cond.dont_stop():
            break
        deleted += await delete_keys(connection, batch)
        # Pause between batches.
        await asyncio.sleep(0.2)
    print(f"task number {num} that deleted keys {pattern} finished")
    return deleted
async def run_tasks(pool, key_name, value_size, keys_count):
    """Populate four key families, delete part of each concurrently, and monitor defrag.

    For each of the names ``{key_name}-0`` .. ``{key_name}-3`` the server is
    asked to ``DEBUG POPULATE`` ``keys_count`` entries of ``value_size``. Then
    one ``mem_cleanup`` coroutine per family runs alongside a ``stats_check``
    monitor task. When the deleters are done the monitor is stopped and awaited.

    Returns:
        True when everything completed; False when any step or deleter raised
        (the error is printed).
    """
    keys = [f"{key_name}-{i}" for i in range(4)]
    stop_cond = TaskCancel()
    try:
        connection = aioredis.Redis(connection_pool=pool)
        for key in keys:
            print(f"creating key {key} with size {value_size} of count {keys_count}")
            await connection.execute_command("DEBUG", "POPULATE", keys_count, key, value_size)
            # NOTE(review): the listing lost its indentation; this pause is assumed
            # to follow each populate call rather than the whole loop - confirm.
            await asyncio.sleep(2)

        tasks = []
        for num, key in enumerate(keys):
            pattern = f"{key}:"
            print(f"deleting keys from {pattern}")
            tasks.append(
                mem_cleanup(
                    connection=connection,
                    pattern=pattern,
                    num=num,
                    cond=stop_cond,
                    keys_count=int(keys_count),
                )
            )

        monitor_task = asyncio.create_task(stats_check(connection, stop_cond))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # return_exceptions=True hands failures back as ordinary results, so they
        # must be separated out before the per-task counts can be summed.
        failures = [res for res in results if isinstance(res, BaseException)]
        if not failures:
            print(f"successfully deleted {sum(results)} keys")

        # Always wind the monitor down, even when a deleter failed.
        stop_cond.stop()
        await monitor_task

        if failures:
            print(f"got error {failures[0]} while running delete keys")
            return False
        print("finish executing")
        return True
    except Exception as e:
        # Make sure a monitor that is already running is told to finish.
        stop_cond.stop()
        print(f"got error {e} while running delete keys")
        return False
def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379):
    """Create a connection pool and run ``run_tasks`` to completion on a fresh event loop.

    Args:
        key_name: base name for the generated key families.
        value_size: size of each populated value.
        keys_count: number of keys to populate per family.
        host: server host name.
        port: server port number.

    Returns:
        Whatever ``run_tasks`` returns (True on success, False on failure).
    """
    async_pool = aioredis.ConnectionPool(
        host=host, port=port, db=0, decode_responses=True, max_connections=16
    )
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(
            run_tasks(
                pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count
            )
        )
    finally:
        # The loop was created here, so release it here as well.
        loop.close()
if __name__ == "__main__":
    # Command-line entry point: parse the options, run the scenario, report the outcome.
    parser = argparse.ArgumentParser(
        description="active memory testing", formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("-k", "--keys", type=int, default=800000, help="total number of keys")
    parser.add_argument("-v", "--value_size", type=int, default=645, help="size of the values")
    parser.add_argument(
        "-n", "--key_name", type=str, default="key-for-testing", help="the base key name"
    )
    parser.add_argument("-s", "--server", type=str, default="localhost", help="server host name")
    parser.add_argument("-p", "--port", type=int, default=6379, help="server port number")
    args = parser.parse_args()
    keys_num = args.keys
    key_name = args.key_name
    value_size = args.value_size
    host = args.server
    port = args.port
    print(
        f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}"
    )
    result = connect_and_run(
        key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port
    )
    # Plain truthiness check instead of comparing against True.
    if result:
        print("finished successfully")
    else:
        print("failed")