chore: reduce pipelining latency by reusing existing shard fibers (#3494)

* chore: reduce pipelining latency by reusing existing shard fibers

To prove the benefits, run `./dfly_bench --pipeline=50 -n 20000 --ratio 0:1 --qps=0 --key_maximum=1`.
Before: the average pipelining latency was 10ms.
After: the average pipelining latency is 5ms.
Avg latency is computed as: pipelined_latency_usec / total_pipelined_squashed_commands
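For example, with illustrative numbers (not taken from the run above): if pipelined_latency_usec accumulates to 10,000,000 over 2,000 squashed commands, the average is 10,000,000 / 2,000 = 5,000 usec, i.e. 5ms.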

Also, improved the counting of squashed commands so that only actually squashed commands are counted.
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman, 2024-08-14 14:45:54 +03:00, committed by GitHub
commit 93f6773297 (parent a2e63f144c)
9 changed files with 51 additions and 24 deletions


@@ -1280,7 +1280,7 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
auto& pmsg = get<PipelineMessagePtr>(msg.handle);
squash_cmds.push_back(absl::MakeSpan(pmsg->args));
}
- stats_->squashed_commands += squash_cmds.size();
cc_->async_dispatch = true;
size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
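
(Note: this drops the per-connection counter; as the hunks below show, squashed commands are now counted in ServerState::Stats at the point where squashing actually happens, matching the "count actual squashed ones" goal above.)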


@@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct.
- static_assert(kSizeConnStats == 120u);
+ static_assert(kSizeConnStats == 112u);
ADD(read_buf_capacity);
ADD(dispatch_queue_entries);
@@ -37,7 +37,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(num_replicas);
ADD(num_blocked_clients);
ADD(num_migrations);
- ADD(squashed_commands);
return *this;
}

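The static_assert above is a size guard: adding or removing a field changes sizeof(ConnectionStats) and breaks the build, forcing whoever edits the struct to update operator+= as well. A minimal sketch of the same idiom, using a hypothetical struct:

```cpp
#include <cstdint>

// Hypothetical stats struct, only to illustrate the size-guard idiom.
struct DemoStats {
  uint64_t reads = 0;
  uint64_t writes = 0;

  DemoStats& operator+=(const DemoStats& o) {
    // Fails to compile if a field is added or removed, so this operator
    // cannot silently fall out of sync with the struct definition.
    static_assert(sizeof(DemoStats) == 16u, "update operator+= after changing fields");
    reads += o.reads;
    writes += o.writes;
    return *this;
  }
};
```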

@@ -117,7 +117,7 @@ struct ConnectionStats {
uint32_t num_replicas = 0;
uint32_t num_blocked_clients = 0;
uint64_t num_migrations = 0;
- uint64_t squashed_commands = 0;
ConnectionStats& operator+=(const ConnectionStats& o);
};


@@ -1398,6 +1398,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
intrusive_ptr<Transaction> dist_trans;
size_t dispatched = 0;
+ auto* ss = dfly::ServerState::tlocal();
auto perform_squash = [&] {
if (stored_cmds.empty())
@@ -1416,11 +1417,12 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
dfly_cntx->transaction = nullptr;
dispatched += stored_cmds.size();
+ ss->stats.squashed_commands += stored_cmds.size();
stored_cmds.clear();
};
// Don't even start when paused. We can only continue if DispatchTracker is aware of us running.
- if (dfly::ServerState::tlocal()->IsPaused())
+ if (ss->IsPaused())
return 0;
for (auto args : args_list) {
@@ -1451,7 +1453,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
perform_squash();
// Stop accumulating when a pause is requested, fall back to regular dispatch
- if (dfly::ServerState::tlocal()->IsPaused())
+ if (ss->IsPaused())
break;
// Dispatch non-squashed command only after all squashed commands were executed and replied

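The `ss` variable caches the `ServerState::tlocal()` lookup once per call instead of re-resolving the thread-local on every pause check. A standalone sketch of the idea (all names here are hypothetical, not Dragonfly APIs):

```cpp
// Sketch of the refactor above: resolve the thread-local state once per
// call instead of on every loop iteration.
struct WorkerState {
  bool paused = false;

  static WorkerState* tlocal() {
    static thread_local WorkerState state;
    return &state;
  }
};

int DispatchAll(int num_commands) {
  auto* ws = WorkerState::tlocal();  // one thread_local lookup per call
  int dispatched = 0;
  for (int i = 0; i < num_commands; ++i) {
    if (ws->paused)  // cheap pointer dereference in the hot loop
      break;
    ++dispatched;
  }
  return dispatched;
}
```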

@@ -191,8 +191,12 @@ bool MultiCommandSquasher::ExecuteSquashed() {
if (order_.empty())
return true;
- for (auto& sd : sharded_)
+ unsigned num_shards = 0;
+ for (auto& sd : sharded_) {
sd.replies.reserve(sd.cmds.size());
+ if (!sd.cmds.empty())
+ ++num_shards;
+ }
Transaction* tx = cntx_->transaction;
ServerState::tlocal()->stats.multi_squash_executions++;
@@ -207,8 +211,24 @@ bool MultiCommandSquasher::ExecuteSquashed() {
tx->PrepareSquashedMultiHop(base_cid_, cb);
tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); });
} else {
+ #if 1
+ fb2::BlockingCounter bc(num_shards);
+ DVLOG(1) << "Squashing " << num_shards << " " << tx->DebugId();
+ auto cb = [this, tx, bc]() mutable {
+ this->SquashedHopCb(tx, EngineShard::tlocal());
+ bc->Dec();
+ };
+ for (unsigned i = 0; i < sharded_.size(); ++i) {
+ if (!sharded_[i].cmds.empty())
+ shard_set->AddL2(i, cb);
+ }
+ bc->Wait();
+ #else
shard_set->RunBlockingInParallel([this, tx](auto* es) { SquashedHopCb(tx, es); },
[this](auto sid) { return !sharded_[sid].cmds.empty(); });
+ #endif
}
uint64_t after_hop = proactor->GetMonotonicTimeNs();

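The `#if 1` branch is the core of this commit: instead of spawning a fresh fiber per shard (the `RunBlockingInParallel` path kept under `#else`), the squashed hop is queued onto each shard's existing fiber via `shard_set->AddL2`, and an `fb2::BlockingCounter` blocks the calling fiber until every shard has executed `SquashedHopCb`. A rough standard-library analogy of that pattern, with long-lived worker threads standing in for shard fibers and `std::latch` standing in for `fb2::BlockingCounter` (all names here are illustrative, not Dragonfly APIs):

```cpp
#include <condition_variable>
#include <functional>
#include <latch>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// A long-lived worker that drains its own task queue, standing in for an
// already-running shard fiber.
class Worker {
 public:
  Worker() : th_([this] { Run(); }) {}
  ~Worker() {
    Add({});  // an empty std::function serves as the shutdown signal
    th_.join();
  }

  void Add(std::function<void()> task) {
    {
      std::lock_guard lk(mu_);
      q_.push(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    while (true) {
      std::unique_lock lk(mu_);
      cv_.wait(lk, [this] { return !q_.empty(); });
      auto task = std::move(q_.front());
      q_.pop();
      lk.unlock();
      if (!task)
        return;  // shutdown
      task();
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> q_;
  std::thread th_;
};

int main() {
  std::vector<Worker> shards(4);  // pre-existing workers, not spawned per hop
  std::latch done(4);             // counts one completion per shard
  for (auto& s : shards)
    s.Add([&done] { done.count_down(); });  // the per-shard hop callback
  done.wait();  // the caller blocks until every shard ran its callback
}
```

The design point matches the commit title: the latency win comes from not paying fiber creation and scheduling cost on every squashed hop, since the per-shard workers already exist and only need a task enqueued.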

@@ -2154,7 +2154,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("total_commands_processed", conn_stats.command_cnt);
append("instantaneous_ops_per_sec", m.qps);
append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
append("total_pipelined_squashed_commands", conn_stats.squashed_commands);
append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands);
append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency);
append("total_net_input_bytes", conn_stats.io_read_bytes);
append("connection_migrations", conn_stats.num_migrations);


@@ -27,25 +27,29 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) {
}
ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
- static_assert(sizeof(Stats) == 16 * 8, "Stats size mismatch");
+ static_assert(sizeof(Stats) == 17 * 8, "Stats size mismatch");
- this->eval_io_coordination_cnt += other.eval_io_coordination_cnt;
- this->eval_shardlocal_coordination_cnt += other.eval_shardlocal_coordination_cnt;
- this->eval_squashed_flushes += other.eval_squashed_flushes;
- this->tx_global_cnt += other.tx_global_cnt;
- this->tx_normal_cnt += other.tx_normal_cnt;
- this->tx_inline_runs += other.tx_inline_runs;
- this->tx_schedule_cancel_cnt += other.tx_schedule_cancel_cnt;
- this->multi_squash_executions += other.multi_squash_executions;
- this->multi_squash_exec_hop_usec += other.multi_squash_exec_hop_usec;
- this->multi_squash_exec_reply_usec += other.multi_squash_exec_reply_usec;
- this->blocked_on_interpreter += other.blocked_on_interpreter;
- this->rdb_save_usec += other.rdb_save_usec;
- this->rdb_save_count += other.rdb_save_count;
- this->oom_error_cmd_cnt += other.oom_error_cmd_cnt;
+ #define ADD(x) this->x += (other.x)
+ ADD(eval_io_coordination_cnt);
+ ADD(eval_shardlocal_coordination_cnt);
+ ADD(eval_squashed_flushes);
+ ADD(tx_global_cnt);
+ ADD(tx_normal_cnt);
+ ADD(tx_inline_runs);
+ ADD(tx_schedule_cancel_cnt);
+ ADD(multi_squash_executions);
+ ADD(multi_squash_exec_hop_usec);
+ ADD(multi_squash_exec_reply_usec);
+ ADD(squashed_commands);
+ ADD(blocked_on_interpreter);
+ ADD(rdb_save_usec);
+ ADD(rdb_save_count);
+ ADD(oom_error_cmd_cnt);
if (this->tx_width_freq_arr.size() > 0) {
DCHECK_EQ(this->tx_width_freq_arr.size(), other.tx_width_freq_arr.size());
@@ -54,6 +58,7 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
this->tx_width_freq_arr = other.tx_width_freq_arr;
}
return *this;
+ #undef ADD
}
void MonitorsRepo::Add(facade::Connection* connection) {

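The refactor above compresses fourteen hand-written `this->x += other.x` statements into an `ADD(x)` macro that is `#undef`-ed at the end of the function, paired with the size assert so new fields cannot be forgotten. A minimal standalone sketch of the same pattern, with a hypothetical struct:

```cpp
#include <cstdint>

// Hypothetical counters struct, only to illustrate the local-macro pattern.
struct Counters {
  uint64_t hits = 0;
  uint64_t misses = 0;
  uint64_t evictions = 0;

  Counters& operator+=(const Counters& o) {
    // Keep the macro local to this function so it cannot leak elsewhere.
#define ADD(x) this->x += (o.x)
    ADD(hits);
    ADD(misses);
    ADD(evictions);
#undef ADD
    return *this;
  }
};
```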

@@ -116,7 +116,7 @@ class ServerState { // public struct - to allow initialization.
uint64_t multi_squash_executions = 0;
uint64_t multi_squash_exec_hop_usec = 0;
uint64_t multi_squash_exec_reply_usec = 0;
+ uint64_t squashed_commands = 0;
uint64_t blocked_on_interpreter = 0;
uint64_t rdb_save_usec = 0;


@@ -40,6 +40,7 @@ datasources:
graphiteVersion: "1.1"
tlsAuth: false
tlsAuthWithCACert: false
+ timeInterval: 1s # Based on https://stackoverflow.com/a/66830690
# <string> json object of data that will be encrypted.
secureJsonData:
tlsCACert: "..."