fix: Huge entries fail to load outside RDB / replication (#4154)

* fix: Huge entries fail to load outside RDB / replication

We have an internal utility tool that we use to deserialize values in
some use cases:

* `RESTORE`
* Cluster slot migration
* `RENAME`, if the source and target shards are different

We [recently](https://github.com/dragonflydb/dragonfly/issues/3760)
changed this area of the code, which caused this regression as it only
handled RDB / replication streams.

Fixes #4143
This commit is contained in:
Shahar Mike 2024-11-20 16:00:07 +02:00 committed by GitHub
parent 36135f516f
commit 24a1ec6ab2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 121 additions and 29 deletions

View File

@ -401,7 +401,7 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
" to meet value size.", " to meet value size.",
" If RAND is specified then value will be set to random hex string in specified size.", " If RAND is specified then value will be set to random hex string in specified size.",
" If SLOTS is specified then create keys only in given slots range.", " If SLOTS is specified then create keys only in given slots range.",
" TYPE specifies data type (must be STRING/LIST/SET/HSET/ZSET/JSON), default STRING.", " TYPE specifies data type (must be STRING/LIST/SET/HASH/ZSET/JSON), default STRING.",
" ELEMENTS specifies how many sub elements if relevant (like entries in a list / set).", " ELEMENTS specifies how many sub elements if relevant (like entries in a list / set).",
"OBJHIST", "OBJHIST",
" Prints histogram of object sizes.", " Prints histogram of object sizes.",

View File

@ -157,25 +157,32 @@ class RdbRestoreValue : protected RdbLoaderBase {
const RestoreArgs& args, EngineShard* shard); const RestoreArgs& args, EngineShard* shard);
private: private:
std::optional<OpaqueObj> Parse(std::string_view payload); std::optional<OpaqueObj> Parse(io::Source* source);
int rdb_type_ = -1;
}; };
std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(std::string_view payload) { std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(io::Source* source) {
InMemSource source(payload); src_ = source;
src_ = &source; if (pending_read_.remaining == 0) {
if (io::Result<uint8_t> type_id = FetchType(); type_id && rdbIsObjectTypeDF(type_id.value())) { io::Result<uint8_t> type_id = FetchType();
OpaqueObj obj; if (type_id && rdbIsObjectTypeDF(type_id.value())) {
error_code ec = ReadObj(type_id.value(), &obj); // load the type from the input stream rdb_type_ = *type_id;
if (ec) {
LOG(ERROR) << "failed to load data for type id " << (unsigned int)type_id.value();
return std::nullopt;
} }
}
return std::optional<OpaqueObj>(std::move(obj)); if (rdb_type_ == -1) {
} else {
LOG(ERROR) << "failed to load type id from the input stream or type id is invalid"; LOG(ERROR) << "failed to load type id from the input stream or type id is invalid";
return std::nullopt; return std::nullopt;
} }
OpaqueObj obj;
error_code ec = ReadObj(rdb_type_, &obj); // load the type from the input stream
if (ec) {
LOG(ERROR) << "failed to load data for type id " << rdb_type_;
return std::nullopt;
}
return std::optional<OpaqueObj>(std::move(obj));
} }
std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data, std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
@ -183,17 +190,31 @@ std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
const DbContext& cntx, const DbContext& cntx,
const RestoreArgs& args, const RestoreArgs& args,
EngineShard* shard) { EngineShard* shard) {
auto opaque_res = Parse(data); InMemSource data_src(data);
if (!opaque_res) {
return std::nullopt;
}
PrimeValue pv; PrimeValue pv;
if (auto ec = FromOpaque(*opaque_res, &pv); ec) { bool first_parse = true;
// we failed - report and exit do {
LOG(WARNING) << "error while trying to save data: " << ec; auto opaque_res = Parse(&data_src);
return std::nullopt; if (!opaque_res) {
} return std::nullopt;
}
LoadConfig config;
if (first_parse) {
first_parse = false;
} else {
config.append = true;
}
if (pending_read_.remaining > 0) {
config.streamed = true;
}
if (auto ec = FromOpaque(*opaque_res, config, &pv); ec) {
// we failed - report and exit
LOG(WARNING) << "error while trying to read data: " << ec;
return std::nullopt;
}
} while (pending_read_.remaining > 0);
if (auto res = db_slice.AddNew(cntx, key, std::move(pv), args.ExpirationTime()); res) { if (auto res = db_slice.AddNew(cntx, key, std::move(pv), args.ExpirationTime()); res) {
res->it->first.SetSticky(args.Sticky()); res->it->first.SetSticky(args.Sticky());

View File

@ -2676,10 +2676,6 @@ void RdbLoader::FlushAllShards() {
FlushShardAsync(i); FlushShardAsync(i);
} }
std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* pv) {
return RdbLoaderBase::FromOpaque(opaque, LoadConfig{}, pv);
}
std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig config, std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig config,
CompactObj* pv) { CompactObj* pv) {
OpaqueObjLoader visitor(opaque.rdb_type, pv, config); OpaqueObjLoader visitor(opaque.rdb_type, pv, config);

View File

@ -139,7 +139,6 @@ class RdbLoaderBase {
template <typename T> io::Result<T> FetchInt(); template <typename T> io::Result<T> FetchInt();
static std::error_code FromOpaque(const OpaqueObj& opaque, CompactObj* pv);
static std::error_code FromOpaque(const OpaqueObj& opaque, LoadConfig config, CompactObj* pv); static std::error_code FromOpaque(const OpaqueObj& opaque, LoadConfig config, CompactObj* pv);
io::Result<uint64_t> LoadLen(bool* is_encoded); io::Result<uint64_t> LoadLen(bool* is_encoded);

View File

@ -13,7 +13,7 @@ from .replication_test import check_all_replicas_finished
from redis.cluster import RedisCluster from redis.cluster import RedisCluster
from redis.cluster import ClusterNode from redis.cluster import ClusterNode
from .proxy import Proxy from .proxy import Proxy
from .seeder import SeederBase from .seeder import StaticSeeder
from . import dfly_args from . import dfly_args
@ -1773,6 +1773,45 @@ async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}") assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.asyncio
async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
]
df_factory.start_all(instances)
nodes = [await create_node_info(instance) for instance in instances]
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
logging.debug("Generating huge containers")
seeder = StaticSeeder(
key_target=10,
data_size=10_000_000,
collection_size=10_000,
variance=1,
samples=1,
types=["LIST", "HASH", "SET", "ZSET", "STRING"],
)
await seeder.run(nodes[0].client)
source_data = await StaticSeeder.capture(nodes[0].client)
nodes[0].migrations = [
MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
]
logging.debug("Migrating slots")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
logging.debug("Waiting for migration to finish")
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
target_data = await StaticSeeder.capture(nodes[1].client)
assert source_data == target_data
def parse_lag(replication_info: str): def parse_lag(replication_info: str):
lags = re.findall("lag=([0-9]+)\r\n", replication_info) lags = re.findall("lag=([0-9]+)\r\n", replication_info)
assert len(lags) == 1 assert len(lags) == 1

