chore: export pipeline related metrics (#3104)

* chore: export pipeline related metrics

Export in /metrics
1. Total pipeline queue length
2. Total pipeline commands
3. Total pipelined duration

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-05-30 19:10:35 +03:00 committed by GitHub
parent 137bd313ef
commit 0394387a5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 169 additions and 57 deletions

View File

@ -7,6 +7,10 @@ on:
branches: [main] branches: [main]
workflow_dispatch: workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs: jobs:
pre-commit: pre-commit:
runs-on: ubuntu-latest runs-on: ubuntu-latest

View File

@ -97,10 +97,14 @@ class ConnectionContext {
bool async_dispatch : 1; // whether this connection is amid an async dispatch bool async_dispatch : 1; // whether this connection is amid an async dispatch
bool sync_dispatch : 1; // whether this connection is amid a sync dispatch bool sync_dispatch : 1; // whether this connection is amid a sync dispatch
bool journal_emulated : 1; // whether it is used to dispatch journal commands bool journal_emulated : 1; // whether it is used to dispatch journal commands
bool paused = false; // whether this connection is paused due to CLIENT PAUSE
bool paused = false; // whether this connection is paused due to CLIENT PAUSE
// whether it's blocked on blocking commands like BLPOP, needs to be addressable // whether it's blocked on blocking commands like BLPOP, needs to be addressable
bool blocked = false; bool blocked = false;
// Skip ACL validation, used by internal commands and commands run on admin port
bool skip_acl_validation = false;
// How many async subscription sources are active: monitor and/or pubsub - at most 2. // How many async subscription sources are active: monitor and/or pubsub - at most 2.
uint8_t subscriptions; uint8_t subscriptions;
@ -108,8 +112,6 @@ class ConnectionContext {
std::string authed_username{"default"}; std::string authed_username{"default"};
uint32_t acl_categories{dfly::acl::ALL}; uint32_t acl_categories{dfly::acl::ALL};
std::vector<uint64_t> acl_commands; std::vector<uint64_t> acl_commands;
// Skip ACL validation, used by internal commands and commands run on admin port
bool skip_acl_validation = false;
// keys // keys
dfly::acl::AclKeys keys{{}, true}; dfly::acl::AclKeys keys{{}, true};

View File

@ -393,19 +393,6 @@ size_t Connection::MessageHandle::UsedMemory() const {
return sizeof(MessageHandle) + visit(MessageSize{}, this->handle); return sizeof(MessageHandle) + visit(MessageSize{}, this->handle);
} }
bool Connection::MessageHandle::IsIntrusive() const {
return holds_alternative<AclUpdateMessagePtr>(handle) ||
holds_alternative<CheckpointMessage>(handle);
}
bool Connection::MessageHandle::IsPipelineMsg() const {
return holds_alternative<PipelineMessagePtr>(handle);
}
bool Connection::MessageHandle::IsPubMsg() const {
return holds_alternative<PubMessagePtr>(handle);
}
bool Connection::MessageHandle::IsReplying() const { bool Connection::MessageHandle::IsReplying() const {
return IsPipelineMsg() || IsPubMsg() || holds_alternative<MonitorMessage>(handle) || return IsPipelineMsg() || IsPubMsg() || holds_alternative<MonitorMessage>(handle) ||
(holds_alternative<MCPipelineMessagePtr>(handle) && (holds_alternative<MCPipelineMessagePtr>(handle) &&
@ -751,6 +738,9 @@ std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() co
string after; string after;
absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id)); absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id));
if (dispatch_q_.size()) {
absl::StrAppend(&after, " pipeline=", dispatch_q_.size());
}
absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_); absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_);
string_view phase_name = PHASE_NAMES[phase_]; string_view phase_name = PHASE_NAMES[phase_];
@ -1272,7 +1262,7 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
cc_->async_dispatch = false; cc_->async_dispatch = false;
auto it = dispatch_q_.begin(); auto it = dispatch_q_.begin();
while (it->IsIntrusive()) // Skip all newly received intrusive messages while (it->IsControl()) // Skip all newly received intrusive messages
++it; ++it;
for (auto rit = it; rit != it + dispatched; ++rit) for (auto rit = it; rit != it + dispatched; ++rit)
@ -1291,7 +1281,7 @@ void Connection::ClearPipelinedMessages() {
// As well as to avoid pubsub backpressure leakege. // As well as to avoid pubsub backpressure leakege.
for (auto& msg : dispatch_q_) { for (auto& msg : dispatch_q_) {
FiberAtomicGuard guard; // don't suspend when concluding to avoid getting new messages FiberAtomicGuard guard; // don't suspend when concluding to avoid getting new messages
if (msg.IsIntrusive()) if (msg.IsControl())
visit(dispatch_op, msg.handle); // to not miss checkpoints visit(dispatch_op, msg.handle); // to not miss checkpoints
RecycleMessage(std::move(msg)); RecycleMessage(std::move(msg));
} }
@ -1309,7 +1299,7 @@ std::string Connection::DebugInfo() const {
absl::StrAppend(&info, "closing=", cc_->conn_closing, ", "); absl::StrAppend(&info, "closing=", cc_->conn_closing, ", ");
absl::StrAppend(&info, "dispatch_fiber:joinable=", dispatch_fb_.IsJoinable(), ", "); absl::StrAppend(&info, "dispatch_fiber:joinable=", dispatch_fb_.IsJoinable(), ", ");
bool intrusive_front = dispatch_q_.size() > 0 && dispatch_q_.front().IsIntrusive(); bool intrusive_front = dispatch_q_.size() > 0 && dispatch_q_.front().IsControl();
absl::StrAppend(&info, "dispatch_queue:size=", dispatch_q_.size(), ", "); absl::StrAppend(&info, "dispatch_queue:size=", dispatch_q_.size(), ", ");
absl::StrAppend(&info, "dispatch_queue:pipelined=", pending_pipeline_cmd_cnt_, ", "); absl::StrAppend(&info, "dispatch_queue:pipelined=", pending_pipeline_cmd_cnt_, ", ");
absl::StrAppend(&info, "dispatch_queue:intrusive=", intrusive_front, ", "); absl::StrAppend(&info, "dispatch_queue:intrusive=", intrusive_front, ", ");
@ -1549,7 +1539,7 @@ void Connection::SendAsync(MessageHandle msg) {
// "Closing" connections might be still processing commands, as we don't interrupt them. // "Closing" connections might be still processing commands, as we don't interrupt them.
// So we still want to deliver control messages to them (like checkpoints). // So we still want to deliver control messages to them (like checkpoints).
if (cc_->conn_closing && !msg.IsIntrusive()) if (cc_->conn_closing && !msg.IsControl())
return; return;
// If we launch while closing, it won't be awaited. Control messages will be processed on cleanup. // If we launch while closing, it won't be awaited. Control messages will be processed on cleanup.
@ -1573,9 +1563,9 @@ void Connection::SendAsync(MessageHandle msg) {
pending_pipeline_cmd_cnt_++; pending_pipeline_cmd_cnt_++;
} }
if (msg.IsIntrusive()) { if (msg.IsControl()) {
auto it = dispatch_q_.begin(); auto it = dispatch_q_.begin();
while (it < dispatch_q_.end() && it->IsIntrusive()) while (it < dispatch_q_.end() && it->IsControl())
++it; ++it;
dispatch_q_.insert(it, std::move(msg)); dispatch_q_.insert(it, std::move(msg));
} else { } else {

View File

@ -147,12 +147,21 @@ class Connection : public util::Connection {
struct MessageHandle { struct MessageHandle {
size_t UsedMemory() const; // How much bytes this handle takes up in total. size_t UsedMemory() const; // How much bytes this handle takes up in total.
// Intrusive messages put themselves at the front of the queue, but only after all other // Control messages put themselves at the front of the queue, but only after all other
// intrusive ones. Used for quick transfer of control / update messages. // control ones. Used for management messages.
bool IsIntrusive() const; bool IsControl() const {
return std::holds_alternative<AclUpdateMessagePtr>(handle) ||
std::holds_alternative<CheckpointMessage>(handle);
}
bool IsPipelineMsg() const {
return std::holds_alternative<PipelineMessagePtr>(handle);
}
bool IsPubMsg() const {
return std::holds_alternative<PubMessagePtr>(handle);
}
bool IsPipelineMsg() const;
bool IsPubMsg() const;
bool IsReplying() const; // control messges don't reply, messages carrying data do bool IsReplying() const; // control messges don't reply, messages carrying data do
std::variant<MonitorMessage, PubMessagePtr, PipelineMessagePtr, MCPipelineMessagePtr, std::variant<MonitorMessage, PubMessagePtr, PipelineMessagePtr, MCPipelineMessagePtr,

View File

@ -48,13 +48,13 @@ inline std::string_view ToSV(std::string_view slice) {
struct ConnectionStats { struct ConnectionStats {
size_t read_buf_capacity = 0; // total capacity of input buffers size_t read_buf_capacity = 0; // total capacity of input buffers
size_t dispatch_queue_entries = 0; // total number of dispatch queue entries uint64_t dispatch_queue_entries = 0; // total number of dispatch queue entries
size_t dispatch_queue_bytes = 0; // total size of all dispatch queue entries size_t dispatch_queue_bytes = 0; // total size of all dispatch queue entries
size_t dispatch_queue_subscriber_bytes = 0; // total size of all publish messages size_t dispatch_queue_subscriber_bytes = 0; // total size of all publish messages
size_t pipeline_cmd_cache_bytes = 0; size_t pipeline_cmd_cache_bytes = 0;
size_t io_read_cnt = 0; uint64_t io_read_cnt = 0;
size_t io_read_bytes = 0; size_t io_read_bytes = 0;
uint64_t command_cnt = 0; uint64_t command_cnt = 0;

View File

@ -1071,10 +1071,17 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
MetricType::GAUGE, &resp->body()); MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("blocked_clients", "", conn_stats.num_blocked_clients, AppendMetricWithoutLabels("blocked_clients", "", conn_stats.num_blocked_clients,
MetricType::GAUGE, &resp->body()); MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("dispatch_queue_bytes", "", conn_stats.dispatch_queue_bytes, AppendMetricWithoutLabels("pipeline_queue_bytes", "", conn_stats.dispatch_queue_bytes,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("pipeline_queue_length", "", conn_stats.dispatch_queue_entries,
MetricType::GAUGE, &resp->body()); MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("pipeline_cmd_cache_bytes", "", conn_stats.pipeline_cmd_cache_bytes, AppendMetricWithoutLabels("pipeline_cmd_cache_bytes", "", conn_stats.pipeline_cmd_cache_bytes,
MetricType::GAUGE, &resp->body()); MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("pipeline_commands_total", "", conn_stats.pipelined_cmd_cnt,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("pipeline_commands_duration_seconds", "",
conn_stats.pipelined_cmd_latency * 1e-6, MetricType::COUNTER,
&resp->body());
// Memory metrics // Memory metrics
auto sdata_res = io::ReadStatusInfo(); auto sdata_res = io::ReadStatusInfo();
@ -1977,7 +1984,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("max_clients", GetFlag(FLAGS_maxclients)); append("max_clients", GetFlag(FLAGS_maxclients));
append("client_read_buffer_bytes", m.facade_stats.conn_stats.read_buf_capacity); append("client_read_buffer_bytes", m.facade_stats.conn_stats.read_buf_capacity);
append("blocked_clients", m.facade_stats.conn_stats.num_blocked_clients); append("blocked_clients", m.facade_stats.conn_stats.num_blocked_clients);
append("dispatch_queue_entries", m.facade_stats.conn_stats.dispatch_queue_entries); append("pipeline_queue_length", m.facade_stats.conn_stats.dispatch_queue_entries);
} }
if (should_enter("MEMORY")) { if (should_enter("MEMORY")) {

View File

@ -6,7 +6,7 @@ volumes:
services: services:
prometheus: prometheus:
image: prom/prometheus image: prom/prometheus:v2.45.5
restart: always restart: always
volumes: volumes:
- ./prometheus:/etc/prometheus/ - ./prometheus:/etc/prometheus/

View File

@ -105,7 +105,7 @@
}, },
"textMode": "auto" "textMode": "auto"
}, },
"pluginVersion": "9.3.6", "pluginVersion": "10.1.10",
"targets": [ "targets": [
{ {
"datasource": { "datasource": {
@ -191,7 +191,7 @@
}, },
"textMode": "auto" "textMode": "auto"
}, },
"pluginVersion": "9.3.6", "pluginVersion": "10.1.10",
"targets": [ "targets": [
{ {
"datasource": { "datasource": {
@ -282,7 +282,7 @@
"showThresholdLabels": false, "showThresholdLabels": false,
"showThresholdMarkers": true "showThresholdMarkers": true
}, },
"pluginVersion": "9.3.6", "pluginVersion": "10.1.10",
"targets": [ "targets": [
{ {
"datasource": { "datasource": {
@ -1609,10 +1609,24 @@
"fullMetaSearch": false, "fullMetaSearch": false,
"includeNullMetadata": false, "includeNullMetadata": false,
"instant": false, "instant": false,
"legendFormat": "__auto", "legendFormat": "switch",
"range": true, "range": true,
"refId": "A", "refId": "A",
"useBackend": false "useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr":
"rate(dragonfly_fiber_longrun_seconds[$__rate_interval])/rate(dragonfly_fiber_longrun_total[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "longrun",
"range": true,
"refId": "B"
} }
], ],
"title": "FiberSwitchDelay", "title": "FiberSwitchDelay",
@ -1622,7 +1636,7 @@
{ {
"datasource": { "datasource": {
"type": "prometheus", "type": "prometheus",
"uid": "${DS_PROMETHEUS}" "uid": "PBFA97CFB590B2093"
}, },
"fieldConfig": { "fieldConfig": {
"defaults": { "defaults": {
@ -1673,8 +1687,7 @@
"value": 80 "value": 80
} }
] ]
}, }
"unit": "s"
}, },
"overrides": [] "overrides": []
}, },
@ -1684,7 +1697,7 @@
"x": 12, "x": 12,
"y": 30 "y": 30
}, },
"id": 20, "id": 22,
"options": { "options": {
"legend": { "legend": {
"calcs": [], "calcs": [],
@ -1703,21 +1716,15 @@
"type": "prometheus", "type": "prometheus",
"uid": "PBFA97CFB590B2093" "uid": "PBFA97CFB590B2093"
}, },
"disableTextWrap": false,
"editorMode": "code", "editorMode": "code",
"expr": "expr": "dragonfly_pipeline_queue_length/dragonfly_connected_clients",
"rate(dragonfly_fiber_longrun_seconds_total[$__rate_interval])/rate(dragonfly_fiber_longrun_total[$__rate_interval])",
"fullMetaSearch": false,
"includeNullMetadata": false,
"instant": false, "instant": false,
"legendFormat": "__auto", "legendFormat": "avr_pipeline_depth",
"range": true, "range": true,
"refId": "A", "refId": "A"
"useBackend": false
} }
], ],
"title": "FiberSwitchDelay", "title": "Pipeline length",
"transformations": [],
"type": "timeseries" "type": "timeseries"
}, },
{ {
@ -1826,6 +1833,102 @@
], ],
"title": "Master Replication memory", "title": "Master Replication memory",
"type": "timeseries" "type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 38
},
"id": 23,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"editorMode": "code",
"expr":
"rate(dragonfly_pipeline_commands_duration_seconds[$__rate_interval])/rate(dragonfly_pipeline_commands_total[$__rate_interval])",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Pipeline latency",
"type": "timeseries"
} }
], ],
"refresh": "1m", "refresh": "1m",
@ -1948,7 +2051,7 @@
] ]
}, },
"time": { "time": {
"from": "now-5m", "from": "now-15m",
"to": "now" "to": "now"
}, },
"timepicker": { "timepicker": {
@ -1979,6 +2082,6 @@
"timezone": "browser", "timezone": "browser",
"title": "Dragonfly Dashboard", "title": "Dragonfly Dashboard",
"uid": "xDLNRKUWz", "uid": "xDLNRKUWz",
"version": 1, "version": 4,
"weekStart": "" "weekStart": ""
} }

View File

@ -1,8 +1,7 @@
# my global config # my global config
global: global:
scrape_interval: 15s # By default, scrape targets every 15 seconds. scrape_interval: 5s
evaluation_interval: 15s # By default, scrape targets every 15 seconds. evaluation_interval: 5s
# scrape_timeout is set to the global default (10s).
# Attach these labels to any time series or alerts when communicating with # Attach these labels to any time series or alerts when communicating with
# external systems (federation, remote storage, Alertmanager). # external systems (federation, remote storage, Alertmanager).
@ -43,8 +42,6 @@ scrape_configs:
- job_name: 'node-exporter' - job_name: 'node-exporter'
# Override the global default and scrape targets from this job every 5 seconds.
scrape_interval: 1s scrape_interval: 1s
static_configs: static_configs:
- targets: ['node-exporter:9100'] - targets: ['node-exporter:9100']