mirror of
https://github.com/dragonflydb/dragonfly
synced 2024-11-21 23:19:53 +00:00
chore: improve connection I/O errors reporting (#1436)
This commit is contained in:
parent
3d651aecf5
commit
fdc2b460e2
@ -528,7 +528,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
|
||||
}
|
||||
|
||||
if (ec && !FiberSocketBase::IsConnClosed(ec)) {
|
||||
LOG(WARNING) << "Socket error " << ec << " " << ec.message();
|
||||
string conn_info = service_->GetContextInfo(cc_.get());
|
||||
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() << ": " << ec
|
||||
<< " " << ec.message();
|
||||
}
|
||||
|
||||
--stats_->num_conns;
|
||||
|
@ -130,9 +130,10 @@ struct ConnectionState {
|
||||
// For set op - it's the flag value we are storing along with the value.
|
||||
// For get op - we use it as a mask of MCGetMask values.
|
||||
uint32_t memcache_flag = 0;
|
||||
bool is_blocking = false; // whether this connection is blocking on a command
|
||||
|
||||
ExecInfo exec_info;
|
||||
ReplicationInfo replicaiton_info;
|
||||
ReplicationInfo replication_info;
|
||||
|
||||
std::unique_ptr<ScriptInfo> script_info;
|
||||
std::unique_ptr<SubscribeInfo> subscribe_info;
|
||||
@ -173,12 +174,16 @@ class ConnectionContext : public facade::ConnectionContext {
|
||||
void ChangeMonitor(bool start); // either start or stop monitor on a given connection
|
||||
|
||||
// Whether this connection is a connection from a replica to its master.
|
||||
// This flag is true only on replica side, where we need to setup a special ConnectionContext
|
||||
// instance that helps applying commands coming from master.
|
||||
bool is_replicating = false;
|
||||
// Reference to a FlowInfo for this connection if from a master to a replica.
|
||||
FlowInfo* replication_flow;
|
||||
|
||||
bool monitor = false; // when a monitor command is sent over a given connection, we need to aware
|
||||
// of it as a state for the connection
|
||||
|
||||
// Reference to a FlowInfo for this connection if from a master to a replica.
|
||||
FlowInfo* replication_flow;
|
||||
|
||||
private:
|
||||
void EnableMonitoring(bool enable) {
|
||||
subscriptions++; // required to support the monitoring
|
||||
|
@ -256,9 +256,8 @@ OpResult<ShardFFResult> FindFirstNonEmptyKey(Transaction* trans, int req_obj_typ
|
||||
}
|
||||
|
||||
// If OK is returned then cb was called on the first non empty key and `out_key` is set to the key.
|
||||
facade::OpStatus RunCbOnFirstNonEmptyBlocking(BlockingResultCb&& func, std::string* out_key,
|
||||
Transaction* trans, int req_obj_type,
|
||||
unsigned limit_ms) {
|
||||
OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
|
||||
BlockingResultCb func, unsigned limit_ms) {
|
||||
auto limit_tp = limit_ms ? std::chrono::steady_clock::now() + std::chrono::milliseconds(limit_ms)
|
||||
: Transaction::time_point::max();
|
||||
bool is_multi = trans->IsMulti();
|
||||
@ -296,19 +295,20 @@ facade::OpStatus RunCbOnFirstNonEmptyBlocking(BlockingResultCb&& func, std::stri
|
||||
return result.status();
|
||||
}
|
||||
|
||||
auto cb = [&func, &ff_result, out_key](Transaction* t, EngineShard* shard) {
|
||||
string result_key;
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) {
|
||||
*out_key = *wake_key;
|
||||
func(t, shard, *out_key);
|
||||
result_key = *wake_key;
|
||||
func(t, shard, result_key);
|
||||
} else if (shard->shard_id() == ff_result.sid) {
|
||||
ff_result.key.GetString(out_key);
|
||||
func(t, shard, *out_key);
|
||||
ff_result.key.GetString(&result_key);
|
||||
func(t, shard, result_key);
|
||||
}
|
||||
return OpStatus::OK;
|
||||
};
|
||||
trans->Execute(std::move(cb), true);
|
||||
|
||||
return OpStatus::OK;
|
||||
return result_key;
|
||||
}
|
||||
|
||||
} // namespace dfly::container_utils
|
||||
|
@ -89,9 +89,8 @@ struct ShardFFResult {
|
||||
OpResult<ShardFFResult> FindFirstNonEmptyKey(Transaction* trans, int req_obj_type);
|
||||
|
||||
using BlockingResultCb = std::function<void(Transaction*, EngineShard*, std::string_view)>;
|
||||
facade::OpStatus RunCbOnFirstNonEmptyBlocking(BlockingResultCb&& cb, std::string* out_key,
|
||||
Transaction* trans, int req_obj_type,
|
||||
unsigned limit_ms);
|
||||
OpResult<std::string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
|
||||
BlockingResultCb cb, unsigned limit_ms);
|
||||
|
||||
}; // namespace container_utils
|
||||
|
||||
|
@ -244,8 +244,8 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
||||
// Set meta info on connection.
|
||||
cntx->owner()->SetName(absl::StrCat("repl_flow_", sync_id));
|
||||
cntx->conn_state.replicaiton_info.repl_session_id = sync_id;
|
||||
cntx->conn_state.replicaiton_info.repl_flow_id = flow_id;
|
||||
cntx->conn_state.replication_info.repl_session_id = sync_id;
|
||||
cntx->conn_state.replication_info.repl_flow_id = flow_id;
|
||||
|
||||
absl::InsecureBitGen gen;
|
||||
string eof_token = GetRandomHex(gen, 40);
|
||||
@ -471,7 +471,7 @@ auto DflyCmd::CreateSyncSession(ConnectionContext* cntx)
|
||||
};
|
||||
|
||||
string address = cntx->owner()->RemoteEndpointAddress();
|
||||
uint32_t port = cntx->conn_state.replicaiton_info.repl_listening_port;
|
||||
uint32_t port = cntx->conn_state.replication_info.repl_listening_port;
|
||||
|
||||
LOG(INFO) << "Registered replica " << address << ":" << port;
|
||||
|
||||
@ -484,7 +484,7 @@ auto DflyCmd::CreateSyncSession(ConnectionContext* cntx)
|
||||
}
|
||||
|
||||
void DflyCmd::OnClose(ConnectionContext* cntx) {
|
||||
unsigned session_id = cntx->conn_state.replicaiton_info.repl_session_id;
|
||||
unsigned session_id = cntx->conn_state.replication_info.repl_session_id;
|
||||
if (!session_id)
|
||||
return;
|
||||
|
||||
|
@ -1200,29 +1200,30 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
|
||||
|
||||
absl::StrAppend(debugMessages.Next(), "BPopGeneric by ", transaction->DebugId());
|
||||
|
||||
std::string popped_key, popped_value;
|
||||
std::string popped_value;
|
||||
auto cb = [dir, &popped_value](Transaction* t, EngineShard* shard, std::string_view key) {
|
||||
popped_value = OpBPop(t, shard, key, dir);
|
||||
};
|
||||
|
||||
OpStatus result = container_utils::RunCbOnFirstNonEmptyBlocking(
|
||||
cb, &popped_key, transaction, OBJ_LIST, unsigned(timeout * 1000));
|
||||
|
||||
if (result == OpStatus::OK) {
|
||||
cntx->conn_state.is_blocking = true;
|
||||
OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking(
|
||||
transaction, OBJ_LIST, move(cb), unsigned(timeout * 1000));
|
||||
cntx->conn_state.is_blocking = false;
|
||||
if (popped_key) {
|
||||
DVLOG(1) << "BPop " << transaction->DebugId() << " popped from key " << popped_key; // key.
|
||||
std::string_view str_arr[2] = {popped_key, popped_value};
|
||||
std::string_view str_arr[2] = {*popped_key, popped_value};
|
||||
return (*cntx)->SendStringArr(str_arr);
|
||||
}
|
||||
|
||||
DVLOG(1) << "result for " << transaction->DebugId() << " is " << result;
|
||||
DVLOG(1) << "result for " << transaction->DebugId() << " is " << popped_key.status();
|
||||
|
||||
switch (result) {
|
||||
switch (popped_key.status()) {
|
||||
case OpStatus::WRONG_TYPE:
|
||||
return (*cntx)->SendError(kWrongTypeErr);
|
||||
case OpStatus::TIMED_OUT:
|
||||
return (*cntx)->SendNullArray();
|
||||
default:
|
||||
LOG(ERROR) << "Unexpected error " << result;
|
||||
LOG(ERROR) << "Unexpected error " << popped_key.status();
|
||||
}
|
||||
return (*cntx)->SendNullArray();
|
||||
}
|
||||
|
@ -1844,12 +1844,20 @@ void Service::OnClose(facade::ConnectionContext* cntx) {
|
||||
string Service::GetContextInfo(facade::ConnectionContext* cntx) {
|
||||
char buf[16] = {0};
|
||||
unsigned index = 0;
|
||||
if (cntx->async_dispatch)
|
||||
ConnectionContext* server_cntx = static_cast<ConnectionContext*>(cntx);
|
||||
|
||||
if (server_cntx->async_dispatch)
|
||||
buf[index++] = 'a';
|
||||
|
||||
if (cntx->conn_closing)
|
||||
if (server_cntx->conn_closing)
|
||||
buf[index++] = 't';
|
||||
|
||||
if (server_cntx->conn_state.subscribe_info)
|
||||
buf[index++] = 'P';
|
||||
|
||||
if (server_cntx->conn_state.is_blocking)
|
||||
buf[index++] = 'b';
|
||||
|
||||
return index ? absl::StrCat("flags:", buf) : string();
|
||||
}
|
||||
|
||||
|
@ -1916,7 +1916,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
|
||||
cntx->owner()->SetName(absl::StrCat("repl_ctrl_", sid));
|
||||
|
||||
string sync_id = absl::StrCat("SYNC", sid);
|
||||
cntx->conn_state.replicaiton_info.repl_session_id = sid;
|
||||
cntx->conn_state.replication_info.repl_session_id = sid;
|
||||
|
||||
if (!cntx->replica_conn) {
|
||||
ServerState::tl_connection_stats()->num_replicas += 1;
|
||||
@ -1937,7 +1937,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
|
||||
(*cntx)->SendError(kInvalidIntErr);
|
||||
return;
|
||||
}
|
||||
cntx->conn_state.replicaiton_info.repl_listening_port = replica_listening_port;
|
||||
cntx->conn_state.replication_info.repl_listening_port = replica_listening_port;
|
||||
} else if (cmd == "CLIENT-ID" && args.size() == 2) {
|
||||
std::string client_id{arg};
|
||||
auto& pool = service_.proactor_pool();
|
||||
@ -1949,8 +1949,8 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
|
||||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
VLOG(1) << "Client version for session_id="
|
||||
<< cntx->conn_state.replicaiton_info.repl_session_id << " is " << version;
|
||||
cntx->conn_state.replicaiton_info.repl_version = DflyVersion(version);
|
||||
<< cntx->conn_state.replication_info.repl_session_id << " is " << version;
|
||||
cntx->conn_state.replication_info.repl_version = DflyVersion(version);
|
||||
} else if (cmd == "ACK" && args.size() == 2) {
|
||||
// Don't send error/Ok back through the socket, because we don't want to interleave with
|
||||
// the journal writes that we write into the same socket.
|
||||
|
@ -1171,32 +1171,34 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
|
||||
VLOG(1) << "BZPop timeout(" << timeout << ")";
|
||||
|
||||
Transaction* transaction = cntx->transaction;
|
||||
std::string popped_key;
|
||||
OpResult<ZSetFamily::ScoredArray> popped_array;
|
||||
OpStatus result = container_utils::RunCbOnFirstNonEmptyBlocking(
|
||||
cntx->conn_state.is_blocking = true;
|
||||
OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking(
|
||||
transaction, OBJ_ZSET,
|
||||
[is_max, &popped_array](Transaction* t, EngineShard* shard, std::string_view key) {
|
||||
popped_array = OpBZPop(t, shard, key, is_max);
|
||||
},
|
||||
&popped_key, transaction, OBJ_ZSET, unsigned(timeout * 1000));
|
||||
unsigned(timeout * 1000));
|
||||
cntx->conn_state.is_blocking = false;
|
||||
|
||||
if (result == OpStatus::OK) {
|
||||
if (popped_key) {
|
||||
DVLOG(1) << "BZPop " << transaction->DebugId() << " popped from key " << popped_key; // key.
|
||||
CHECK(popped_array->size() == 1);
|
||||
(*cntx)->StartArray(3);
|
||||
(*cntx)->SendBulkString(popped_key);
|
||||
(*cntx)->SendBulkString(*popped_key);
|
||||
(*cntx)->SendBulkString(popped_array->front().first);
|
||||
return (*cntx)->SendDouble(popped_array->front().second);
|
||||
}
|
||||
|
||||
DVLOG(1) << "result for " << transaction->DebugId() << " is " << result;
|
||||
DVLOG(1) << "result for " << transaction->DebugId() << " is " << popped_key.status();
|
||||
|
||||
switch (result) {
|
||||
switch (popped_key.status()) {
|
||||
case OpStatus::WRONG_TYPE:
|
||||
return (*cntx)->SendError(kWrongTypeErr);
|
||||
case OpStatus::TIMED_OUT:
|
||||
return (*cntx)->SendNullArray();
|
||||
default:
|
||||
LOG(ERROR) << "Unexpected error " << result;
|
||||
LOG(ERROR) << "Unexpected error " << popped_key.status();
|
||||
}
|
||||
return (*cntx)->SendNullArray();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user