mirror of
https://github.com/dragonflydb/dragonfly
synced 2024-11-22 15:44:13 +00:00
fix: fix index loading (#1742)
* fix: fix index loading Signed-off-by: Vladislav <vladislav.oleshko@gmail.com> Co-authored-by: Kostas Kyrimis <kostaskyrim@gmail.com>
This commit is contained in:
parent
50545fc176
commit
f1ac4b0f35
@ -11,6 +11,7 @@
|
||||
#include "base/logging.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/search/doc_accessors.h"
|
||||
#include "server/server_state.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/object.h"
|
||||
@ -150,10 +151,13 @@ bool DocIndex::Matches(string_view key, unsigned obj_code) const {
|
||||
}
|
||||
|
||||
ShardDocIndex::ShardDocIndex(shared_ptr<DocIndex> index)
|
||||
: base_{index}, indices_{index->schema}, key_index_{} {
|
||||
: base_{std::move(index)}, indices_{{}}, key_index_{} {
|
||||
}
|
||||
|
||||
void ShardDocIndex::Init(const OpArgs& op_args) {
|
||||
void ShardDocIndex::Rebuild(const OpArgs& op_args) {
|
||||
key_index_ = DocKeyIndex{};
|
||||
indices_ = search::FieldIndices{base_->schema};
|
||||
|
||||
auto cb = [this](string_view key, BaseAccessor* doc) { indices_.Add(key_index_.Add(key), doc); };
|
||||
TraverseAllMatching(*base_, op_args, cb);
|
||||
}
|
||||
@ -213,7 +217,11 @@ void ShardDocIndices::InitIndex(const OpArgs& op_args, std::string_view name,
|
||||
shared_ptr<DocIndex> index_ptr) {
|
||||
auto shard_index = make_unique<ShardDocIndex>(index_ptr);
|
||||
auto [it, _] = indices_.emplace(name, move(shard_index));
|
||||
it->second->Init(op_args);
|
||||
|
||||
// Don't build while loading, shutting down, etc.
|
||||
// After loading, indices are rebuilt separately
|
||||
if (ServerState::tlocal()->gstate() == GlobalState::ACTIVE)
|
||||
it->second->Rebuild(op_args);
|
||||
|
||||
op_args.shard->db_slice().SetDocDeletionCallback(
|
||||
[this](string_view key, const DbContext& cntx, const PrimeValue& pv) {
|
||||
@ -235,6 +243,11 @@ bool ShardDocIndices::DropIndex(string_view name) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void ShardDocIndices::RebuildAllIndices(const OpArgs& op_args) {
|
||||
for (auto& [_, ptr] : indices_)
|
||||
ptr->Rebuild(op_args);
|
||||
}
|
||||
|
||||
vector<string> ShardDocIndices::GetIndexNames() const {
|
||||
vector<string> names{};
|
||||
names.reserve(indices_.size());
|
||||
|
@ -75,6 +75,7 @@ class ShardDocIndex {
|
||||
struct DocKeyIndex {
|
||||
DocId Add(std::string_view key);
|
||||
DocId Remove(std::string_view key);
|
||||
|
||||
std::string_view Get(DocId id) const;
|
||||
size_t Size() const;
|
||||
|
||||
@ -86,14 +87,15 @@ class ShardDocIndex {
|
||||
};
|
||||
|
||||
public:
|
||||
// Index must be rebuilt at least once after intialization
|
||||
ShardDocIndex(std::shared_ptr<DocIndex> index);
|
||||
|
||||
// Perform search on all indexed documents and return results.
|
||||
SearchResult Search(const OpArgs& op_args, const SearchParams& params,
|
||||
search::SearchAlgorithm* search_algo) const;
|
||||
|
||||
// Initialize index. Traverses all matching documents and assigns ids.
|
||||
void Init(const OpArgs& op_args);
|
||||
// Clears internal data. Traverses all matching documents and assigns ids.
|
||||
void Rebuild(const OpArgs& op_args);
|
||||
|
||||
// Return whether base index matches
|
||||
bool Matches(std::string_view key, unsigned obj_code) const;
|
||||
@ -114,11 +116,17 @@ class ShardDocIndices {
|
||||
public:
|
||||
// Get sharded document index by its name or nullptr if not found
|
||||
ShardDocIndex* GetIndex(std::string_view name);
|
||||
// Init index: create shard local state for given index with given name
|
||||
|
||||
// Init index: create shard local state for given index with given name.
|
||||
// Build if instance is in active state.
|
||||
void InitIndex(const OpArgs& op_args, std::string_view name, std::shared_ptr<DocIndex> index);
|
||||
|
||||
// Drop index, return true if it existed and was dropped
|
||||
bool DropIndex(std::string_view name);
|
||||
|
||||
// Rebuild all indices
|
||||
void RebuildAllIndices(const OpArgs& op_args);
|
||||
|
||||
std::vector<std::string> GetIndexNames() const;
|
||||
|
||||
void AddDoc(std::string_view key, const DbContext& db_cnt, const PrimeValue& pv);
|
||||
|
@ -779,6 +779,15 @@ struct SaveStagesController : public SaveStagesInputs {
|
||||
Mutex rdb_name_map_mu_;
|
||||
};
|
||||
|
||||
void RebuildAllSearchIndices(Service* service) {
|
||||
boost::intrusive_ptr<Transaction> trans{new Transaction{service->FindCmd("FT.CREATE")}};
|
||||
trans->InitByArgs(0, {});
|
||||
trans->ScheduleSingleHop([](auto* trans, auto* es) {
|
||||
es->search_indices()->RebuildAllIndices(trans->GetOpArgs(es));
|
||||
return OpStatus::OK;
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
std::optional<SnapshotSpec> ParseSaveSchedule(string_view time) {
|
||||
@ -1073,6 +1082,8 @@ Future<std::error_code> ServerFamily::Load(const std::string& load_path) {
|
||||
fiber.Join();
|
||||
}
|
||||
|
||||
RebuildAllSearchIndices(&service_);
|
||||
|
||||
LOG(INFO) << "Load finished, num keys read: " << aggregated_result->keys_read;
|
||||
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
|
||||
ec_promise.set_value(*(aggregated_result->first_error));
|
||||
|
Loading…
Reference in New Issue
Block a user