diff --git a/src/facade/CMakeLists.txt b/src/facade/CMakeLists.txt
index 408d17884..feef1da48 100644
--- a/src/facade/CMakeLists.txt
+++ b/src/facade/CMakeLists.txt
@@ -1,5 +1,5 @@
 add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc
-            memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc
+            memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc service_interface.cc
             reply_capture.cc resp_expr.cc cmd_arg_parser.cc tls_error.cc)
 
 if (DF_USE_SSL)
diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 4011f83f8..c536cd27c 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -164,9 +164,14 @@ void OpenTrafficLogger(string_view base_path) {
 #else
   LOG(WARNING) << "Traffic logger is only supported on Linux";
 #endif
+
+  // Write version, incremental numbering :)
+  uint8_t version[1] = {2};
+  tl_traffic_logger.log_file->Write(version);
 }
 
-void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp) {
+void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp,
+                ServiceInterface::ContextInfo ci) {
   string_view cmd = resp.front().GetView();
   if (absl::EqualsIgnoreCase(cmd, "debug"sv))
     return;
@@ -176,28 +181,33 @@ void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp) {
   char stack_buf[1024];
   char* next = stack_buf;
 
-  // We write id, timestamp, has_more, num_parts, part_len, part_len, part_len, ...
+  // We write id, timestamp, db_index, has_more, num_parts, part_len, part_len, part_len, ...
   // And then all the part blobs concatenated together.
   auto write_u32 = [&next](uint32_t i) {
     absl::little_endian::Store32(next, i);
     next += 4;
   };
 
+  // id
   write_u32(id);
 
+  // timestamp
   absl::little_endian::Store64(next, absl::GetCurrentTimeNanos());
   next += 8;
 
+  // db_index
+  write_u32(ci.db_index);
+
+  // has_more, num_parts
   write_u32(has_more ? 1 : 0);
   write_u32(uint32_t(resp.size()));
 
   // Grab the lock and check if the file is still open.
   lock_guard lk{tl_traffic_logger.mutex};
-
   if (!tl_traffic_logger.log_file)
     return;
 
-  // Proceed with writing the blob lengths.
+  // part_len, ...
   for (auto part : resp) {
     if (size_t(next - stack_buf + 4) > sizeof(stack_buf)) {
       if (!tl_traffic_logger.Write(string_view{stack_buf, size_t(next - stack_buf)})) {
@@ -743,7 +753,7 @@ std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() const {
   string_view phase_name = PHASE_NAMES[phase_];
 
   if (cc_) {
-    string cc_info = service_->GetContextInfo(cc_.get());
+    string cc_info = service_->GetContextInfo(cc_.get()).Format();
     if (cc_->reply_builder()->IsSendActive())
       phase_name = "send";
     absl::StrAppend(&after, " ", cc_info);
@@ -921,7 +931,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
   }
 
   if (ec && !FiberSocketBase::IsConnClosed(ec)) {
-    string conn_info = service_->GetContextInfo(cc_.get());
+    string conn_info = service_->GetContextInfo(cc_.get()).Format();
     LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName()
                  << " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message();
   }
@@ -983,10 +993,8 @@ Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder)
 
     bool has_more = consumed < io_buf_.InputLen();
 
-    if (tl_traffic_logger.log_file) {
-      if (IsMain()) {  // log only on the main interface.
-        LogTraffic(id_, has_more, absl::MakeSpan(parse_args));
-      }
+    if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
+      LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
     }
     DispatchCommand(has_more, dispatch_sync, dispatch_async);
   }
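For readers of the traffic log, the version-2 layout written by LogTraffic above is: a single version byte (2) at the start of the file, then per record the connection id, a timestamp in nanoseconds, db_index, has_more and num_parts (little-endian u32/u64), followed by the part lengths and the concatenated part payloads. The Go sketch below is illustrative only and is not part of the patch; recordHeaderV2, readRecordV2 and the traffic.bin path are made-up names, while the field order comes from the writer above and from the replayer's RecordHeader.

```go
// Minimal sketch of the version-2 traffic log layout. Not the replayer's
// actual parsing code; see tools/replay/parsing.go for that.
package main

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"os"
)

// recordHeaderV2 is a hypothetical name; the replayer's own struct is RecordHeader.
type recordHeaderV2 struct {
	Client  uint32
	Time    uint64 // nanoseconds, from absl::GetCurrentTimeNanos()
	DbIndex uint32
	HasMore uint32
}

// readRecordV2 reads one record: fixed header, part count, part lengths,
// then the concatenated part payloads.
func readRecordV2(r io.Reader) (recordHeaderV2, [][]byte, error) {
	var hdr recordHeaderV2
	if err := binary.Read(r, binary.LittleEndian, &hdr); err != nil {
		return hdr, nil, err
	}

	var numParts uint32
	if err := binary.Read(r, binary.LittleEndian, &numParts); err != nil {
		return hdr, nil, err
	}

	lengths := make([]uint32, numParts)
	if err := binary.Read(r, binary.LittleEndian, lengths); err != nil {
		return hdr, nil, err
	}

	parts := make([][]byte, numParts)
	for i, n := range lengths {
		parts[i] = make([]byte, n)
		if _, err := io.ReadFull(r, parts[i]); err != nil {
			return hdr, nil, err
		}
	}
	return hdr, parts, nil
}

func main() {
	f, err := os.Open("traffic.bin") // assumed path, for illustration only
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	r := bufio.NewReader(f)
	version, err := r.ReadByte()
	if err != nil || version != 2 {
		log.Fatalf("unsupported log version: %v (err=%v)", version, err)
	}

	for {
		hdr, parts, err := readRecordV2(r)
		if err == io.EOF {
			break
		} else if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("client=%d db=%d parts=%d\n", hdr.Client, hdr.DbIndex, len(parts))
	}
}
```

The replayer's parseRecords (later in this diff) performs the same leading version check before it starts reading records.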
diff --git a/src/facade/service_interface.cc b/src/facade/service_interface.cc
new file mode 100644
index 000000000..7bed34af8
--- /dev/null
+++ b/src/facade/service_interface.cc
@@ -0,0 +1,34 @@
+// Copyright 2024, DragonflyDB authors. All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#include "facade/service_interface.h"
+
+#include <absl/strings/str_cat.h>
+
+namespace facade {
+
+std::string ServiceInterface::ContextInfo::Format() const {
+  char buf[16] = {0};
+  std::string res = absl::StrCat("db=", db_index);
+
+  unsigned index = 0;
+
+  if (async_dispatch)
+    buf[index++] = 'a';
+
+  if (conn_closing)
+    buf[index++] = 't';
+
+  if (subscribers)
+    buf[index++] = 'P';
+
+  if (blocked)
+    buf[index++] = 'b';
+
+  if (index)
+    absl::StrAppend(&res, " flags=", buf);
+  return res;
+}
+
+}  // namespace facade
diff --git a/src/facade/service_interface.h b/src/facade/service_interface.h
index 8f28f01b1..901b06af5 100644
--- a/src/facade/service_interface.h
+++ b/src/facade/service_interface.h
@@ -42,7 +42,14 @@ class ServiceInterface {
   virtual void OnClose(ConnectionContext* cntx) {
   }
 
-  virtual std::string GetContextInfo(ConnectionContext* cntx) {
+  struct ContextInfo {
+    std::string Format() const;
+
+    unsigned db_index;
+    bool async_dispatch, conn_closing, subscribers, blocked;
+  };
+
+  virtual ContextInfo GetContextInfo(ConnectionContext* cntx) const {
     return {};
   }
 };
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 74a1d5daa..37ba88f1b 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -2556,29 +2556,13 @@ void Service::OnClose(facade::ConnectionContext* cntx) {
   cntx->conn()->SetClientTrackingSwitch(false);
 }
 
-string Service::GetContextInfo(facade::ConnectionContext* cntx) {
-  char buf[16] = {0};
-  unsigned index = 0;
+Service::ContextInfo Service::GetContextInfo(facade::ConnectionContext* cntx) const {
   ConnectionContext* server_cntx = static_cast<ConnectionContext*>(cntx);
-
-  string res = absl::StrCat("db=", server_cntx->db_index());
-
-  if (server_cntx->async_dispatch)
-    buf[index++] = 'a';
-
-  if (server_cntx->conn_closing)
-    buf[index++] = 't';
-
-  if (server_cntx->conn_state.subscribe_info)
-    buf[index++] = 'P';
-
-  if (server_cntx->blocked)
-    buf[index++] = 'b';
-
-  if (index) {
-    absl::StrAppend(&res, " flags=", buf);
-  }
-  return res;
+  return {.db_index = server_cntx->db_index(),
+          .async_dispatch = server_cntx->async_dispatch,
+          .conn_closing = server_cntx->conn_closing,
+          .subscribers = bool(server_cntx->conn_state.subscribe_info),
+          .blocked = server_cntx->blocked};
 }
 
 using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
diff --git a/src/server/main_service.h b/src/server/main_service.h
index 0a645ceaa..0bc318378 100644
--- a/src/server/main_service.h
+++ b/src/server/main_service.h
@@ -94,7 +94,8 @@ class Service : public facade::ServiceInterface {
 
   void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final;
   void OnClose(facade::ConnectionContext* cntx) final;
-  std::string GetContextInfo(facade::ConnectionContext* cntx) final;
+
+  Service::ContextInfo GetContextInfo(facade::ConnectionContext* cntx) const final;
 
   uint32_t shard_count() const {
     return shard_set->size();
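As an aside (not part of the patch), the flag letters produced by ContextInfo::Format() are what appear in connection log lines. The small Go transcription below only spells out that encoding for readers of the logs — 'a' for async_dispatch, 't' for conn_closing, 'P' for an active subscriber, 'b' for blocked — and is not intended as a replacement for the C++ above.

```go
// Illustration only: decodes the "db=<index> flags=<letters>" string that
// ServiceInterface::ContextInfo::Format() produces.
package main

import "fmt"

// contextInfo mirrors facade::ServiceInterface::ContextInfo.
type contextInfo struct {
	DbIndex       uint
	AsyncDispatch bool // 'a'
	ConnClosing   bool // 't'
	Subscribers   bool // 'P'
	Blocked       bool // 'b'
}

// format renders "db=<index>" plus an optional " flags=<letters>" suffix,
// matching the C++ Format() above.
func (c contextInfo) format() string {
	res := fmt.Sprintf("db=%d", c.DbIndex)
	flags := ""
	if c.AsyncDispatch {
		flags += "a"
	}
	if c.ConnClosing {
		flags += "t"
	}
	if c.Subscribers {
		flags += "P"
	}
	if c.Blocked {
		flags += "b"
	}
	if flags != "" {
		res += " flags=" + flags
	}
	return res
}

func main() {
	// Prints "db=2 flags=ab".
	fmt.Println(contextInfo{DbIndex: 2, AsyncDispatch: true, Blocked: true}.format())
}
```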
diff --git a/tools/replay/main.go b/tools/replay/main.go
index 608c4635c..77d71662c 100644
--- a/tools/replay/main.go
+++ b/tools/replay/main.go
@@ -21,6 +21,7 @@ var fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per client")
 
 type RecordHeader struct {
 	Client  uint32
 	Time    uint64
+	DbIndex uint32
 	HasMore uint32
 }
@@ -45,8 +46,9 @@ func DetermineBaseTime(files []string) time.Time {
 
 // Handles a single connection/client
 type ClientWorker struct {
-	redis    *redis.Client
-	incoming chan Record
+	redis     *redis.Client
+	incoming  chan Record
+	processed uint
 }
 
 // Handles a single file and distributes messages to clients
@@ -62,6 +64,11 @@ type FileWorker struct {
 
 func (c ClientWorker) Run(worker *FileWorker) {
 	for msg := range c.incoming {
+		if c.processed == 0 && msg.DbIndex != 0 {
+			// There is no easy way to switch, we rely on connection pool consisting only of one connection
+			c.redis.Do(context.Background(), []interface{}{"SELECT", fmt.Sprint(msg.DbIndex)}...)
+		}
+
 		lag := time.Until(worker.HappensAt(time.Unix(0, int64(msg.Time))))
 		if lag < 0 {
 			atomic.AddUint64(&worker.delayed, 1)
@@ -70,6 +77,7 @@
 
 		c.redis.Do(context.Background(), msg.values...).Result()
 		atomic.AddUint64(&worker.processed, 1)
+		c.processed += 1
 	}
 	worker.clientGroup.Done()
 }
diff --git a/tools/replay/parsing.go b/tools/replay/parsing.go
index 7d8021e33..de17c0dce 100644
--- a/tools/replay/parsing.go
+++ b/tools/replay/parsing.go
@@ -3,7 +3,6 @@ package main
 import (
 	"bufio"
 	"encoding/binary"
-	"errors"
 	"io"
 	"log"
 	"os"
@@ -29,25 +28,8 @@ func parseStrings(file io.Reader) (out []interface{}, err error) {
 
 	for i := range out {
 		strLen = out[i].(uint32)
-
-		if strLen == 0 {
-			err = binary.Read(file, binary.LittleEndian, &strLen)
-			if err != nil {
-				return nil, err
-			}
-
-			if strLen > 100000000 {
-				log.Printf("Bad string length %v, index %v out of %v", strLen, i, num)
-				for j := 0; j < i; j++ {
-					log.Printf("Str %v %v", j, out[j])
-				}
-				return nil, errors.New("failed to parse a string len ")
-			}
-			out[i] = kBigEmptyBytes[:strLen]
-			continue
-		}
-
 		buf := make([]byte, strLen)
+
 		_, err := io.ReadFull(file, buf)
 		if err != nil {
 			return nil, err
@@ -66,6 +48,13 @@ func parseRecords(filename string, cb func(Record) bool) error {
 	defer file.Close()
 
 	reader := bufio.NewReader(file)
+
+	var version uint8
+	binary.Read(reader, binary.LittleEndian, &version)
+	if version != 2 {
+		panic("Requires version two replayer, roll back in commits!")
+	}
+
 	recordNum := 0
 	for {
 		var rec Record
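The SELECT-on-first-record logic in ClientWorker.Run relies on each worker draining a pool that holds exactly one connection, so the SELECT sticks for every later command replayed by that client. A minimal sketch of that assumption with go-redis follows; the import path, server address and PoolSize value are illustrative assumptions and are not taken from the replayer's actual configuration.

```go
// Sketch: one connection per replay client, so a single SELECT pins the db.
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()

	// One connection per client worker: SELECT changes the db for that
	// single connection, and therefore for all commands replayed through it.
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		PoolSize: 1,
	})
	defer client.Close()

	dbIndex := uint32(3) // e.g. taken from the record's DbIndex field
	if err := client.Do(ctx, "SELECT", fmt.Sprint(dbIndex)).Err(); err != nil {
		panic(err)
	}

	// Subsequent commands run against db 3 on the same connection.
	client.Do(ctx, "SET", "key", "value")
}
```

With a larger pool the SELECT would only affect whichever connection happened to serve it, which is why the single-connection invariant mentioned in the patch comment matters.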