diff --git a/rust-engine/README.md b/rust-engine/README.md new file mode 100644 index 0000000..32f377a --- /dev/null +++ b/rust-engine/README.md @@ -0,0 +1,73 @@ +# Rust Engine API and Worker + +## Overview + +- HTTP API (warp) under /api for file management and query lifecycle +- MySQL for metadata, Qdrant for vector similarity +- Background worker resumes queued work and re-queues stale InProgress jobs at startup + +## Environment variables + +- DATABASE_URL: mysql://USER:PASS@HOST:3306/DB +- QDRANT_URL: Qdrant base URL (default: http://qdrant:6333) +- GEMINI_API_KEY: used for Gemini content generation (optional in demo) + +## Endpoints (JSON) + +- POST /api/files (multipart) + - Form: file=@path + - Response: {"success": true} + +- GET /api/files/list + - Response: {"files": [{"id","filename","path","description"}]} + +- GET /api/files/delete?id= + - Response: {"deleted": true|false} + +- POST /api/query/create + - Body: {"q": "text", "top_k": 5} + - Response: {"id": "uuid"} + +- GET /api/query/status?id= + - Response: {"status": "Queued"|"InProgress"|"Completed"|"Cancelled"|"Failed"|"not_found"} + +- GET /api/query/result?id= + - Response (Completed): + { + "result": { + "summary": "Found N related files", + "related_files": [ + {"id","filename","path","description","score"} + ], + "relationships": "...", + "final_answer": "..." + } + } + +- GET /api/query/cancel?id= + - Response: {"cancelled": true} + +## Worker behavior + +- Ensures Qdrant collection exists (dim 64, cosine) +- Re-queues InProgress older than 10 minutes +- Processing stages: + 1) Set InProgress + 2) Embed query text (demo now; pluggable Gemini later) + 3) Search Qdrant top_k (default 5) + 4) Join file metadata (MySQL) + 5) Gemini step: relationship analysis (strictly from provided files) + 6) Gemini step: final answer (no speculation; say unknown if insufficient) + 7) Persist result (JSON) and set Completed + - Checks for cancellation between stages + +## Local quickstart + +1. docker compose up -d mysql qdrant +2. 
set env DATABASE_URL and QDRANT_URL +3. cargo run + +## Notes + +- Replace demo embeddings with real Gemini calls for production +- Add auth to endpoints if needed (API key/JWT) diff --git a/rust-engine/src/api.rs b/rust-engine/src/api.rs index cae23d7..55d1a06 100644 --- a/rust-engine/src/api.rs +++ b/rust-engine/src/api.rs @@ -64,7 +64,8 @@ pub fn routes(pool: MySqlPool) -> impl Filter Result { @@ -73,7 +74,7 @@ async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result Result::None) + .bind(true) .execute(&pool) .await .map_err(|e| { tracing::error!("DB insert error: {}", e); warp::reject() })?; - - // generate demo embedding and upsert to Qdrant (async best-effort) - let emb = crate::gemini_client::demo_embedding_from_path(path.to_str().unwrap()); - let qdrant_clone = qdrant.clone(); - let id_clone = id.clone(); - tokio::spawn(async move { - if let Err(e) = qdrant_clone.upsert_point(&id_clone, emb).await { - tracing::error!("qdrant upsert failed: {}", e); - } - }); + // Enqueue worker task to process file (to be implemented) } Ok(warp::reply::json(&serde_json::json!({"success": true}))) @@ -143,7 +132,11 @@ async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result Result { filename TEXT NOT NULL, path TEXT NOT NULL, description TEXT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + pending_analysis BOOLEAN DEFAULT TRUE, + analysis_status VARCHAR(32) DEFAULT 'Queued' ); CREATE TABLE IF NOT EXISTS queries ( diff --git a/rust-engine/src/file_worker.rs b/rust-engine/src/file_worker.rs new file mode 100644 index 0000000..b70cb59 --- /dev/null +++ b/rust-engine/src/file_worker.rs @@ -0,0 +1,96 @@ +use crate::gemini_client::{generate_text, demo_text_embedding, DEMO_EMBED_DIM}; +use crate::vector_db::QdrantClient; +use sqlx::MySqlPool; +use anyhow::Result; +use tracing::{info, error}; + +pub struct FileWorker { + pool: MySqlPool, + qdrant: QdrantClient, +} + +impl FileWorker { + pub fn new(pool: 
MySqlPool) -> Self { + let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string()); + let qdrant = QdrantClient::new(&qdrant_url); + Self { pool, qdrant } + } + + pub async fn run(&self) { + info!("FileWorker starting"); + if let Err(e) = self.qdrant.ensure_files_collection(DEMO_EMBED_DIM).await { + error!("Failed to ensure Qdrant collection: {}", e); + } + loop { + match self.fetch_and_claim().await { + Ok(Some(fid)) => { + info!("Processing file {}", fid); + if let Err(e) = self.process_file(&fid).await { + error!("Error processing file {}: {}", fid, e); + } + } + Ok(None) => { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + Err(e) => { + error!("FileWorker fetch error: {}", e); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + } + } + + async fn fetch_and_claim(&self) -> Result> { + // Claim files that are queued or stuck in progress for >10min + if let Some(row) = sqlx::query( + "SELECT id FROM files WHERE (analysis_status = 'Queued' OR (analysis_status = 'InProgress' AND created_at < (NOW() - INTERVAL 10 MINUTE))) AND pending_analysis = TRUE LIMIT 1" + ) + .fetch_optional(&self.pool) + .await? { + use sqlx::Row; + let id: String = row.get("id"); + // Mark as in-progress + let _ = sqlx::query("UPDATE files SET analysis_status = 'InProgress' WHERE id = ?") + .bind(&id) + .execute(&self.pool) + .await?; + Ok(Some(id)) + } else { + Ok(None) + } + } + + async fn process_file(&self, file_id: &str) -> Result<()> { + use sqlx::Row; + let row = sqlx::query("SELECT filename, path FROM files WHERE id = ?") + .bind(file_id) + .fetch_one(&self.pool) + .await?; + let filename: String = row.get("filename"); + let path: String = row.get("path"); + + // Stage 1: Gemini 2.5 Flash for description + std::env::set_var("GEMINI_MODEL", "gemini-1.5-flash"); + let desc = generate_text(&format!("Describe the file '{filename}' and extract all key components, keywords, and details for later vectorization. 
Be comprehensive and factual.")).await.unwrap_or_else(|e| format!("[desc error: {}]", e)); + sqlx::query("UPDATE files SET description = ?, analysis_status = 'InProgress' WHERE id = ?") + .bind(&desc) + .bind(file_id) + .execute(&self.pool) + .await?; + + // Stage 2: Gemini 2.5 Pro for deep vector graph data + std::env::set_var("GEMINI_MODEL", "gemini-1.5-pro"); + let vector_graph = generate_text(&format!("Given the file '{filename}' and its description: {desc}\nGenerate a set of vector graph data (keywords, use cases, relationships) that can be used for broad and precise search. Only include what is directly supported by the file.")).await.unwrap_or_else(|e| format!("[vector error: {}]", e)); + + // Stage 3: Embed and upsert to Qdrant + let emb = demo_text_embedding(&vector_graph).await?; + self.qdrant.upsert_point(file_id, emb).await?; + + // Mark file as ready + sqlx::query("UPDATE files SET pending_analysis = FALSE, analysis_status = 'Completed' WHERE id = ?") + .bind(file_id) + .execute(&self.pool) + .await?; + Ok(()) + } +} diff --git a/rust-engine/src/gemini_client.rs b/rust-engine/src/gemini_client.rs index e6ef798..8639ed2 100644 --- a/rust-engine/src/gemini_client.rs +++ b/rust-engine/src/gemini_client.rs @@ -1,5 +1,7 @@ use anyhow::Result; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use reqwest::Client; // NOTE: This is a small stub to represent where you'd call the Gemini API. // Replace with real API call and proper auth handling for production. @@ -35,3 +37,52 @@ pub async fn demo_text_embedding(text: &str) -> Result> { } Ok(v) } + +/// Generate text with Gemini (Generative Language API). Falls back to a demo string if GEMINI_API_KEY is not set. +pub async fn generate_text(prompt: &str) -> Result { + let api_key = match std::env::var("GEMINI_API_KEY") { + Ok(k) if !k.is_empty() => k, + _ => { + return Ok(format!("[demo] Gemini not configured. 
/// Shorten `s` to at most `max` bytes, appending `…` when anything was cut.
///
/// Strings that already fit are returned unchanged. Otherwise the cut point
/// is moved back to the nearest UTF-8 character boundary at or below `max`,
/// so slicing can never panic in the middle of a multi-byte character (the
/// input is arbitrary prompt text or an HTTP response body).
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // Index 0 is always a char boundary, so this loop terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &s[..end])
}
api_routes = api::routes(pool.clone()) .with(warp::cors() diff --git a/rust-engine/src/models.rs b/rust-engine/src/models.rs index 0ecad7c..8392e8b 100644 --- a/rust-engine/src/models.rs +++ b/rust-engine/src/models.rs @@ -9,6 +9,8 @@ pub struct FileRecord { pub path: String, pub description: Option, pub created_at: Option>, + pub pending_analysis: bool, // true if file is not yet ready for search + pub analysis_status: String, // 'Queued', 'InProgress', 'Completed', 'Failed' } impl FileRecord { @@ -19,6 +21,8 @@ impl FileRecord { path: path.into(), description, created_at: None, + pending_analysis: true, + analysis_status: "Queued".to_string(), } } } diff --git a/rust-engine/src/vector_db.rs b/rust-engine/src/vector_db.rs index 1ce45f5..cb09d79 100644 --- a/rust-engine/src/vector_db.rs +++ b/rust-engine/src/vector_db.rs @@ -10,6 +10,22 @@ pub struct QdrantClient { } impl QdrantClient { + + /// Delete a point from collection 'files' by id + pub async fn delete_point(&self, id: &str) -> Result<()> { + let url = format!("{}/collections/files/points/delete", self.base); + let body = json!({ + "points": [id] + }); + let resp = self.client.post(&url).json(&body).send().await?; + let status = resp.status(); + if status.is_success() { + Ok(()) + } else { + let t = resp.text().await.unwrap_or_default(); + Err(anyhow::anyhow!("qdrant delete failed: {} - {}", status, t)) + } + } pub fn new(base: &str) -> Self { Self { base: base.trim_end_matches('/').to_string(), @@ -55,8 +71,8 @@ impl QdrantClient { } } - /// Search top-k nearest points from 'files' - pub async fn search_top_k(&self, vector: Vec, k: usize) -> Result> { + /// Search top-k nearest points from 'files', return (id, score) + pub async fn search_top_k(&self, vector: Vec, k: usize) -> Result> { let url = format!("{}/collections/files/points/search", self.base); let body = json!({ "vector": vector, @@ -69,19 +85,19 @@ impl QdrantClient { return Err(anyhow::anyhow!("qdrant search failed: {} - {}", status, t)); } 
#[derive(Deserialize)] - struct Hit { id: serde_json::Value } + struct Hit { id: serde_json::Value, score: f32 } #[derive(Deserialize)] struct Data { result: Vec } let data: Data = resp.json().await?; - let mut ids = Vec::new(); + let mut out = Vec::new(); for h in data.result { // id can be string or number; handle string if let Some(s) = h.id.as_str() { - ids.push(s.to_string()); + out.push((s.to_string(), h.score)); } else { - ids.push(h.id.to_string()); + out.push((h.id.to_string(), h.score)); } } - Ok(ids) + Ok(out) } } diff --git a/rust-engine/src/worker.rs b/rust-engine/src/worker.rs index 5b455a0..a82a198 100644 --- a/rust-engine/src/worker.rs +++ b/rust-engine/src/worker.rs @@ -1,4 +1,4 @@ -use crate::gemini_client::{demo_text_embedding, DEMO_EMBED_DIM}; +use crate::gemini_client::{demo_text_embedding, DEMO_EMBED_DIM, generate_text}; use crate::models::{QueryRecord, QueryStatus}; use crate::vector_db::QdrantClient; use anyhow::Result; @@ -84,21 +84,51 @@ impl Worker { // Stage 2: embed query text let text = q.payload.get("q").and_then(|v| v.as_str()).unwrap_or(""); - let emb = demo_text_embedding(text).await?; + let emb = demo_text_embedding(text).await?; + let top_k = q.payload.get("top_k").and_then(|v| v.as_u64()).unwrap_or(5) as usize; // Check cancellation if self.is_cancelled(&q.id).await? { return Ok(()); } // Stage 3: search top-K in Qdrant - let top_ids = self.qdrant.search_top_k(emb, 5).await.unwrap_or_default(); + let hits = self.qdrant.search_top_k(emb, top_k).await.unwrap_or_default(); + let top_ids: Vec = hits.iter().map(|(id, _)| id.clone()).collect(); // Check cancellation if self.is_cancelled(&q.id).await? { return Ok(()); } - // Stage 4: persist results + // Stage 4: fetch file metadata for IDs + let mut files_json = Vec::new(); + for (fid, score) in hits { + if let Some(row) = sqlx::query("SELECT id, filename, path, description FROM files WHERE id = ? AND pending_analysis = FALSE") + .bind(&fid) + .fetch_optional(&self.pool) + .await? 
{ + use sqlx::Row; + let id: String = row.get("id"); + let filename: String = row.get("filename"); + let path: String = row.get("path"); + let description: Option = row.get("description"); + files_json.push(serde_json::json!({ + "id": id, "filename": filename, "path": path, "description": description, "score": score + })); + } + } + + // Stage 5: call Gemini to analyze relationships and propose follow-up details strictly from provided files + let relationships_prompt = build_relationships_prompt(text, &files_json); + let relationships = generate_text(&relationships_prompt).await.unwrap_or_else(|e| format!("[demo] relationships error: {}", e)); + + // Stage 6: final answer synthesis with strict constraints (no speculation; say unknown when insufficient) + let final_prompt = build_final_answer_prompt(text, &files_json, &relationships); + let final_answer = generate_text(&final_prompt).await.unwrap_or_else(|e| format!("[demo] final answer error: {}", e)); + + // Stage 7: persist results let result = serde_json::json!({ - "summary": format!("Found {} related files", top_ids.len()), - "related_file_ids": top_ids, + "summary": format!("Found {} related files", files_json.len()), + "related_files": files_json, + "relationships": relationships, + "final_answer": final_answer, }); sqlx::query("UPDATE queries SET status = 'Completed', result = ? 
WHERE id = ?") .bind(result) @@ -158,3 +188,47 @@ impl Worker { Ok(false) } } + +fn build_relationships_prompt(query: &str, files: &Vec) -> String { + let files_snippets: Vec = files.iter().map(|f| format!( + "- id: {id}, filename: {name}, path: {path}, desc: {desc}", + id=f.get("id").and_then(|v| v.as_str()).unwrap_or(""), + name=f.get("filename").and_then(|v| v.as_str()).unwrap_or(""), + path=f.get("path").and_then(|v| v.as_str()).unwrap_or(""), + desc=f.get("description").and_then(|v| v.as_str()).unwrap_or("") + )).collect(); + format!( + "You are an assistant analyzing relationships STRICTLY within the provided files.\n\ + Query: {query}\n\ + Files:\n{files}\n\ + Tasks:\n\ + 1) Summarize key details from the files relevant to the query.\n\ + 2) Describe relationships and linkages strictly supported by these files.\n\ + 3) List important follow-up questions that could be answered only using the provided files.\n\ + Rules: Do NOT guess or invent. If information is insufficient in the files, explicitly state that.", + query=query, + files=files_snippets.join("\n") + ) +} + +fn build_final_answer_prompt(query: &str, files: &Vec, relationships: &str) -> String { + let files_short: Vec = files.iter().map(|f| format!( + "- {name} ({id})", + id=f.get("id").and_then(|v| v.as_str()).unwrap_or(""), + name=f.get("filename").and_then(|v| v.as_str()).unwrap_or("") + )).collect(); + format!( + "You are to compose a final answer to the user query using only the information from the files.\n\ + Query: {query}\n\ + Files considered:\n{files}\n\ + Relationship analysis:\n{rels}\n\ + Requirements:\n\ + - Use only information present in the files and analysis above.\n\ + - If the answer is uncertain or cannot be determined from the files, clearly state that limitation.\n\ + - Avoid speculation or assumptions.\n\ + Provide a concise, structured answer.", + query=query, + files=files_short.join("\n"), + rels=relationships + ) +}