diff --git a/rust-engine/src/api.rs b/rust-engine/src/api.rs index c531f36..41c4063 100644 --- a/rust-engine/src/api.rs +++ b/rust-engine/src/api.rs @@ -1,9 +1,8 @@ -use crate::gemini_client; use crate::vector_db::QdrantClient; use crate::storage; use anyhow::Result; use bytes::Buf; -use futures_util::{StreamExt, TryStreamExt}; +use futures_util::TryStreamExt; use serde::Deserialize; use sqlx::{MySqlPool, Row}; use warp::{multipart::FormData, Filter, Rejection, Reply}; @@ -80,10 +79,7 @@ pub fn routes(pool: MySqlPool) -> impl Filter Result { - // qdrant client - let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string()); - let qdrant = QdrantClient::new(&qdrant_url); - + let mut created_files = Vec::new(); while let Some(field) = form.try_next().await.map_err(|_| warp::reject())? { let _name = field.name().to_string(); let filename = field @@ -116,7 +112,7 @@ async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result Result, pool: MySqlPool) -> Result { @@ -209,7 +213,7 @@ async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result { @@ -66,12 +70,18 @@ impl FileWorker { .bind(file_id) .fetch_one(&self.pool) .await?; - let filename: String = row.get("filename"); - let path: String = row.get("path"); + let filename: String = row.get("filename"); + let _path: String = row.get("path"); // Stage 1: Gemini 2.5 Flash for description - std::env::set_var("GEMINI_MODEL", "gemini-1.5-flash"); - let desc = generate_text(&format!("Describe the file '{filename}' and extract all key components, keywords, and details for later vectorization. Be comprehensive and factual.")).await.unwrap_or_else(|e| format!("[desc error: {}]", e)); + let desc = generate_text_with_model( + "gemini-2.5-flash", + &format!( + "Describe the file '{filename}' and extract all key components, keywords, and details for later vectorization. Be comprehensive and factual." + ), + ) + .await + .unwrap_or_else(|e| format!("[desc error: {}]", e)); sqlx::query("UPDATE files SET description = ?, analysis_status = 'InProgress' WHERE id = ?") .bind(&desc) .bind(file_id) @@ -79,12 +89,26 @@ impl FileWorker { .await?; // Stage 2: Gemini 2.5 Pro for deep vector graph data - std::env::set_var("GEMINI_MODEL", "gemini-1.5-pro"); - let vector_graph = generate_text(&format!("Given the file '{filename}' and its description: {desc}\nGenerate a set of vector graph data (keywords, use cases, relationships) that can be used for broad and precise search. Only include what is directly supported by the file.")).await.unwrap_or_else(|e| format!("[vector error: {}]", e)); + let vector_graph = generate_text_with_model( + "gemini-2.5-pro", + &format!( + "Given the file '{filename}' and its description: {desc}\nGenerate a set of vector graph data (keywords, use cases, relationships) that can be used for broad and precise search. Only include what is directly supported by the file." + ), + ) + .await + .unwrap_or_else(|e| format!("[vector error: {}]", e)); // Stage 3: Embed and upsert to Qdrant let emb = demo_text_embedding(&vector_graph).await?; - self.qdrant.upsert_point(file_id, emb).await?; + match self.qdrant.upsert_point(file_id, emb.clone()).await { + Ok(_) => { + let _ = vector::store_embedding(file_id, emb.clone()); + } + Err(err) => { + error!("Qdrant upsert failed for {}: {}", file_id, err); + let _ = vector::store_embedding(file_id, emb); + } + } // Mark file as ready sqlx::query("UPDATE files SET pending_analysis = FALSE, analysis_status = 'Completed' WHERE id = ?") @@ -93,4 +117,17 @@ impl FileWorker { .await?; Ok(()) } + + async fn mark_failed(&self, file_id: &str, reason: &str) -> Result<()> { + sqlx::query("UPDATE files SET analysis_status = 'Failed', pending_analysis = TRUE WHERE id = ?") + .bind(file_id) + .execute(&self.pool) + .await?; + sqlx::query("UPDATE files SET description = COALESCE(description, ?) WHERE id = ?") + .bind(format!("[analysis failed: {}]", reason)) + .bind(file_id) + .execute(&self.pool) + .await?; + Ok(()) + } } diff --git a/rust-engine/src/gemini_client.rs b/rust-engine/src/gemini_client.rs index 8639ed2..5741166 100644 --- a/rust-engine/src/gemini_client.rs +++ b/rust-engine/src/gemini_client.rs @@ -1,30 +1,11 @@ use anyhow::Result; -use serde::{Deserialize, Serialize}; -use serde_json::json; use reqwest::Client; +use serde::Deserialize; +use serde_json::json; -// NOTE: This is a small stub to represent where you'd call the Gemini API. -// Replace with real API call and proper auth handling for production. - -#[derive(Debug, Deserialize)] -pub struct GeminiTokenResponse { - pub token: String, -} - -pub async fn generate_token_for_file(_path: &str) -> Result { - Ok("gemini-token-placeholder".to_string()) -} - -/// Demo embedding generator - deterministic pseudo-embedding from filename/path -pub fn demo_embedding_from_path(path: &str) -> Vec { - // Very simple: hash bytes into a small vector - let mut v = vec![0f32; 64]; - for (i, b) in path.as_bytes().iter().enumerate() { - let idx = i % v.len(); - v[idx] += (*b as f32) / 255.0; - } - v -} +// NOTE: This file provides lightweight helpers around the Gemini API. For the +// hackathon demo we fall back to deterministic strings when the API key is not +// configured so the flows still work end-to-end. pub const DEMO_EMBED_DIM: usize = 64; @@ -38,16 +19,27 @@ pub async fn demo_text_embedding(text: &str) -> Result> { Ok(v) } -/// Generate text with Gemini (Generative Language API). Falls back to a demo string if GEMINI_API_KEY is not set. +/// Generate text using the default model (GEMINI_MODEL or gemini-2.5-pro). +#[allow(dead_code)] pub async fn generate_text(prompt: &str) -> Result { + let model = std::env::var("GEMINI_MODEL").unwrap_or_else(|_| "gemini-2.5-pro".to_string()); + generate_text_with_model(&model, prompt).await +} + +/// Generate text with an explicit Gemini model. Falls back to a deterministic +/// response when the API key is not set so the demo still runs. +pub async fn generate_text_with_model(model: &str, prompt: &str) -> Result { let api_key = match std::env::var("GEMINI_API_KEY") { Ok(k) if !k.is_empty() => k, _ => { - return Ok(format!("[demo] Gemini not configured. Prompt preview: {}", truncate(prompt, 240))); + return Ok(format!( + "[demo] Gemini ({}) not configured. Prompt preview: {}", + model, + truncate(prompt, 240) + )); } }; - let model = std::env::var("GEMINI_MODEL").unwrap_or_else(|_| "gemini-1.5-pro".to_string()); let url = format!( "https://generativelanguage.googleapis.com/v1beta/models/{}:generateContent?key={}", model, api_key @@ -62,7 +54,12 @@ pub async fn generate_text(prompt: &str) -> Result { let status = resp.status(); let txt = resp.text().await?; if !status.is_success() { - return Ok(format!("[demo] Gemini error {}: {}", status, truncate(&txt, 240))); + return Ok(format!( + "[demo] Gemini ({}) error {}: {}", + model, + status, + truncate(&txt, 240) + )); } #[derive(Deserialize)] @@ -84,5 +81,9 @@ pub async fn generate_text(prompt: &str) -> Result { } fn truncate(s: &str, max: usize) -> String { - if s.len() <= max { s.to_string() } else { format!("{}…", &s[..max]) } + if s.len() <= max { + s.to_string() + } else { + format!("{}…", &s[..max]) + } } diff --git a/rust-engine/src/models.rs b/rust-engine/src/models.rs index 8392e8b..59f0e12 100644 --- a/rust-engine/src/models.rs +++ b/rust-engine/src/models.rs @@ -14,6 +14,7 @@ pub struct FileRecord { } impl FileRecord { + #[allow(dead_code)] pub fn new(filename: impl Into, path: impl Into, description: Option) -> Self { Self { id: Uuid::new_v4().to_string(), diff --git a/rust-engine/src/worker.rs b/rust-engine/src/worker.rs index a82a198..e3e96bb 100644 --- a/rust-engine/src/worker.rs +++ b/rust-engine/src/worker.rs @@ -1,5 +1,6 @@ -use crate::gemini_client::{demo_text_embedding, DEMO_EMBED_DIM, generate_text}; +use crate::gemini_client::{demo_text_embedding, generate_text_with_model, DEMO_EMBED_DIM}; use crate::models::{QueryRecord, QueryStatus}; +use crate::vector; use crate::vector_db::QdrantClient; use anyhow::Result; use sqlx::MySqlPool; @@ -84,15 +85,34 @@ impl Worker { // Stage 2: embed query text let text = q.payload.get("q").and_then(|v| v.as_str()).unwrap_or(""); - let emb = demo_text_embedding(text).await?; - let top_k = q.payload.get("top_k").and_then(|v| v.as_u64()).unwrap_or(5) as usize; + let emb = demo_text_embedding(text).await?; + let top_k = q.payload.get("top_k").and_then(|v| v.as_u64()).unwrap_or(5) as usize; + let top_k = top_k.max(1).min(20); // Check cancellation if self.is_cancelled(&q.id).await? { return Ok(()); } // Stage 3: search top-K in Qdrant - let hits = self.qdrant.search_top_k(emb, top_k).await.unwrap_or_default(); - let top_ids: Vec = hits.iter().map(|(id, _)| id.clone()).collect(); + let hits = match self.qdrant.search_top_k(emb.clone(), top_k).await { + Ok(list) if !list.is_empty() => list, + Ok(_) => Vec::new(), + Err(err) => { + error!("Qdrant search failed for query {}: {}", q.id, err); + Vec::new() + } + }; + + let hits = if hits.is_empty() { + match vector::query_top_k(&emb, top_k) { + Ok(fallback_ids) if !fallback_ids.is_empty() => { + info!("Using in-memory fallback for query {}", q.id); + fallback_ids.into_iter().map(|id| (id, 0.0)).collect() + } + _ => Vec::new(), + } + } else { + hits + }; // Check cancellation if self.is_cancelled(&q.id).await? { return Ok(()); } @@ -117,11 +137,23 @@ impl Worker { // Stage 5: call Gemini to analyze relationships and propose follow-up details strictly from provided files let relationships_prompt = build_relationships_prompt(text, &files_json); - let relationships = generate_text(&relationships_prompt).await.unwrap_or_else(|e| format!("[demo] relationships error: {}", e)); + let (relationships, final_answer) = if files_json.is_empty() { + ( + "No analyzed files are ready yet. Try seeding demo data or wait for processing to finish.".to_string(), + "I could not find any relevant documents yet. Once files finish analysis I will be able to answer.".to_string(), + ) + } else { + let relationships = generate_text_with_model("gemini-2.5-pro", &relationships_prompt) + .await + .unwrap_or_else(|e| format!("[demo] relationships error: {}", e)); - // Stage 6: final answer synthesis with strict constraints (no speculation; say unknown when insufficient) - let final_prompt = build_final_answer_prompt(text, &files_json, &relationships); - let final_answer = generate_text(&final_prompt).await.unwrap_or_else(|e| format!("[demo] final answer error: {}", e)); + // Stage 6: final answer synthesis with strict constraints (no speculation; say unknown when insufficient) + let final_prompt = build_final_answer_prompt(text, &files_json, &relationships); + let final_answer = generate_text_with_model("gemini-2.5-pro", &final_prompt) + .await + .unwrap_or_else(|e| format!("[demo] final answer error: {}", e)); + (relationships, final_answer) + }; // Stage 7: persist results let result = serde_json::json!({