chore: Log db_index in traffic logger (#2951)

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-04-24 15:13:53 +03:00 committed by GitHub
parent 89b1d7d52a
commit df598e4825
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 87 additions and 56 deletions

View File

@ -1,5 +1,5 @@
add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc
memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc
memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc service_interface.cc
reply_capture.cc resp_expr.cc cmd_arg_parser.cc tls_error.cc)
if (DF_USE_SSL)

View File

@ -164,9 +164,14 @@ void OpenTrafficLogger(string_view base_path) {
#else
LOG(WARNING) << "Traffic logger is only supported on Linux";
#endif
// Write version, incremental numbering :)
uint8_t version[1] = {2};
tl_traffic_logger.log_file->Write(version);
}
void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp) {
void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp,
ServiceInterface::ContextInfo ci) {
string_view cmd = resp.front().GetView();
if (absl::EqualsIgnoreCase(cmd, "debug"sv))
return;
@ -176,28 +181,33 @@ void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp) {
char stack_buf[1024];
char* next = stack_buf;
// We write id, timestamp, has_more, num_parts, part_len, part_len, part_len, ...
// We write id, timestamp, db_index, has_more, num_parts, part_len, part_len, part_len, ...
// And then all the part blobs concatenated together.
auto write_u32 = [&next](uint32_t i) {
absl::little_endian::Store32(next, i);
next += 4;
};
// id
write_u32(id);
// timestamp
absl::little_endian::Store64(next, absl::GetCurrentTimeNanos());
next += 8;
// db_index
write_u32(ci.db_index);
// has_more, num_parts
write_u32(has_more ? 1 : 0);
write_u32(uint32_t(resp.size()));
// Grab the lock and check if the file is still open.
lock_guard lk{tl_traffic_logger.mutex};
if (!tl_traffic_logger.log_file)
return;
// Proceed with writing the blob lengths.
// part_len, ...
for (auto part : resp) {
if (size_t(next - stack_buf + 4) > sizeof(stack_buf)) {
if (!tl_traffic_logger.Write(string_view{stack_buf, size_t(next - stack_buf)})) {
@ -743,7 +753,7 @@ std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() co
string_view phase_name = PHASE_NAMES[phase_];
if (cc_) {
string cc_info = service_->GetContextInfo(cc_.get());
string cc_info = service_->GetContextInfo(cc_.get()).Format();
if (cc_->reply_builder()->IsSendActive())
phase_name = "send";
absl::StrAppend(&after, " ", cc_info);
@ -921,7 +931,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
if (ec && !FiberSocketBase::IsConnClosed(ec)) {
string conn_info = service_->GetContextInfo(cc_.get());
string conn_info = service_->GetContextInfo(cc_.get()).Format();
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName()
<< " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message();
}
@ -983,10 +993,8 @@ Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder)
bool has_more = consumed < io_buf_.InputLen();
if (tl_traffic_logger.log_file) {
if (IsMain()) { // log only on the main interface.
LogTraffic(id_, has_more, absl::MakeSpan(parse_args));
}
if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
}
DispatchCommand(has_more, dispatch_sync, dispatch_async);
}

View File

@ -0,0 +1,34 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "facade/service_interface.h"
#include <absl/strings/str_cat.h>
namespace facade {
std::string ServiceInterface::ContextInfo::Format() const {
char buf[16] = {0};
std::string res = absl::StrCat("db=", db_index);
unsigned index = 0;
if (async_dispatch)
buf[index++] = 'a';
if (conn_closing)
buf[index++] = 't';
if (subscribers)
buf[index++] = 'P';
if (blocked)
buf[index++] = 'b';
if (index)
absl::StrAppend(&res, " flags=", buf);
return res;
}
} // namespace facade

View File

@ -42,7 +42,14 @@ class ServiceInterface {
virtual void OnClose(ConnectionContext* cntx) {
}
virtual std::string GetContextInfo(ConnectionContext* cntx) {
struct ContextInfo {
std::string Format() const;
unsigned db_index;
bool async_dispatch, conn_closing, subscribers, blocked;
};
virtual ContextInfo GetContextInfo(ConnectionContext* cntx) const {
return {};
}
};

View File

@ -2556,29 +2556,13 @@ void Service::OnClose(facade::ConnectionContext* cntx) {
cntx->conn()->SetClientTrackingSwitch(false);
}
string Service::GetContextInfo(facade::ConnectionContext* cntx) {
char buf[16] = {0};
unsigned index = 0;
Service::ContextInfo Service::GetContextInfo(facade::ConnectionContext* cntx) const {
ConnectionContext* server_cntx = static_cast<ConnectionContext*>(cntx);
string res = absl::StrCat("db=", server_cntx->db_index());
if (server_cntx->async_dispatch)
buf[index++] = 'a';
if (server_cntx->conn_closing)
buf[index++] = 't';
if (server_cntx->conn_state.subscribe_info)
buf[index++] = 'P';
if (server_cntx->blocked)
buf[index++] = 'b';
if (index) {
absl::StrAppend(&res, " flags=", buf);
}
return res;
return {.db_index = server_cntx->db_index(),
.async_dispatch = server_cntx->async_dispatch,
.conn_closing = server_cntx->conn_closing,
.subscribers = bool(server_cntx->conn_state.subscribe_info),
.blocked = server_cntx->blocked};
}
using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);

View File

@ -94,7 +94,8 @@ class Service : public facade::ServiceInterface {
void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final;
void OnClose(facade::ConnectionContext* cntx) final;
std::string GetContextInfo(facade::ConnectionContext* cntx) final;
Service::ContextInfo GetContextInfo(facade::ConnectionContext* cntx) const final;
uint32_t shard_count() const {
return shard_set->size();

View File

@ -21,6 +21,7 @@ var fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per clie
type RecordHeader struct {
Client uint32
Time uint64
DbIndex uint32
HasMore uint32
}
@ -47,6 +48,7 @@ func DetermineBaseTime(files []string) time.Time {
type ClientWorker struct {
redis *redis.Client
incoming chan Record
processed uint
}
// Handles a single file and distributes messages to clients
@ -62,6 +64,11 @@ type FileWorker struct {
func (c ClientWorker) Run(worker *FileWorker) {
for msg := range c.incoming {
if c.processed == 0 && msg.DbIndex != 0 {
// There is no easy way to switch, we rely on connection pool consisting only of one connection
c.redis.Do(context.Background(), []interface{}{"SELECT", fmt.Sprint(msg.DbIndex)})
}
lag := time.Until(worker.HappensAt(time.Unix(0, int64(msg.Time))))
if lag < 0 {
atomic.AddUint64(&worker.delayed, 1)
@ -70,6 +77,7 @@ func (c ClientWorker) Run(worker *FileWorker) {
c.redis.Do(context.Background(), msg.values...).Result()
atomic.AddUint64(&worker.processed, 1)
c.processed += 1
}
worker.clientGroup.Done()
}

View File

@ -3,7 +3,6 @@ package main
import (
"bufio"
"encoding/binary"
"errors"
"io"
"log"
"os"
@ -29,25 +28,8 @@ func parseStrings(file io.Reader) (out []interface{}, err error) {
for i := range out {
strLen = out[i].(uint32)
if strLen == 0 {
err = binary.Read(file, binary.LittleEndian, &strLen)
if err != nil {
return nil, err
}
if strLen > 100000000 {
log.Printf("Bad string length %v, index %v out of %v", strLen, i, num)
for j := 0; j < i; j++ {
log.Printf("Str %v %v", j, out[j])
}
return nil, errors.New("failed to parse a string len ")
}
out[i] = kBigEmptyBytes[:strLen]
continue
}
buf := make([]byte, strLen)
_, err := io.ReadFull(file, buf)
if err != nil {
return nil, err
@ -66,6 +48,13 @@ func parseRecords(filename string, cb func(Record) bool) error {
defer file.Close()
reader := bufio.NewReader(file)
var version uint8
binary.Read(reader, binary.LittleEndian, &version)
if version != 2 {
panic("Requires version two replayer, roll back in commits!")
}
recordNum := 0
for {
var rec Record