Add and prepare rust worker management system for file information processing and knowledge base framework
This commit is contained in:
parent
af82b71657
commit
da6ab3a782
12 changed files with 1402 additions and 251 deletions
160
rust-engine/src/worker.rs
Normal file
160
rust-engine/src/worker.rs
Normal file
|
|
@ -0,0 +1,160 @@
|
|||
use crate::gemini_client::{demo_text_embedding, DEMO_EMBED_DIM};
|
||||
use crate::models::{QueryRecord, QueryStatus};
|
||||
use crate::vector_db::QdrantClient;
|
||||
use anyhow::Result;
|
||||
use sqlx::MySqlPool;
|
||||
use std::time::Duration;
|
||||
use tracing::{error, info};
|
||||
|
||||
pub struct Worker {
|
||||
pool: MySqlPool,
|
||||
qdrant: QdrantClient,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
pub fn new(pool: MySqlPool) -> Self {
|
||||
let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
|
||||
let qdrant = QdrantClient::new(&qdrant_url);
|
||||
Self { pool, qdrant }
|
||||
}
|
||||
|
||||
pub async fn run(&self) {
|
||||
info!("Worker starting");
|
||||
|
||||
// Ensure qdrant collection exists
|
||||
if let Err(e) = self.qdrant.ensure_files_collection(DEMO_EMBED_DIM).await {
|
||||
error!("Failed to ensure Qdrant collection: {}", e);
|
||||
}
|
||||
|
||||
// Requeue stale InProgress jobs older than cutoff (e.g., 10 minutes)
|
||||
if let Err(e) = self.requeue_stale_inprogress(10 * 60).await {
|
||||
error!("Failed to requeue stale jobs: {}", e);
|
||||
}
|
||||
|
||||
loop {
|
||||
// Claim next queued query
|
||||
match self.fetch_and_claim().await {
|
||||
Ok(Some(mut q)) => {
|
||||
info!("Processing query {}", q.id);
|
||||
if let Err(e) = self.process_query(&mut q).await {
|
||||
error!("Error processing {}: {}", q.id, e);
|
||||
let _ = self.mark_failed(&q.id, &format!("{}", e)).await;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Worker fetch error: {}", e);
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_and_claim(&self) -> Result<Option<QueryRecord>> {
|
||||
// Note: MySQL transactional SELECT FOR UPDATE handling is more complex; for this hackathon scaffold
|
||||
// we do a simple two-step: select one queued id, then update it to InProgress if it is still queued.
|
||||
if let Some(row) = sqlx::query("SELECT id, payload FROM queries WHERE status = 'Queued' ORDER BY created_at LIMIT 1")
|
||||
.fetch_optional(&self.pool)
|
||||
.await?
|
||||
{
|
||||
use sqlx::Row;
|
||||
let id: String = row.get("id");
|
||||
let payload: serde_json::Value = row.get("payload");
|
||||
|
||||
let updated = sqlx::query("UPDATE queries SET status = 'InProgress' WHERE id = ? AND status = 'Queued'")
|
||||
.bind(&id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
if updated.rows_affected() == 1 {
|
||||
let mut q = QueryRecord::new(payload);
|
||||
q.id = id;
|
||||
q.status = QueryStatus::InProgress;
|
||||
return Ok(Some(q));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn process_query(&self, q: &mut QueryRecord) -> Result<()> {
|
||||
// Stage 1: set InProgress (idempotent)
|
||||
self.update_status(&q.id, QueryStatus::InProgress).await?;
|
||||
|
||||
// Stage 2: embed query text
|
||||
let text = q.payload.get("q").and_then(|v| v.as_str()).unwrap_or("");
|
||||
let emb = demo_text_embedding(text).await?;
|
||||
|
||||
// Check cancellation
|
||||
if self.is_cancelled(&q.id).await? { return Ok(()); }
|
||||
|
||||
// Stage 3: search top-K in Qdrant
|
||||
let top_ids = self.qdrant.search_top_k(emb, 5).await.unwrap_or_default();
|
||||
|
||||
// Check cancellation
|
||||
if self.is_cancelled(&q.id).await? { return Ok(()); }
|
||||
|
||||
// Stage 4: persist results
|
||||
let result = serde_json::json!({
|
||||
"summary": format!("Found {} related files", top_ids.len()),
|
||||
"related_file_ids": top_ids,
|
||||
});
|
||||
sqlx::query("UPDATE queries SET status = 'Completed', result = ? WHERE id = ?")
|
||||
.bind(result)
|
||||
.bind(&q.id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_status(&self, id: &str, status: QueryStatus) -> Result<()> {
|
||||
let s = match status {
|
||||
QueryStatus::Queued => "Queued",
|
||||
QueryStatus::InProgress => "InProgress",
|
||||
QueryStatus::Completed => "Completed",
|
||||
QueryStatus::Cancelled => "Cancelled",
|
||||
QueryStatus::Failed => "Failed",
|
||||
};
|
||||
sqlx::query("UPDATE queries SET status = ? WHERE id = ?")
|
||||
.bind(s)
|
||||
.bind(id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mark_failed(&self, id: &str, message: &str) -> Result<()> {
|
||||
let result = serde_json::json!({"error": message});
|
||||
sqlx::query("UPDATE queries SET status = 'Failed', result = ? WHERE id = ?")
|
||||
.bind(result)
|
||||
.bind(id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn requeue_stale_inprogress(&self, age_secs: i64) -> Result<()> {
|
||||
// MySQL: requeue items updated_at < now()-age and status = InProgress
|
||||
sqlx::query(
|
||||
"UPDATE queries SET status = 'Queued' WHERE status = 'InProgress' AND updated_at < (NOW() - INTERVAL ? SECOND)"
|
||||
)
|
||||
.bind(age_secs)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn is_cancelled(&self, id: &str) -> Result<bool> {
|
||||
if let Some(row) = sqlx::query("SELECT status FROM queries WHERE id = ?")
|
||||
.bind(id)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?
|
||||
{
|
||||
use sqlx::Row;
|
||||
let s: String = row.get("status");
|
||||
return Ok(s == "Cancelled");
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue