diff --git a/src/server/search/doc_index.cc b/src/server/search/doc_index.cc index 16bba08fc..c70da8f80 100644 --- a/src/server/search/doc_index.cc +++ b/src/server/search/doc_index.cc @@ -11,6 +11,7 @@ #include "base/logging.h" #include "server/engine_shard_set.h" #include "server/search/doc_accessors.h" +#include "server/server_state.h" extern "C" { #include "redis/object.h" @@ -150,10 +151,13 @@ bool DocIndex::Matches(string_view key, unsigned obj_code) const { } ShardDocIndex::ShardDocIndex(shared_ptr index) - : base_{index}, indices_{index->schema}, key_index_{} { + : base_{std::move(index)}, indices_{{}}, key_index_{} { } -void ShardDocIndex::Init(const OpArgs& op_args) { +void ShardDocIndex::Rebuild(const OpArgs& op_args) { + key_index_ = DocKeyIndex{}; + indices_ = search::FieldIndices{base_->schema}; + auto cb = [this](string_view key, BaseAccessor* doc) { indices_.Add(key_index_.Add(key), doc); }; TraverseAllMatching(*base_, op_args, cb); } @@ -213,7 +217,11 @@ void ShardDocIndices::InitIndex(const OpArgs& op_args, std::string_view name, shared_ptr index_ptr) { auto shard_index = make_unique(index_ptr); auto [it, _] = indices_.emplace(name, move(shard_index)); - it->second->Init(op_args); + + // Don't build while loading, shutting down, etc. 
+ // After loading, indices are rebuilt separately + if (ServerState::tlocal()->gstate() == GlobalState::ACTIVE) + it->second->Rebuild(op_args); op_args.shard->db_slice().SetDocDeletionCallback( [this](string_view key, const DbContext& cntx, const PrimeValue& pv) { @@ -235,6 +243,11 @@ bool ShardDocIndices::DropIndex(string_view name) { return true; } +void ShardDocIndices::RebuildAllIndices(const OpArgs& op_args) { + for (auto& [_, ptr] : indices_) + ptr->Rebuild(op_args); +} + vector ShardDocIndices::GetIndexNames() const { vector names{}; names.reserve(indices_.size()); diff --git a/src/server/search/doc_index.h b/src/server/search/doc_index.h index d9d324413..3f4b1c73a 100644 --- a/src/server/search/doc_index.h +++ b/src/server/search/doc_index.h @@ -75,6 +75,7 @@ class ShardDocIndex { struct DocKeyIndex { DocId Add(std::string_view key); DocId Remove(std::string_view key); + std::string_view Get(DocId id) const; size_t Size() const; @@ -86,14 +87,15 @@ class ShardDocIndex { }; public: + // Index must be rebuilt at least once after initialization ShardDocIndex(std::shared_ptr index); // Perform search on all indexed documents and return results. SearchResult Search(const OpArgs& op_args, const SearchParams& params, search::SearchAlgorithm* search_algo) const; - // Initialize index. Traverses all matching documents and assigns ids. - void Init(const OpArgs& op_args); + // Clears internal data. Traverses all matching documents and assigns ids. + void Rebuild(const OpArgs& op_args); // Return whether base index matches bool Matches(std::string_view key, unsigned obj_code) const; @@ -114,11 +116,17 @@ class ShardDocIndices { public: // Get sharded document index by its name or nullptr if not found ShardDocIndex* GetIndex(std::string_view name); - // Init index: create shard local state for given index with given name + + // Init index: create shard local state for given index with given name. + // Build if instance is in active state. 
void InitIndex(const OpArgs& op_args, std::string_view name, std::shared_ptr index); + // Drop index, return true if it existed and was dropped bool DropIndex(std::string_view name); + // Rebuild all indices + void RebuildAllIndices(const OpArgs& op_args); + std::vector GetIndexNames() const; void AddDoc(std::string_view key, const DbContext& db_cnt, const PrimeValue& pv); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b02aa07b5..30d802a7a 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -779,6 +779,15 @@ struct SaveStagesController : public SaveStagesInputs { Mutex rdb_name_map_mu_; }; +void RebuildAllSearchIndices(Service* service) { + boost::intrusive_ptr trans{new Transaction{service->FindCmd("FT.CREATE")}}; + trans->InitByArgs(0, {}); + trans->ScheduleSingleHop([](auto* trans, auto* es) { + es->search_indices()->RebuildAllIndices(trans->GetOpArgs(es)); + return OpStatus::OK; + }); +} + } // namespace std::optional ParseSaveSchedule(string_view time) { @@ -1073,6 +1082,8 @@ Future ServerFamily::Load(const std::string& load_path) { fiber.Join(); } + RebuildAllSearchIndices(&service_); + LOG(INFO) << "Load finished, num keys read: " << aggregated_result->keys_read; service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); ec_promise.set_value(*(aggregated_result->first_error));