View File

@ -1,4 +1,5 @@
import os import os
import logging
import pytest import pytest
import redis import redis
import asyncio import asyncio
@ -7,6 +8,7 @@ from redis import asyncio as aioredis
from . import dfly_multi_test_args, dfly_args from . import dfly_multi_test_args, dfly_args
from .instance import DflyStartException from .instance import DflyStartException
from .utility import batch_fill_data, gen_test_data, EnvironCntx from .utility import batch_fill_data, gen_test_data, EnvironCntx
from .seeder import StaticSeeder
@dfly_multi_test_args({"keys_output_limit": 512}, {"keys_output_limit": 1024}) @dfly_multi_test_args({"keys_output_limit": 512}, {"keys_output_limit": 1024})
@ -168,3 +170,38 @@ async def test_denyoom_commands(df_factory):
# mget should not be rejected # mget should not be rejected
await client.execute_command("mget x") await client.execute_command("mget x")
@pytest.mark.parametrize("type", ["LIST", "HASH", "SET", "ZSET", "STRING"])
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_rename_huge_values(df_factory, type):
df_server = df_factory.create()
df_server.start()
client = df_server.client()
logging.debug(f"Generating huge {type}")
seeder = StaticSeeder(
key_target=1,
data_size=10_000_000,
collection_size=10_000,
variance=1,
samples=1,
types=[type],
)
await seeder.run(client)
source_data = await StaticSeeder.capture(client)
logging.debug(f"src {source_data}")
# Rename multiple times to make sure the key moves between shards
orig_name = (await client.execute_command("keys *"))[0]
old_name = orig_name
new_name = ""
for i in range(10):
new_name = f"new:{i}"
await client.execute_command(f"rename {old_name} {new_name}")
old_name = new_name
await client.execute_command(f"rename {new_name} {orig_name}")
target_data = await StaticSeeder.capture(client)
assert source_data == target_data