diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 7fd18a0b1..6f6853ec1 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -1280,7 +1280,7 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
     auto& pmsg = get<PipelineMessagePtr>(msg.handle);
     squash_cmds.push_back(absl::MakeSpan(pmsg->args));
   }
-  stats_->squashed_commands += squash_cmds.size();
+
   cc_->async_dispatch = true;
 
   size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
diff --git a/src/facade/facade.cc b/src/facade/facade.cc
index 01596a4b0..ed401b78d 100644
--- a/src/facade/facade.cc
+++ b/src/facade/facade.cc
@@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
 
 ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
   // To break this code deliberately if we add/remove a field to this struct.
-  static_assert(kSizeConnStats == 120u);
+  static_assert(kSizeConnStats == 112u);
 
   ADD(read_buf_capacity);
   ADD(dispatch_queue_entries);
@@ -37,7 +37,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
   ADD(num_replicas);
   ADD(num_blocked_clients);
   ADD(num_migrations);
-  ADD(squashed_commands);
 
   return *this;
 }
diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h
index 5b8e0eff1..2ee50b655 100644
--- a/src/facade/facade_types.h
+++ b/src/facade/facade_types.h
@@ -117,7 +117,7 @@ struct ConnectionStats {
   uint32_t num_replicas = 0;
   uint32_t num_blocked_clients = 0;
   uint64_t num_migrations = 0;
-  uint64_t squashed_commands = 0;
+
   ConnectionStats& operator+=(const ConnectionStats& o);
 };
 
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 2d20f457d..33cf54c2b 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -1398,6 +1398,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
   intrusive_ptr<Transaction> dist_trans;
   size_t dispatched = 0;
 
+  auto* ss = dfly::ServerState::tlocal();
   auto perform_squash = [&] {
     if (stored_cmds.empty())
       return;
@@ -1416,11 +1417,12 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
     dfly_cntx->transaction = nullptr;
 
     dispatched += stored_cmds.size();
+    ss->stats.squashed_commands += stored_cmds.size();
     stored_cmds.clear();
   };
 
   // Don't even start when paused. We can only continue if DispatchTracker is aware of us running.
-  if (dfly::ServerState::tlocal()->IsPaused())
+  if (ss->IsPaused())
     return 0;
 
   for (auto args : args_list) {
@@ -1451,7 +1453,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
       perform_squash();
 
       // Stop accumulating when a pause is requested, fall back to regular dispatch
-      if (dfly::ServerState::tlocal()->IsPaused())
+      if (ss->IsPaused())
        break;
 
      // Dispatch non squashed command only after all squshed commands were executed and replied
diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc
index 80d518fbf..814db9ca9 100644
--- a/src/server/multi_command_squasher.cc
+++ b/src/server/multi_command_squasher.cc
@@ -191,8 +191,12 @@ bool MultiCommandSquasher::ExecuteSquashed() {
   if (order_.empty())
     return true;
 
-  for (auto& sd : sharded_)
+  unsigned num_shards = 0;
+  for (auto& sd : sharded_) {
     sd.replies.reserve(sd.cmds.size());
+    if (!sd.cmds.empty())
+      ++num_shards;
+  }
 
   Transaction* tx = cntx_->transaction;
   ServerState::tlocal()->stats.multi_squash_executions++;
@@ -207,8 +211,24 @@ bool MultiCommandSquasher::ExecuteSquashed() {
     tx->PrepareSquashedMultiHop(base_cid_, cb);
     tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); });
   } else {
+#if 1
+    fb2::BlockingCounter bc(num_shards);
+    DVLOG(1) << "Squashing " << num_shards << " " << tx->DebugId();
+
+    auto cb = [this, tx, bc]() mutable {
+      this->SquashedHopCb(tx, EngineShard::tlocal());
+      bc->Dec();
+    };
+
+    for (unsigned i = 0; i < sharded_.size(); ++i) {
+      if (!sharded_[i].cmds.empty())
+        shard_set->AddL2(i, cb);
+    }
+    bc->Wait();
+#else
     shard_set->RunBlockingInParallel([this, tx](auto* es) { SquashedHopCb(tx, es); },
                                      [this](auto sid) { return !sharded_[sid].cmds.empty(); });
+#endif
   }
 
   uint64_t after_hop = proactor->GetMonotonicTimeNs();
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 5ae91b35f..c1e79693b 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -2154,7 +2154,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     append("total_commands_processed", conn_stats.command_cnt);
     append("instantaneous_ops_per_sec", m.qps);
     append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
-    append("total_pipelined_squashed_commands", conn_stats.squashed_commands);
+    append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands);
     append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency);
     append("total_net_input_bytes", conn_stats.io_read_bytes);
     append("connection_migrations", conn_stats.num_migrations);
diff --git a/src/server/server_state.cc b/src/server/server_state.cc
index 9039176fb..0bded3d31 100644
--- a/src/server/server_state.cc
+++ b/src/server/server_state.cc
@@ -27,25 +27,29 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) {
 }
 
 ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
-  static_assert(sizeof(Stats) == 16 * 8, "Stats size mismatch");
+  static_assert(sizeof(Stats) == 17 * 8, "Stats size mismatch");
 
-  this->eval_io_coordination_cnt += other.eval_io_coordination_cnt;
-  this->eval_shardlocal_coordination_cnt += other.eval_shardlocal_coordination_cnt;
-  this->eval_squashed_flushes += other.eval_squashed_flushes;
+#define ADD(x) this->x += (other.x)
 
-  this->tx_global_cnt += other.tx_global_cnt;
-  this->tx_normal_cnt += other.tx_normal_cnt;
-  this->tx_inline_runs += other.tx_inline_runs;
-  this->tx_schedule_cancel_cnt += other.tx_schedule_cancel_cnt;
+  ADD(eval_io_coordination_cnt);
 
-  this->multi_squash_executions += other.multi_squash_executions;
-  this->multi_squash_exec_hop_usec += other.multi_squash_exec_hop_usec;
-  this->multi_squash_exec_reply_usec += other.multi_squash_exec_reply_usec;
+  ADD(eval_shardlocal_coordination_cnt);
+  ADD(eval_squashed_flushes);
 
-  this->blocked_on_interpreter += other.blocked_on_interpreter;
-  this->rdb_save_usec += other.rdb_save_usec;
-  this->rdb_save_count += other.rdb_save_count;
-  this->oom_error_cmd_cnt += other.oom_error_cmd_cnt;
+  ADD(tx_global_cnt);
+  ADD(tx_normal_cnt);
+  ADD(tx_inline_runs);
+  ADD(tx_schedule_cancel_cnt);
+
+  ADD(multi_squash_executions);
+  ADD(multi_squash_exec_hop_usec);
+  ADD(multi_squash_exec_reply_usec);
+  ADD(squashed_commands);
+
+  ADD(blocked_on_interpreter);
+  ADD(rdb_save_usec);
+  ADD(rdb_save_count);
+  ADD(oom_error_cmd_cnt);
 
   if (this->tx_width_freq_arr.size() > 0) {
     DCHECK_EQ(this->tx_width_freq_arr.size(), other.tx_width_freq_arr.size());
@@ -54,6 +58,7 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
     this->tx_width_freq_arr = other.tx_width_freq_arr;
   }
   return *this;
+#undef ADD
 }
 
 void MonitorsRepo::Add(facade::Connection* connection) {
diff --git a/src/server/server_state.h b/src/server/server_state.h
index bd0a25583..ecb392e74 100644
--- a/src/server/server_state.h
+++ b/src/server/server_state.h
@@ -116,7 +116,7 @@ class ServerState {  // public struct - to allow initialization.
    uint64_t multi_squash_executions = 0;
    uint64_t multi_squash_exec_hop_usec = 0;
    uint64_t multi_squash_exec_reply_usec = 0;
-
+   uint64_t squashed_commands = 0;
    uint64_t blocked_on_interpreter = 0;
 
    uint64_t rdb_save_usec = 0;
diff --git a/tools/local/monitoring/grafana/provisioning/datasources/datasource.yml b/tools/local/monitoring/grafana/provisioning/datasources/datasource.yml
index c02bb38b3..a9c404c9d 100644
--- a/tools/local/monitoring/grafana/provisioning/datasources/datasource.yml
+++ b/tools/local/monitoring/grafana/provisioning/datasources/datasource.yml
@@ -40,6 +40,7 @@ datasources:
       graphiteVersion: "1.1"
       tlsAuth: false
       tlsAuthWithCACert: false
+      timeInterval: 1s  # Based on https://stackoverflow.com/a/66830690
    # <string> json object of data that will be encrypted.
    secureJsonData:
      tlsCACert: "..."
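
Note on the multi_command_squasher.cc change above: instead of shard_set->RunBlockingInParallel, the squashed hop is fanned out as one AddL2 callback per non-empty shard, and the coordinator blocks on a fb2::BlockingCounter until every callback has called Dec(). The sketch below is a minimal, self-contained illustration of that fan-out/await pattern only; it uses std::thread and a hand-rolled BlockingCounter as stand-ins for helio's fiber primitives and the shard queues, so none of the names in it are Dragonfly or helio APIs.

// Standalone illustration of the fan-out/await pattern used above:
// a coordinator submits one callback per non-empty shard and blocks on a
// counter until all of them have run. Plain std::thread and a small
// BlockingCounter stand in for fb2::BlockingCounter and shard_set->AddL2().
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

class BlockingCounter {
 public:
  explicit BlockingCounter(unsigned count) : count_(count) {}

  void Dec() {
    std::lock_guard<std::mutex> lk(mu_);
    if (--count_ == 0)
      cv_.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return count_ == 0; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  unsigned count_;
};

int main() {
  // Pretend shards 0 and 2 have queued commands, shard 1 is empty.
  std::vector<std::vector<int>> sharded = {{1, 2, 3}, {}, {4, 5}};

  // Count only shards with work, mirroring num_shards in the first hunk.
  unsigned num_shards = 0;
  for (const auto& cmds : sharded)
    if (!cmds.empty())
      ++num_shards;

  BlockingCounter bc(num_shards);
  std::vector<std::thread> workers;

  for (size_t i = 0; i < sharded.size(); ++i) {
    if (sharded[i].empty())
      continue;  // nothing to run on this shard
    workers.emplace_back([i, &sharded, &bc] {
      std::printf("shard %zu executed %zu squashed commands\n", i, sharded[i].size());
      bc.Dec();  // signal completion, as the wrapper around SquashedHopCb does
    });
  }

  bc.Wait();  // coordinator resumes only after every non-empty shard has run
  for (auto& t : workers)
    t.join();
  return 0;
}

Counting only non-empty shards up front keeps the counter from waiting on shards that have nothing to execute, which is why the first multi_command_squasher.cc hunk computes num_shards while reserving the reply buffers.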