dragonfly/tools/cache_logs_player.py

199 lines
6.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import argparse
from datetime import datetime
import aioredis
import asyncio
from aiocsv import AsyncReader
import aiofiles
'''
To install: pip install -r requirements.txt
'''
class Command:
args = None
sync_id = 0 # Commands with the same sync_id will be executed synchrnously
class TwitterCacheTraceParser:
"""
https://github.com/twitter/cache-trace
"""
def parse(self, csv) -> Command:
operation = csv[5]
key = csv[1] + "a"
value_size = int(csv[3])
synthetic_value = "".zfill(value_size)
client_id = csv[4]
ttl = csv[6]
cmd = Command()
cmd.sync_id = client_id
if operation == "get":
cmd.args = ["GET", key]
elif operation == 'gets':
cmd.args = ["GET", key]
elif operation == 'set':
cmd.args = ["SET", key, synthetic_value]
elif operation == 'add':
cmd.args = ["SET", key, synthetic_value]
elif operation == 'replace':
cmd.args = ["SET", key, synthetic_value]
elif operation == 'cas':
cmd.args = ["SET", key, synthetic_value]
elif operation == 'append':
cmd.args = ["APPEND", key, synthetic_value]
elif operation == 'prepend':
cmd.args = ["SET", key, synthetic_value]
elif operation == 'delete':
cmd.args = ["DEL", key]
elif operation == 'incr':
cmd.args = ["INCR", key]
elif operation == 'decr':
cmd.args = ["DECR", key]
return cmd
class AsyncWorker:
QUEUE_SIZE = 100000
def __init__(self, redis_client) -> None:
self.queue = asyncio.Queue(self.QUEUE_SIZE)
self.redis_client = redis_client
self.working = False
async def put(self, batch: list) -> None:
await self.queue.put(batch)
async def work(self) -> None:
self.working = True
while self.working or not self.queue.empty() :
batch = await self.queue.get()
await self.execute(batch)
async def execute(self, batch) -> None:
async with self.redis_client.pipeline(transaction=False) as pipe:
for cmd in batch:
pipe.execute_command(*cmd.args)
await pipe.execute()
def start(self) -> asyncio.Task:
return asyncio.create_task(self.work())
def stop(self) -> None:
self.working = False
class AsyncWorkerPool:
"""
Mangaes worker pool to send commands in parallel
Maintains synchronous order for commands with the same sync_id
"""
def __init__(self, redis_client, num_workers) -> None:
self.redis_client = redis_client
self.num_workers = num_workers
self.workers = []
self.tasks = []
self.sync_id_to_worker = {}
self.next_worker_index = -1
def allocate(self, sync_id) -> AsyncWorker:
if not sync_id in self.sync_id_to_worker:
self.next_worker_index = (self.next_worker_index + 1) % self.num_workers
if len(self.workers) <= self.next_worker_index:
assert len(self.workers) == self.next_worker_index
self.workers.append(AsyncWorker(self.redis_client))
self.tasks.append(self.workers[self.next_worker_index].start())
self.sync_id_to_worker[sync_id] = self.workers[self.next_worker_index]
return self.sync_id_to_worker[sync_id]
async def put(self, batch: list, sync_id: int) -> None:
worker = self.allocate(sync_id)
await worker.put(batch)
async def stop(self):
for worker in self.workers:
worker.stop()
await asyncio.gather(*self.tasks)
class AsyncPlayer:
READ_BATCH_SIZE = 10 * 1000 * 1000
def __init__(self, redis_uri, num_workers) -> None:
self.redis_uri = redis_uri
self.redis_client = aioredis.from_url(f"redis://{self.redis_uri}", encoding="utf-8", decode_responses=True)
self.worker_pool = AsyncWorkerPool(self.redis_client, 100)
self.batch_by_sync_id = {}
async def dispatch_batches(self):
for sync_id in self.batch_by_sync_id:
await self.worker_pool.put(self.batch_by_sync_id[sync_id], sync_id)
self.batch_by_sync_id.clear()
async def read_and_dispatch(self, csv_file, parser):
print(f"dispatching from {csv_file}")
line_count = 0
async with aiofiles.open(csv_file, mode="r", encoding="utf-8", newline="") as afp:
async for row in AsyncReader(afp):
cmd = parser.parse(row)
if not self.batch_by_sync_id.get(cmd.sync_id):
self.batch_by_sync_id[cmd.sync_id] = []
batch = self.batch_by_sync_id[cmd.sync_id]
batch.append(cmd)
line_count = line_count + 1
if (line_count >= self.READ_BATCH_SIZE):
await self.dispatch_batches()
line_count = 0
# handle the remaining lines
await self.dispatch_batches()
async def print_stats(self):
info = await self.redis_client.execute_command("info", "stats")
print(f"{datetime.now()}: {info}")
async def report_stats(self):
while True:
self.print_stats()
async def report_stats(self):
while True:
await self.print_stats()
await asyncio.sleep(10)
async def play(self, csv_file, parser) -> None:
print(f"pinging {self.redis_uri} successful?")
print(await self.redis_client.ping())
read_dispatch_task = asyncio.create_task(self.read_and_dispatch(csv_file, parser))
stats_task = asyncio.create_task(self.report_stats())
await read_dispatch_task
print(f"finished reading {csv_file}")
await self.worker_pool.stop()
stats_task.cancel()
print("all done")
await self.print_stats()
def main():
parser = argparse.ArgumentParser(description='Cache Logs Player')
parser.add_argument('-u', '--uri', type=str, default='localhost:6379', help='Redis server URI')
parser.add_argument('-f', '--csv_file', type=str, default='/home/ari/Downloads/cluster017.csv', help='Redis server URI')
parser.add_argument('--num_workers', type=int, default=100, help='Maximum number of workers sending commands in parllel')
args = parser.parse_args()
player = AsyncPlayer(redis_uri=args.uri, num_workers=args.num_workers)
asyncio.run(player.play(args.csv_file, TwitterCacheTraceParser()))
if __name__ == "__main__":
main()