diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index e8c264c74..809f28100 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -401,7 +401,7 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) { " to meet value size.", " If RAND is specified then value will be set to random hex string in specified size.", " If SLOTS is specified then create keys only in given slots range.", - " TYPE specifies data type (must be STRING/LIST/SET/HSET/ZSET/JSON), default STRING.", + " TYPE specifies data type (must be STRING/LIST/SET/HASH/ZSET/JSON), default STRING.", " ELEMENTS specifies how many sub elements if relevant (like entries in a list / set).", "OBJHIST", " Prints histogram of object sizes.", diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 0b3aa70bb..a29b55667 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -157,25 +157,32 @@ class RdbRestoreValue : protected RdbLoaderBase { const RestoreArgs& args, EngineShard* shard); private: - std::optional Parse(std::string_view payload); + std::optional Parse(io::Source* source); + int rdb_type_ = -1; }; -std::optional RdbRestoreValue::Parse(std::string_view payload) { - InMemSource source(payload); - src_ = &source; - if (io::Result type_id = FetchType(); type_id && rdbIsObjectTypeDF(type_id.value())) { - OpaqueObj obj; - error_code ec = ReadObj(type_id.value(), &obj); // load the type from the input stream - if (ec) { - LOG(ERROR) << "failed to load data for type id " << (unsigned int)type_id.value(); - return std::nullopt; +std::optional RdbRestoreValue::Parse(io::Source* source) { + src_ = source; + if (pending_read_.remaining == 0) { + io::Result type_id = FetchType(); + if (type_id && rdbIsObjectTypeDF(type_id.value())) { + rdb_type_ = *type_id; } + } - return std::optional(std::move(obj)); - } else { + if (rdb_type_ == -1) { LOG(ERROR) << "failed to load type id from the input stream or type id is invalid"; return std::nullopt; } + + OpaqueObj obj; + error_code ec = ReadObj(rdb_type_, &obj); // load the type from the input stream + if (ec) { + LOG(ERROR) << "failed to load data for type id " << rdb_type_; + return std::nullopt; + } + + return std::optional(std::move(obj)); } std::optional RdbRestoreValue::Add(std::string_view data, @@ -183,17 +190,31 @@ std::optional RdbRestoreValue::Add(std::string_view data, const DbContext& cntx, const RestoreArgs& args, EngineShard* shard) { - auto opaque_res = Parse(data); - if (!opaque_res) { - return std::nullopt; - } - + InMemSource data_src(data); PrimeValue pv; - if (auto ec = FromOpaque(*opaque_res, &pv); ec) { - // we failed - report and exit - LOG(WARNING) << "error while trying to save data: " << ec; - return std::nullopt; - } + bool first_parse = true; + do { + auto opaque_res = Parse(&data_src); + if (!opaque_res) { + return std::nullopt; + } + + LoadConfig config; + if (first_parse) { + first_parse = false; + } else { + config.append = true; + } + if (pending_read_.remaining > 0) { + config.streamed = true; + } + + if (auto ec = FromOpaque(*opaque_res, config, &pv); ec) { + // we failed - report and exit + LOG(WARNING) << "error while trying to read data: " << ec; + return std::nullopt; + } + } while (pending_read_.remaining > 0); if (auto res = db_slice.AddNew(cntx, key, std::move(pv), args.ExpirationTime()); res) { res->it->first.SetSticky(args.Sticky()); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index a6873a14c..fe42407eb 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2676,10 +2676,6 @@ void RdbLoader::FlushAllShards() { FlushShardAsync(i); } -std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* pv) { - return RdbLoaderBase::FromOpaque(opaque, LoadConfig{}, pv); -} - std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig config, CompactObj* pv) { OpaqueObjLoader visitor(opaque.rdb_type, pv, config); diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 830ee59cc..78733912b 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -139,7 +139,6 @@ class RdbLoaderBase { template io::Result FetchInt(); - static std::error_code FromOpaque(const OpaqueObj& opaque, CompactObj* pv); static std::error_code FromOpaque(const OpaqueObj& opaque, LoadConfig config, CompactObj* pv); io::Result LoadLen(bool* is_encoded); diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 66f70856e..83acd11f7 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -13,7 +13,7 @@ from .replication_test import check_all_replicas_finished from redis.cluster import RedisCluster from redis.cluster import ClusterNode from .proxy import Proxy -from .seeder import SeederBase +from .seeder import StaticSeeder from . import dfly_args @@ -1773,6 +1773,45 @@ async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory): assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}") +@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"}) +@pytest.mark.asyncio +async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory): + instances = [ + df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) + ] + df_factory.start_all(instances) + + nodes = [await create_node_info(instance) for instance in instances] + nodes[0].slots = [(0, 16383)] + nodes[1].slots = [] + + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + logging.debug("Generating huge containers") + seeder = StaticSeeder( + key_target=10, + data_size=10_000_000, + collection_size=10_000, + variance=1, + samples=1, + types=["LIST", "HASH", "SET", "ZSET", "STRING"], + ) + await seeder.run(nodes[0].client) + source_data = await StaticSeeder.capture(nodes[0].client) + + nodes[0].migrations = [ + MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id) + ] + logging.debug("Migrating slots") + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + logging.debug("Waiting for migration to finish") + await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED") + + target_data = await StaticSeeder.capture(nodes[1].client) + assert source_data == target_data + + def parse_lag(replication_info: str): lags = re.findall("lag=([0-9]+)\r\n", replication_info) assert len(lags) == 1 diff --git a/tests/dragonfly/generic_test.py b/tests/dragonfly/generic_test.py index 020d364a9..c4ceef3d6 100644 --- a/tests/dragonfly/generic_test.py +++ b/tests/dragonfly/generic_test.py @@ -1,4 +1,5 @@ import os +import logging import pytest import redis import asyncio @@ -7,6 +8,7 @@ from redis import asyncio as aioredis from . import dfly_multi_test_args, dfly_args from .instance import DflyStartException from .utility import batch_fill_data, gen_test_data, EnvironCntx +from .seeder import StaticSeeder @dfly_multi_test_args({"keys_output_limit": 512}, {"keys_output_limit": 1024}) @@ -168,3 +170,38 @@ async def test_denyoom_commands(df_factory): # mget should not be rejected await client.execute_command("mget x") + + +@pytest.mark.parametrize("type", ["LIST", "HASH", "SET", "ZSET", "STRING"]) +@dfly_args({"proactor_threads": 4}) +@pytest.mark.asyncio +async def test_rename_huge_values(df_factory, type): + df_server = df_factory.create() + df_server.start() + client = df_server.client() + + logging.debug(f"Generating huge {type}") + seeder = StaticSeeder( + key_target=1, + data_size=10_000_000, + collection_size=10_000, + variance=1, + samples=1, + types=[type], + ) + await seeder.run(client) + source_data = await StaticSeeder.capture(client) + logging.debug(f"src {source_data}") + + # Rename multiple times to make sure the key moves between shards + orig_name = (await client.execute_command("keys *"))[0] + old_name = orig_name + new_name = "" + for i in range(10): + new_name = f"new:{i}" + await client.execute_command(f"rename {old_name} {new_name}") + old_name = new_name + await client.execute_command(f"rename {new_name} {orig_name}") + target_data = await StaticSeeder.capture(client) + + assert source_data == target_data