mirror of
http://github.com/valkey-io/valkey
synced 2024-11-22 00:52:38 +00:00
PSYNC2: Fix the way replication info is saved/loaded from RDB.
This commit attempts to fix a number of bugs reported in #4316. They are related to the way replication info like replication ID, offsets, and currently selected DB in the master client, are stored and loaded by Redis. In order to avoid inconsistencies the changes in this commit try to enforce that: 1. Replication information are only stored when the RDB file is generated by a slave that has a valid 'master' client, so that we can always extract the currently selected DB. 2. When replication informations are persisted in the RDB file, all the info for a successful PSYNC or nothing is persisted. 3. The RDB replication informations are only loaded if the instance is configured as a slave, otherwise a master can start with IDs that relate to a different history of the data set, and stil retain such IDs in the future while receiving unrelated writes.
This commit is contained in:
parent
a4152119c6
commit
c1c99e9f4e
34
src/rdb.c
34
src/rdb.c
@ -858,16 +858,14 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
|
||||
|
||||
/* Handle saving options that generate aux fields. */
|
||||
if (rsi) {
|
||||
if (rsi->repl_stream_db &&
|
||||
rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
|
||||
== -1)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
|
||||
== -1) return -1;
|
||||
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
|
||||
== -1) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
|
||||
== -1) return -1;
|
||||
}
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
|
||||
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid) == -1) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) == -1) return -1;
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -2017,3 +2015,23 @@ void bgsaveCommand(client *c) {
|
||||
addReply(c,shared.err);
|
||||
}
|
||||
}
|
||||
|
||||
/* Populate the rdbSaveInfo structure used to persist the replication
|
||||
* information inside the RDB file. Currently the structure explicitly
|
||||
* contains just the currently selected DB from the master stream, however
|
||||
* if the rdbSave*() family functions receive a NULL rsi structure also
|
||||
* the Replication ID/offset is not saved. The function popultes 'rsi'
|
||||
* that is normally stack-allocated in the caller, returns the populated
|
||||
* pointer if the instance has a valid master client, otherwise NULL
|
||||
* is returned, and the RDB savign wil not persist any replication related
|
||||
* information. */
|
||||
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
|
||||
rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT;
|
||||
*rsi = rsi_init;
|
||||
if (server.master) {
|
||||
rsi->repl_stream_db = server.master->db->id;
|
||||
return rsi;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -147,5 +147,6 @@ int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
|
||||
int rdbSaveBinaryFloatValue(rio *rdb, float val);
|
||||
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
|
||||
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi);
|
||||
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
|
||||
|
||||
#endif
|
||||
|
@ -569,18 +569,12 @@ int startBgsaveForReplication(int mincapa) {
|
||||
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
|
||||
socket_target ? "slaves sockets" : "disk");
|
||||
|
||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||
/* If we are saving for a chained slave (that is, if we are,
|
||||
* in turn, a slave of another instance), make sure after
|
||||
* loadig the RDB, our slaves select the right DB: we'll just
|
||||
* send the replication stream we receive from our master, so
|
||||
* no way to send SELECT commands. */
|
||||
if (server.master) rsi.repl_stream_db = server.master->db->id;
|
||||
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
if (socket_target)
|
||||
retval = rdbSaveToSlavesSockets(&rsi);
|
||||
retval = rdbSaveToSlavesSockets(rsiptr);
|
||||
else
|
||||
retval = rdbSaveBackground(server.rdb_filename,&rsi);
|
||||
retval = rdbSaveBackground(server.rdb_filename,rsiptr);
|
||||
|
||||
/* If we failed to BGSAVE, remove the slaves waiting for a full
|
||||
* resynchorinization from the list of salves, inform them with
|
||||
|
23
src/server.c
23
src/server.c
@ -1092,7 +1092,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
{
|
||||
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
|
||||
sp->changes, (int)sp->seconds);
|
||||
rdbSaveBackground(server.rdb_filename,NULL);
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
rdbSaveBackground(server.rdb_filename,rsiptr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -1164,7 +1166,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
|
||||
server.lastbgsave_status == C_OK))
|
||||
{
|
||||
if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK)
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
|
||||
server.rdb_bgsave_scheduled = 0;
|
||||
}
|
||||
|
||||
@ -2546,7 +2550,9 @@ int prepareForShutdown(int flags) {
|
||||
if ((server.saveparamslen > 0 && !nosave) || save) {
|
||||
serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
|
||||
/* Snapshotting. Perform a SYNC SAVE and exit */
|
||||
if (rdbSave(server.rdb_filename,NULL) != C_OK) {
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
|
||||
/* Ooops.. error saving! The best we can do is to continue
|
||||
* operating. Note that if there was a background saving process,
|
||||
* in the next cron() Redis will be notified that the background
|
||||
@ -3526,13 +3532,20 @@ void loadDataFromDisk(void) {
|
||||
(float)(ustime()-start)/1000000);
|
||||
|
||||
/* Restore the replication ID / offset from the RDB file. */
|
||||
if (rsi.repl_id_is_set && rsi.repl_offset != -1) {
|
||||
if (server.masterhost &&
|
||||
rsi.repl_id_is_set &&
|
||||
rsi.repl_offset != -1 &&
|
||||
/* Note that older implementations may save a repl_stream_db
|
||||
* of -1 inside the RDB file. */
|
||||
rsi.repl_stream_db != -1)
|
||||
{
|
||||
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
|
||||
server.master_repl_offset = rsi.repl_offset;
|
||||
/* If we are a slave, create a cached master from this
|
||||
* information, in order to allow partial resynchronizations
|
||||
* with masters. */
|
||||
if (server.masterhost) replicationCacheMasterUsingMyself();
|
||||
replicationCacheMasterUsingMyself();
|
||||
selectDb(server.cached_master,rsi.repl_stream_db);
|
||||
}
|
||||
} else if (errno != ENOENT) {
|
||||
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
|
||||
|
Loading…
Reference in New Issue
Block a user