diff --git a/tools/cluster_mgr.py b/tools/cluster_mgr.py index dbdd5117a..b104fb46a 100755 --- a/tools/cluster_mgr.py +++ b/tools/cluster_mgr.py @@ -12,21 +12,22 @@ To install: pip install -r requirements.txt class Node: - def __init__(self, port, admin_port): + def __init__(self, port): self.id = '' self.port = port - self.admin_port = admin_port + self.admin_port = port + 10_000 class Master: - def __init__(self, port, admin_port): - self.node = Node(port, admin_port) + def __init__(self, port): + self.node = Node(port) self.replicas = [] def start_node(node, threads): f = open(f'/tmp/dfly.cluster.node.{node.port}.log', 'w') - print(f'- Log file for node {node.port}: {f.name}') + print( + f'- Log file for node {node.port}: {f.name}') subprocess.Popen(['../build-dbg/dragonfly', f'--port={node.port}', f'--admin_port={node.admin_port}', '--cluster_mode=yes', f'--proactor_threads={threads}', '--dbfilename=', f'--logtostderr'], stderr=f) @@ -42,7 +43,7 @@ def send_command(node, command): client.close() return result except: - time.sleep(0.1) + time.sleep(0.1 * i) print( f'Unable to connect to localhost:{node.admin_port} after 5 attempts!') @@ -54,7 +55,7 @@ def update_id(node): print(f'- ID for {node.port}: {id}') -def build_config(masters): +def build_config_from_list(masters): total_slots = 16384 slots_per_node = math.floor(total_slots / len(masters)) @@ -81,50 +82,45 @@ def build_config(masters): config.append(c) config[-1]["slot_ranges"][-1]["end"] += total_slots % len(masters) - return json.dumps(config, indent=2) + return config -def push_config(nodes, config): - for node in nodes: - response = send_command(node, ['dflycluster', 'config', config]) - print(f'- Push into {node.port}: {response}') +def get_nodes_from_config(config): + nodes = [] + for shard in config: + nodes.append(Node(shard["master"]["port"])) + for replica in shard["replicas"]: + nodes.append(Node(replica["port"])) + return nodes -def main(): - parser = argparse.ArgumentParser(description='Local Cluster Manager') - parser.add_argument('--num_masters', type=int, default=3, - help='Number of master nodes in cluster') - parser.add_argument('--replicas_per_master', type=int, default=0, - help='How many replicas for each master') - parser.add_argument('--first_port', type=int, - default=7001, help="First master's port") - parser.add_argument('--first_admin_port', type=int, - default=17_001, help="First master's admin port") - parser.add_argument('--threads', type=int, default=2, - help="Threads per node") - args = parser.parse_args() +def push_config(config): + def push_to_node(node, config): + config_str = json.dumps(config, indent=2) + response = send_command(node, ['dflycluster', 'config', config_str]) + print(f'- Push to {node.port}: {response}') + for node in get_nodes_from_config(config): + push_to_node(node, config) + + +def create(args): print(f'Setting up a Dragonfly cluster:') print(f'- Master nodes: {args.num_masters}') print( f'- Ports: {args.first_port}...{args.first_port + args.num_masters - 1}') - print( - f'- Admin ports: {args.first_admin_port}...{args.first_admin_port + args.num_masters - 1}') print(f'- Replicas for each master: {args.replicas_per_master}') print() next_port = args.first_port - next_admin_port = args.first_admin_port masters = [] for i in range(args.num_masters): - master = Master(next_port, next_admin_port) + master = Master(next_port) next_port += 1 - next_admin_port += 1 for j in range(args.replicas_per_master): - replica = Node(next_port, next_admin_port) + replica = Node(next_port) master.replicas.append(replica) next_port += 1 - next_admin_port += 1 masters.append(master) nodes = [] @@ -153,11 +149,192 @@ def main(): update_id(n) print() - config = build_config(masters) - print(f'Pushing config...') - push_config(nodes, config) + config = build_config_from_list(masters) + print(f'Pushing config:\n{config}\n') + push_config(config) print() +def build_config_from_existing(args): + def list_to_dict(l): + return {l[i]: l[i + 1] for i in range(0, len(l), 2)} + + def build_node(node_list): + d = list_to_dict(node_list) + return { + "id": d["id"], + "ip": d["endpoint"], + "port": d["port"] + } + + def build_slots(slot_list): + slots = [] + for i in range(0, len(slot_list), 2): + slots.append( + { + "start": slot_list[i], + "end": slot_list[i+1] + } + ) + return slots + + client = redis.Redis(decode_responses=True, + host="localhost", port=args.first_port) + existing = client.execute_command("cluster", "shards") + config = [] + for shard_list in existing: + shard = list_to_dict(shard_list) + config.append({ + "slot_ranges": build_slots(shard["slots"]), + "master": build_node(shard["nodes"][0]), + "replicas": [build_node(replica) for replica in shard["nodes"][1::]] + }) + return config + + +def move(args): + config = build_config_from_existing(args) + + new_owner = None + for shard in config: + if shard["master"]["port"] == args.target_port: + new_owner = shard + break + else: + print( + f"Can't find master with port {args.target_port} (hint: use flag --target_port).") + exit(-1) + + def remove_slot(slot, from_range, from_shard): + if from_range["start"] == slot: + from_range["start"] += 1 + if from_range["start"] > from_range["end"]: + from_shard["slot_ranges"].remove(from_range) + elif from_range["end"] == slot: + from_range["end"] -= 1 + if from_range["start"] > from_range["end"]: + from_shard["slot_ranges"].remove(from_range) + else: + assert slot > from_range["start"] and slot < from_range[ + "end"], f'{slot} {from_range["start"]} {from_range["end"]}' + from_shard["slot_ranges"].append( + {"start": slot + 1, "end": from_range["end"]}) + from_range["end"] = slot - 1 + + def add_slot(slot, to_shard): + for slot_range in to_shard["slot_ranges"]: + if slot == slot_range["start"] - 1: + slot_range["start"] -= 1 + return + if slot == slot_range["end"] + 1: + slot_range["end"] += 1 + return + to_shard["slot_ranges"].append({"start": slot, "end": slot}) + + def find_slot(slot, config): + for shard in config: + if shard == new_owner: + continue + for slot_range in shard["slot_ranges"]: + if slot >= slot_range["start"] and slot <= slot_range["end"]: + return shard, slot_range + return None, None + + def pack(slot_ranges): + new_range = [] + while True: + changed = False + new_range = [] + slot_ranges.sort(key=lambda x: x["start"]) + for i, slot_range in enumerate(slot_ranges): + added = False + for j in range(i): + prev_slot_range = slot_ranges[j] + if prev_slot_range["end"] + 1 == slot_range["start"]: + prev_slot_range["end"] = slot_range["end"] + changed = True + added = True + break + if not added: + new_range.append(slot_range) + slot_ranges = new_range + if not changed: + break + return new_range + + for slot in range(args.slot_start, args.slot_end+1): + shard, slot_range = find_slot(slot, config) + if shard == None: + continue + if shard == new_owner: + continue + remove_slot(slot, slot_range, shard) + add_slot(slot, new_owner) + + for shard in config: + shard["slot_ranges"] = pack(shard["slot_ranges"]) + + print(f'Pushing new config:\n{json.dumps(config, indent=2)}\n') + push_config(config) + + +def print_config(args): + config = build_config_from_existing(args) + print(json.dumps(config, indent=2)) + + +def shutdown(args): + config = build_config_from_existing(args) + for node in get_nodes_from_config(config): + send_command(node, ["shutdown"]) + + +def main(): + parser = argparse.ArgumentParser(description=''' +Local Cluster Manager + +Example usage: + +Create a 3+3 nodes cluster: + ./cluster_mgr.py --action=create --num_masters=3 --replicas_per_master=1 + +Connect to cluster and print current config: + ./cluster_mgr.py --action=print + +Connect to cluster and shutdown all nodes: + ./cluster_mgr.py --action=shutdown + +Connect to cluster and move slots 10-20 to master with port 7002: + ./cluster_mgr.py --action=move --slot_start=10 --slot_end=20 --new_owner=7002 +''') + parser.add_argument('--action', default='', + help='Which action to take? create: start a new instance, move=move slots') + parser.add_argument('--num_masters', type=int, default=3, + help='Number of master nodes in cluster') + parser.add_argument('--replicas_per_master', type=int, default=0, + help='How many replicas for each master') + parser.add_argument('--first_port', type=int, + default=7001, help="First master's port") + parser.add_argument('--threads', type=int, default=2, + help="Threads per node") + parser.add_argument('--slot_start', type=int, + default=0, help="First slot to move (inclusive)") + parser.add_argument('--slot_end', type=int, + default=100, help="Last slot to move (inclusive)") + parser.add_argument('--target_port', type=int, default=0, + help="Master port to take ownership over slots in range " + "[--slot_start, --slot_end]") + args = parser.parse_args() + + actions = {'create': create, 'shutdown': shutdown, + 'move': move, 'print': print_config} + action = actions.get(args.action.lower()) + if (action): + action(args) + else: + print(f'Error - unknown action "{args.action}"') + exit(-1) + + if __name__ == "__main__": main()