feat(webserver): use logkit to simplify logging to db for background jobs. (#2497)

* switch to log crate for dblogger

* fix

* make fix

* update
Meng Zhang 2024-06-25 14:01:37 +08:00 committed by GitHub
parent df1108d3ff
commit b7d2cf6044
18 changed files with 326 additions and 112 deletions
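In short: background jobs previously reported progress through a custom cprintln! macro plus a hand-rolled JobLogger (removed at the end of this diff), while separately emitting tracing events. This change routes job logs through the logkit crate instead: a logkit::Target implementation forwards each structured record to the job's stdout column in the database. A minimal sketch of the new call-site style, grounded in the calls in this diff (logkit's behavior when no logger has been installed is an assumption):

fn main() {
    // Plain formatted message, as used throughout this diff.
    logkit::info!("Indexing file: {}", "src/lib.rs");
    // Extra structured fields go before the `;`. The background-job loop
    // later in this diff uses `exit_code` this way to mark a job finished.
    logkit::info!(exit_code = 0; "Job completed successfully");
}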

Cargo.lock (generated, 168 changes)
View File

@@ -1135,6 +1135,24 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "encoder"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03f6928ad5c6efcdae42eb068dff8a555ef2f057c92bbd491ddf5610f6444987"
dependencies = [
"encoder-ryu",
"indexmap 2.2.6",
"serde_json",
"simd-json",
]
[[package]]
name = "encoder-ryu"
version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e27addc39f5f73c85604bfe21b59fe93717f9765194015d92bde1db11e8ccef"
[[package]]
name = "encoding_rs"
version = "0.8.34"
@@ -1238,6 +1256,15 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "float-cmp"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4"
dependencies = [
"num-traits",
]
[[package]]
name = "flume"
version = "0.11.0"
@@ -1683,6 +1710,16 @@ dependencies = [
"tracing",
]
[[package]]
name = "halfbrown"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8588661a8607108a5ca69cab034063441a0413a0b041c13618a7dd348021ef6f"
dependencies = [
"hashbrown 0.14.5",
"serde",
]
[[package]]
name = "hash-ids"
version = "0.2.1"
@@ -2399,6 +2436,70 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c2cdeb66e45e9f36bfad5bbdb4d2384e70936afbee843c6f6543f0c551ebb25"
[[package]]
name = "lexical-core"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46"
dependencies = [
"lexical-parse-float",
"lexical-parse-integer",
"lexical-util",
"lexical-write-float",
"lexical-write-integer",
]
[[package]]
name = "lexical-parse-float"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f"
dependencies = [
"lexical-parse-integer",
"lexical-util",
"static_assertions",
]
[[package]]
name = "lexical-parse-integer"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9"
dependencies = [
"lexical-util",
"static_assertions",
]
[[package]]
name = "lexical-util"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc"
dependencies = [
"static_assertions",
]
[[package]]
name = "lexical-write-float"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862"
dependencies = [
"lexical-util",
"lexical-write-integer",
"static_assertions",
]
[[package]]
name = "lexical-write-integer"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446"
dependencies = [
"lexical-util",
"static_assertions",
]
[[package]]
name = "libc"
version = "0.2.155"
@@ -2518,6 +2619,17 @@ version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
[[package]]
name = "logkit"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b517d00135d2ea552dc1f6cbc5da9d8953f895c91db680c288d53c50ca309767"
dependencies = [
"backtrace",
"chrono",
"encoder",
]
[[package]]
name = "loom"
version = "0.5.6"
@@ -3718,6 +3830,26 @@ dependencies = [
"bitflags 2.5.0",
]
[[package]]
name = "ref-cast"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccf0a6f84d5f1d581da8b41b47ec8600871962f2a528115b542b362d4b744931"
dependencies = [
"ref-cast-impl",
]
[[package]]
name = "ref-cast-impl"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "regex"
version = "1.10.4"
@@ -4343,6 +4475,28 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "simd-json"
version = "0.13.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "570c430b3d902ea083097e853263ae782dfe40857d93db019a12356c8e8143fa"
dependencies = [
"getrandom 0.2.15",
"halfbrown",
"lexical-core",
"ref-cast",
"serde",
"serde_json",
"simdutf8",
"value-trait",
]
[[package]]
name = "simdutf8"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
[[package]]
name = "similar"
version = "2.5.0"
@@ -5033,6 +5187,7 @@ dependencies = [
"ignore",
"insta",
"lazy_static",
"logkit",
"readable-readability",
"serde",
"serde_json",
@@ -5109,6 +5264,7 @@ dependencies = [
"juniper_graphql_ws",
"lazy_static",
"lettre",
"logkit",
"mime_guess",
"octocrab",
"pin-project",
@@ -6169,6 +6325,18 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "value-trait"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dad8db98c1e677797df21ba03fca7d3bf9bec3ca38db930954e4fe6e1ea27eb4"
dependencies = [
"float-cmp",
"halfbrown",
"itoa 1.0.11",
"ryu",
]
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@@ -63,6 +63,7 @@ tower-http = "0.5"
mime_guess = "2.0.4"
assert_matches = "1.5"
insta = "1.34.0"
logkit = "0.3"
[workspace.dependencies.uuid]
version = "1.3.3"

View File

@@ -1,7 +1,6 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tabby_inference::Embedding;
use tracing::debug;
pub struct LlamaCppEngine {
client: reqwest::Client,
@@ -34,13 +33,6 @@ struct EmbeddingResponse {
#[async_trait]
impl Embedding for LlamaCppEngine {
async fn embed(&self, prompt: &str) -> anyhow::Result<Vec<f32>> {
// Workaround for https://github.com/ggerganov/llama.cpp/issues/6722
// When prompt is super short, we just return an empty embedding vector.
if prompt.len() < 8 {
debug!("Prompt length is {:?}, which is too short for llama.cpp embedding, returning empty embedding vector.", prompt.len());
return Ok(vec![]);
}
let request = EmbeddingRequest {
content: prompt.to_owned(),
};

View File

@@ -174,7 +174,7 @@ impl Default for ServerConfig {
fn default_embedding_config() -> ModelConfig {
ModelConfig::Local(LocalModelConfig {
model_id: "Nomic-Embed-Text".into(),
parallelism: 4,
parallelism: 1,
num_gpu_layers: 9999,
})
}

View File

@@ -37,6 +37,7 @@ tabby-inference = { path = "../tabby-inference" }
git2.workspace = true
insta.workspace = true
async-trait.workspace = true
logkit.workspace = true
[dev-dependencies]
temp_testdir = { workspace = true }

View File

@@ -5,7 +5,7 @@ use futures::StreamExt;
use ignore::Walk;
use tabby_common::config::RepositoryConfig;
use tabby_inference::Embedding;
use tracing::{debug, warn};
use tracing::warn;
use super::{
create_code_index,
@@ -38,7 +38,7 @@ pub async fn garbage_collection() {
}
}
debug!("Finished garbage collection for code index: {num_to_keep} items kept, {num_to_delete} items removed");
logkit::info!("Finished garbage collection for code index: {num_to_keep} items kept, {num_to_delete} items removed");
index.commit();
}.collect::<()>().await;
}
@@ -64,6 +64,8 @@ async fn add_changed_documents(repository: &RepositoryConfig, index: Indexer<Sou
continue;
}
logkit::info!("Indexing file: {}", file.path().display());
let Some(code) = intelligence.compute_source_file(repository, file.path()) else {
continue;
};
@@ -77,6 +79,24 @@
index.add(code).await;
});
}
wait_for_index(index).await;
}
async fn wait_for_index(index: Arc<Indexer<SourceCode>>) {
let mut current_index = Box::new(index);
loop {
match Arc::try_unwrap(*current_index) {
Ok(index) => {
index.commit();
break;
}
Err(index) => {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
*current_index = index;
}
}
}
}
fn is_valid_file(file: &SourceCode) -> bool {

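A note on wait_for_index above: add_changed_documents hands a clone of the Arc<Indexer<SourceCode>> to each spawned indexing task, so the final commit must wait until every task has dropped its clone. Arc::try_unwrap succeeds only for the last remaining reference, which makes it a cheap readiness probe. A simplified sketch of the same pattern (tokio assumed; the Box indirection in the committed code is omitted here for clarity):

use std::{sync::Arc, time::Duration};

// Poll until this handle is the sole owner, then take the value out.
async fn wait_until_sole_owner<T>(mut shared: Arc<T>) -> T {
    loop {
        match Arc::try_unwrap(shared) {
            // No other clones remain: we now own the value and can consume it.
            Ok(value) => return value,
            // Still shared by a running task: keep the handle and retry later.
            Err(still_shared) => {
                shared = still_shared;
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}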
View File

@@ -9,7 +9,7 @@ use tabby_common::{
index::{code, corpus},
};
use tabby_inference::Embedding;
use tracing::{debug, warn};
use tracing::warn;
use self::intelligence::SourceCode;
use crate::{code::intelligence::CodeIntelligence, IndexAttributeBuilder, Indexer};
@@ -26,7 +26,10 @@ pub struct CodeIndexer {}
impl CodeIndexer {
pub async fn refresh(&mut self, embedding: Arc<dyn Embedding>, repository: &RepositoryConfig) {
debug!("Refreshing repository: {}", repository.canonical_git_url());
logkit::info!(
"Building source code index: {}",
repository.canonical_git_url()
);
repository::sync_repository(repository);
index::index_repository(embedding, repository).await;

View File

@@ -61,7 +61,7 @@ async fn crawl_url(start_url: &str) -> anyhow::Result<impl Stream<Item = KatanaR
}
// Skip if the content is larger than 1M.
if data.response.raw.len() > 1_000_000 {
if data.response.raw.as_ref().is_some_and(|x| x.len() > 1_000_000) {
debug!("Skipping {} as the content is larger than 1M", data.request.endpoint);
continue;
}
@@ -138,7 +138,7 @@ mod tests {
headers,
body: Some("<p>Hello, World!</p>".to_owned()),
technologies: Default::default(),
raw: "HTTP/1.1 200 OK\nContent-Type: text/html\n".to_owned(),
raw: Some("HTTP/1.1 200 OK\nContent-Type: text/html\n".to_owned()),
},
};
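With the raw field now optional (see the struct in the next file), a missing raw body no longer fails deserialization, and the size guard above uses Option::is_some_and, so a None body counts as "not too large" and the page is kept. A small self-contained check of that behavior (plain std, no external crates):

fn main() {
    let missing: Option<String> = None;
    let large: Option<String> = Some("x".repeat(2_000_000));
    // is_some_and consumes the Option, hence the as_ref() in the guard above.
    assert!(!missing.as_ref().is_some_and(|x| x.len() > 1_000_000));
    assert!(large.as_ref().is_some_and(|x| x.len() > 1_000_000));
    println!("ok");
}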

View File

@@ -22,7 +22,7 @@ pub struct KatanaResponse {
pub headers: HashMap<String, String>,
pub body: Option<String>,
pub technologies: Option<Vec<String>>,
pub raw: String,
pub raw: Option<String>,
}
#[derive(Serialize)]

View File

@@ -8,7 +8,6 @@ use tantivy::{
schema::{self, IndexRecordOption, Value},
DocAddress, DocSet, IndexWriter, Searcher, TantivyDocument, Term, TERMINATED,
};
use tracing::debug;
use crate::tantivy_utils::open_or_create_index;
@@ -122,7 +121,7 @@ impl<T: Send + 'static> Indexer<T> {
}
pub fn commit(mut self) {
debug!("Committing changes to index...");
logkit::info!("Committing changes to index...");
self.writer.commit().expect("Failed to commit changes");
self.writer
.wait_merging_threads()

View File

@@ -9,34 +9,29 @@ pub use code::CodeIndexer;
use crawl::crawl_pipeline;
use doc::create_web_index;
pub use doc::{DocIndexer, WebDocument};
use futures::{Future, StreamExt};
use futures::StreamExt;
use indexer::{IndexAttributeBuilder, Indexer};
use tabby_inference::Embedding;
mod doc;
use std::sync::Arc;
use tracing::{debug, info};
use crate::doc::SourceDocument;
pub async fn crawl_index_docs<F>(
pub async fn crawl_index_docs(
urls: &[String],
embedding: Arc<dyn Embedding>,
on_process_url: impl Fn(String) -> F,
) -> anyhow::Result<()>
where
F: Future<Output = ()>,
{
on_process_url: impl Fn(String),
) -> anyhow::Result<()> {
for url in urls {
debug!("Starting doc index pipeline for {url}");
logkit::info!("Starting doc index pipeline for {url}");
let embedding = embedding.clone();
let mut num_docs = 0;
let doc_index = create_web_index(embedding.clone());
let mut pipeline = Box::pin(crawl_pipeline(url).await?);
while let Some(doc) = pipeline.next().await {
on_process_url(doc.url.clone()).await;
on_process_url(doc.url.clone());
let source_doc = SourceDocument {
id: doc.url.clone(),
title: doc.metadata.title.unwrap_or_default(),
@@ -47,7 +42,7 @@ where
num_docs += 1;
doc_index.add(source_doc).await;
}
info!("Crawled {} documents from '{}'", num_docs, url);
logkit::info!("Crawled {} documents from '{}'", num_docs, url);
doc_index.commit();
}
Ok(())
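Since on_process_url no longer returns a Future, callers pass a plain closure and the extra generic parameter disappears. A hedged usage sketch matching the web crawler job later in this diff (the surrounding async context and an Embedding implementation are assumed):

use std::sync::Arc;
use tabby_inference::Embedding;

async fn index_docs(urls: Vec<String>, embedding: Arc<dyn Embedding>) -> anyhow::Result<()> {
    // The progress callback is now a synchronous Fn(String); no async block needed.
    tabby_scheduler::crawl_index_docs(&urls, embedding, |url| {
        logkit::info!("Fetching {}", url);
    })
    .await
}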

View File

@@ -51,6 +51,7 @@ uuid.workspace = true
strum.workspace = true
cron = "0.12.1"
async-stream.workspace = true
logkit.workspace = true
[dev-dependencies]
assert_matches.workspace = true

View File

@@ -8,11 +8,7 @@ use tabby_inference::Embedding;
use tabby_scheduler::CodeIndexer;
use tabby_schema::{job::JobService, repository::GitRepositoryService};
use super::{
cprintln,
helper::{Job, JobLogger},
BackgroundJobEvent,
};
use super::{helper::Job, BackgroundJobEvent};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SchedulerGitJob {
@@ -30,19 +26,10 @@ impl Job for SchedulerGitJob {
}
impl SchedulerGitJob {
pub async fn run(
self,
job_logger: JobLogger,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
pub async fn run(self, embedding: Arc<dyn Embedding>) -> tabby_schema::Result<()> {
let repository = self.repository.clone();
tokio::spawn(async move {
let mut code = CodeIndexer::default();
cprintln!(
job_logger,
"Refreshing repository {}",
repository.canonical_git_url()
);
code.refresh(embedding, &repository).await;
})
.await

View File

@@ -1,33 +1,93 @@
use tabby_db::DbConn;
use tracing::warn;
#[derive(Clone)]
pub struct JobLogger {
id: i64,
db: DbConn,
handle: tokio::task::JoinHandle<()>,
}
impl JobLogger {
pub async fn new(db: DbConn, id: i64) -> Self {
Self { id, db }
pub fn new(db: DbConn, id: i64) -> Self {
let mut logger = logkit::Logger::new(None);
logger.mount(logkit::LevelPlugin);
logger.mount(logkit::TimePlugin::from_micros());
let (target, handle) = DbTarget::new(db, id);
logger.route(target);
logkit::set_default_logger(logger);
Self { handle }
}
pub async fn r#internal_println(&self, stdout: String) {
let stdout = stdout + "\n";
match self.db.update_job_stdout(self.id, stdout).await {
Ok(_) => (),
Err(_) => {
warn!("Failed to write stdout to job `{}`", self.id);
}
}
}
pub async fn complete(&mut self, exit_code: i32) {
match self.db.update_job_status(self.id, exit_code).await {
Ok(_) => (),
Err(_) => {
warn!("Failed to complete job `{}`", self.id);
}
}
pub async fn finalize(self) {
logkit::set_default_logger(logkit::Logger::new(None));
self.handle.await.unwrap_or_else(|err| {
warn!("Failed to join logging thread: {}", err);
});
}
}
struct DbTarget {
tx: tokio::sync::mpsc::Sender<Record>,
}
impl DbTarget {
fn new(db: DbConn, id: i64) -> (Self, tokio::task::JoinHandle<()>) {
let (tx, rx) = tokio::sync::mpsc::channel::<Record>(100);
let handle = Self::create_logging_thread(db, id, rx);
(Self { tx }, handle)
}
}
impl DbTarget {
fn create_logging_thread(
db: DbConn,
id: i64,
mut rx: tokio::sync::mpsc::Receiver<Record>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(record) = rx.recv().await {
let stdout = format!(
"{} [{}]: {}\n",
record.time,
record.level.to_uppercase(),
record.msg
);
match db.update_job_stdout(id, stdout).await {
Ok(_) => (),
Err(_) => {
warn!("Failed to write stdout to job `{}`", id);
}
}
if let Some(exit_code) = record.exit_code {
match db.update_job_status(id, exit_code).await {
Ok(_) => (),
Err(_) => {
warn!("Failed to write exit code to job `{}`", id);
}
}
}
}
})
}
}
#[derive(serde::Deserialize)]
struct Record {
level: String,
time: String,
msg: String,
exit_code: Option<i32>,
}
impl logkit::Target for DbTarget {
fn write(&self, buf: &[u8]) {
let Ok(record) = serde_json::from_slice::<Record>(buf) else {
warn!("Failed to parse log record");
return;
};
self.tx.try_send(record).unwrap_or_else(|err| {
warn!("Failed to send log record: {}", err);
});
}
}
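Taken together, the file above works like this: JobLogger::new installs a process-wide logkit logger whose only route is DbTarget; each logkit macro call arrives at DbTarget::write as a JSON buffer, is parsed back into a Record, and is pushed over a bounded mpsc channel to the spawned task, which appends a formatted line to the job's stdout column and, when exit_code is present, updates the job status. finalize() installs a fresh default logger, which drops the old DbTarget and its sender; the channel then closes, the task drains any remaining records and exits, and the JoinHandle is awaited. A hedged lifecycle sketch (db and job_id are assumed to be in scope):

let logger = JobLogger::new(db.clone(), job_id); // route logkit records to the DB
logkit::info!("Indexing file: {}", "a.rs");      // appended to the job's stdout column
logkit::info!(exit_code = 0; "Job completed successfully"); // also records job status
logger.finalize().await; // restore the default logger, then drain the channel and join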

View File

@@ -21,7 +21,7 @@ use tabby_schema::{
repository::{GitRepositoryService, ThirdPartyRepositoryService},
};
use third_party_integration::SchedulerGithubGitlabJob;
use tracing::warn;
use tracing::{debug, warn};
use web_crawler::WebCrawlerJob;
use self::{db::DbMaintainanceJob, third_party_integration::SyncIntegrationJob};
@@ -72,17 +72,17 @@
continue;
};
let mut job_logger = JobLogger::new(db.clone(), job.id).await;
let logger = JobLogger::new(db.clone(), job.id);
debug!("Background job {} started, command: {}", job.id, job.command);
let Ok(event) = serde_json::from_str::<BackgroundJobEvent>(&job.command) else {
cprintln!(job_logger, "Failed to parse background job event, marking it as failed");
job_logger.complete(-1).await;
logkit::info!(exit_code = -1; "Failed to parse background job event, marking it as failed");
continue;
};
if let Err(err) = match event {
BackgroundJobEvent::SchedulerGitRepository(repository_config) => {
let job = SchedulerGitJob::new(repository_config);
job.run(job_logger.clone(), embedding.clone()).await
job.run(embedding.clone()).await
},
BackgroundJobEvent::SyncThirdPartyRepositories(integration_id) => {
let job = SyncIntegrationJob::new(integration_id);
@@ -90,18 +90,19 @@
}
BackgroundJobEvent::SchedulerGithubGitlabRepository(integration_id) => {
let job = SchedulerGithubGitlabJob::new(integration_id);
job.run(job_logger.clone(), embedding.clone(), third_party_repository_service.clone(), integration_service.clone()).await
job.run(embedding.clone(), third_party_repository_service.clone(), integration_service.clone()).await
}
BackgroundJobEvent::WebCrawler(url) => {
let job = WebCrawlerJob::new(url);
job.run(job_logger.clone(), embedding.clone()).await
job.run(embedding.clone()).await
}
} {
cprintln!(job_logger, "{:?}", err);
job_logger.complete(-1).await;
logkit::info!(exit_code = 1; "Job failed {}", err);
} else {
job_logger.complete(0).await;
logkit::info!(exit_code = 0; "Job completed successfully");
}
logger.finalize().await;
debug!("Background job {} completed", job.id);
},
Some(now) = hourly.next() => {
if let Err(err) = DbMaintainanceJob::cron(now, db.clone()).await {
@@ -128,14 +129,3 @@
}
});
}
macro_rules! cprintln {
($ctx:expr, $($params:tt)+) => {
{
tracing::debug!($($params)+);
$ctx.r#internal_println(format!($($params)+)).await;
}
}
}
use cprintln;
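The exit_code = 0; and exit_code = -1; field syntax used above rides along in the JSON payload that DbTarget::write receives and deserializes. A test-style sketch of that parse step (field names are taken from the Record struct in this diff; the exact time format produced by LevelPlugin and TimePlugin::from_micros is an assumption):

use serde::Deserialize;

#[derive(Deserialize)]
struct Record {
    level: String,
    time: String,
    msg: String,
    exit_code: Option<i32>,
}

fn main() {
    // A payload shaped like what the mounted logkit plugins would emit.
    let buf = br#"{"level":"info","time":"2024-06-25 14:01:37.000000","msg":"Job completed successfully","exit_code":0}"#;
    let record: Record = serde_json::from_slice(buf).expect("valid record");
    assert_eq!(record.exit_code, Some(0));
    println!("{} [{}]: {}", record.time, record.level.to_uppercase(), record.msg);
}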

View File

@@ -6,7 +6,7 @@ use juniper::ID;
use serde::{Deserialize, Serialize};
use tabby_common::config::RepositoryConfig;
use tabby_inference::Embedding;
use tabby_scheduler::DocIndexer;
use tabby_scheduler::{CodeIndexer, DocIndexer};
use tabby_schema::{
integration::{IntegrationKind, IntegrationService},
job::JobService,
@@ -14,11 +14,7 @@ use tabby_schema::{
};
use tracing::debug;
use super::{
git::SchedulerGitJob,
helper::{Job, JobLogger},
BackgroundJobEvent,
};
use super::{helper::Job, BackgroundJobEvent};
mod issues;
@@ -85,7 +81,6 @@ impl SchedulerGithubGitlabJob {
pub async fn run(
self,
job_logger: JobLogger,
embedding: Arc<dyn Embedding>,
repository_service: Arc<dyn ThirdPartyRepositoryService>,
integration_service: Arc<dyn IntegrationService>,
@@ -101,14 +96,16 @@
.kind
.format_authenticated_url(&repository.git_url, &integration.access_token)?;
let repo = RepositoryConfig::new(authenticated_url);
// First, run the regular scheduler job to sync and index the repository
SchedulerGitJob::new(repo)
.run(job_logger, embedding.clone())
.await?;
logkit::info!(
"Pulling source code for repository {}",
repository.display_name
);
let mut code = CodeIndexer::default();
code.refresh(embedding.clone(), &RepositoryConfig::new(authenticated_url))
.await;
debug!("Indexing issues for repository {}", repository.display_name);
logkit::info!("Indexing issues for repository {}", repository.display_name);
let index = DocIndexer::new(embedding);
match &integration.kind {

View File

@@ -2,10 +2,7 @@ use std::sync::Arc;
use tabby_inference::Embedding;
use super::{
cprintln,
helper::{Job, JobLogger},
};
use super::helper::Job;
pub struct WebCrawlerJob {
url: String,
@@ -20,14 +17,9 @@ impl WebCrawlerJob {
Self { url }
}
pub async fn run(
self,
job_logger: JobLogger,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
pub async fn run(self, embedding: Arc<dyn Embedding>) -> tabby_schema::Result<()> {
tabby_scheduler::crawl_index_docs(&[self.url], embedding, move |url| {
let job_logger = job_logger.clone();
async move { cprintln!(job_logger, "Fetching {url}") }
logkit::info!("Fetching {}", url);
})
.await?;
Ok(())

View File

@@ -0,0 +1,8 @@
id: do-not-use-logkit-crate
message: Don't bring logkit items into scope with a use statement; keeping calls fully qualified avoids conflicts with the tracing crate. The logkit crate is only used for background job logging, to enrich the job output shown in the admin UI.
severity: error
language: rust
files:
- ./**
rule:
pattern: use logkit::$$$;
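Concretely, the rule flags any use item that pulls logkit names into scope, while fully qualified macro calls pass. A hedged example pair:

// Flagged by the rule above (matches `use logkit::$$$;`):
// use logkit::info; // would shadow or clash with tracing's info! in this crate

// Allowed: fully qualified, so tracing macros stay unambiguous.
fn log_fetch(url: &str) {
    logkit::info!("Fetching {}", url);
}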