Merge branch 'rust-dev'

This commit is contained in:
Christbru 2025-10-19 05:02:20 -05:00
commit 54169c2b93
9 changed files with 350 additions and 36 deletions

73
rust-engine/README.md Normal file
View file

@ -0,0 +1,73 @@
# Rust Engine API and Worker
## Overview
- HTTP API (warp) under /api for file management and query lifecycle
- MySQL for metadata, Qdrant for vector similarity
- Background worker resumes queued work and re-queues stale InProgress jobs at startup
## Environment variables
- DATABASE_URL: mysql://USER:PASS@HOST:3306/DB
- QDRANT_URL: default <http://qdrant:6333>
- GEMINI_API_KEY: used for Gemini content generation (optional in demo)
## Endpoints (JSON)
- POST /api/files (multipart)
- Form: file=@path
- Response: {"success": true}
- GET /api/files/list
- Response: {"files": [{"id","filename","path","description"}]}
- GET /api/files/delete?id=<file_id>
- Response: {"deleted": true|false}
- POST /api/query/create
- Body: {"q": "text", "top_k": 5}
- Response: {"id": "uuid"}
- GET /api/query/status?id=<query_id>
- Response: {"status": "Queued"|"InProgress"|"Completed"|"Cancelled"|"Failed"|"not_found"}
- GET /api/query/result?id=<query_id>
- Response (Completed):
{
"result": {
"summary": "Found N related files",
"related_files": [
{"id","filename","path","description","score"}
],
"relationships": "...",
"final_answer": "..."
}
}
- GET /api/query/cancel?id=<query_id>
- Response: {"cancelled": true}
## Worker behavior
- Ensures Qdrant collection exists (dim 64, cosine)
- Re-queues InProgress jobs older than 10 minutes
- Processing stages:
1) Set InProgress
2) Embed query text (demo now; pluggable Gemini later)
3) Search Qdrant top_k (default 5)
4) Join file metadata (MySQL)
5) Gemini step: relationship analysis (strictly from provided files)
6) Gemini step: final answer (no speculation; say unknown if insufficient)
7) Persist result (JSON) and set Completed
- Checks for cancellation between stages
## Local quickstart
1. docker compose up -d mysql qdrant
2. set env DATABASE_URL and QDRANT_URL
3. cargo run
## Notes
- Replace demo embeddings with real Gemini calls for production
- Add auth to endpoints if needed (API key/JWT)

View file

@ -64,7 +64,8 @@ pub fn routes(pool: MySqlPool) -> impl Filter<Extract = impl Reply, Error = Reje
.and(pool_filter.clone())
.and_then(handle_cancel_query);
upload.or(delete).or(list).or(create_q).or(status).or(result).or(cancel)
let api = upload.or(delete).or(list).or(create_q).or(status).or(result).or(cancel);
warp::path("api").and(api)
}
async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result<impl Reply, Rejection> {
@ -73,7 +74,7 @@ async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result<impl Reply
let qdrant = QdrantClient::new(&qdrant_url);
while let Some(field) = form.try_next().await.map_err(|_| warp::reject())? {
let name = field.name().to_string();
let _name = field.name().to_string();
let filename = field
.filename()
.map(|s| s.to_string())
@ -102,33 +103,21 @@ async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result<impl Reply
// Save file
let path = storage::save_file(&filename, &data).map_err(|_| warp::reject())?;
// Generate gemini token/description (stub)
let token = gemini_client::generate_token_for_file(path.to_str().unwrap()).await.map_err(|_| warp::reject())?;
// Insert file record
// Insert file record with pending_analysis = true, description = NULL
let id = uuid::Uuid::new_v4().to_string();
let desc = Some(format!("token:{}", token));
sqlx::query("INSERT INTO files (id, filename, path, description) VALUES (?, ?, ?, ?)")
sqlx::query("INSERT INTO files (id, filename, path, description, pending_analysis) VALUES (?, ?, ?, ?, ?)")
.bind(&id)
.bind(&filename)
.bind(path.to_str().unwrap())
.bind(desc)
.bind(Option::<String>::None)
.bind(true)
.execute(&pool)
.await
.map_err(|e| {
tracing::error!("DB insert error: {}", e);
warp::reject()
})?;
// generate demo embedding and upsert to Qdrant (async best-effort)
let emb = crate::gemini_client::demo_embedding_from_path(path.to_str().unwrap());
let qdrant_clone = qdrant.clone();
let id_clone = id.clone();
tokio::spawn(async move {
if let Err(e) = qdrant_clone.upsert_point(&id_clone, emb).await {
tracing::error!("qdrant upsert failed: {}", e);
}
});
// Enqueue worker task to process file (to be implemented)
}
Ok(warp::reply::json(&serde_json::json!({"success": true})))
@ -143,6 +132,10 @@ async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Re
{
let path: String = row.get("path");
let _ = storage::delete_file(std::path::Path::new(&path));
// Remove from Qdrant
let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
let qdrant = crate::vector_db::QdrantClient::new(&qdrant_url);
let _ = qdrant.delete_point(&q.id).await;
let _ = sqlx::query("DELETE FROM files WHERE id = ?").bind(&q.id).execute(&pool).await;
return Ok(warp::reply::json(&serde_json::json!({"deleted": true})));
}

View file

@ -12,7 +12,9 @@ pub async fn init_db(database_url: &str) -> Result<MySqlPool, sqlx::Error> {
filename TEXT NOT NULL,
path TEXT NOT NULL,
description TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
pending_analysis BOOLEAN DEFAULT TRUE,
analysis_status VARCHAR(32) DEFAULT 'Queued'
);
CREATE TABLE IF NOT EXISTS queries (

View file

@ -0,0 +1,96 @@
use crate::gemini_client::{generate_text, demo_text_embedding, DEMO_EMBED_DIM};
use crate::vector_db::QdrantClient;
use sqlx::MySqlPool;
use anyhow::Result;
use tracing::{info, error};
/// Background worker that analyzes uploaded files: generates a description
/// and vector-graph text via Gemini, embeds it, and upserts it into Qdrant.
pub struct FileWorker {
// MySQL pool used to claim pending files and record analysis status/results.
pool: MySqlPool,
// Qdrant client; receives one embedding point per processed file.
qdrant: QdrantClient,
}
impl FileWorker {
    /// Build a worker over the given MySQL pool. The Qdrant endpoint is read
    /// from QDRANT_URL, defaulting to the docker-compose service address.
    pub fn new(pool: MySqlPool) -> Self {
        let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
        let qdrant = QdrantClient::new(&qdrant_url);
        Self { pool, qdrant }
    }

    /// Main loop: ensure the Qdrant collection exists, then repeatedly claim
    /// and process one pending file at a time. Never returns.
    pub async fn run(&self) {
        info!("FileWorker starting");
        if let Err(e) = self.qdrant.ensure_files_collection(DEMO_EMBED_DIM).await {
            error!("Failed to ensure Qdrant collection: {}", e);
        }
        loop {
            match self.fetch_and_claim().await {
                Ok(Some(fid)) => {
                    info!("Processing file {}", fid);
                    if let Err(e) = self.process_file(&fid).await {
                        error!("Error processing file {}: {}", fid, e);
                        // Mark the file Failed so it is not silently re-claimed
                        // forever; previously it stayed InProgress and was
                        // retried every 10 minutes without limit.
                        let _ = sqlx::query("UPDATE files SET analysis_status = 'Failed' WHERE id = ?")
                            .bind(&fid)
                            .execute(&self.pool)
                            .await;
                    }
                }
                Ok(None) => {
                    // Nothing queued: idle briefly before polling again.
                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                }
                Err(e) => {
                    error!("FileWorker fetch error: {}", e);
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                }
            }
        }
    }

    /// Claim one file for processing. Eligible files are Queued, or stuck
    /// InProgress with a row older than 10 minutes.
    ///
    /// NOTE(review): staleness is judged on created_at (file creation time),
    /// not on when the claim happened, so a long-queued file looks "stale"
    /// the moment it is claimed — a claimed_at column would be more accurate.
    /// SELECT-then-UPDATE is also not atomic; fine for a single worker, racy
    /// with several.
    async fn fetch_and_claim(&self) -> Result<Option<String>> {
        let Some(row) = sqlx::query(
            "SELECT id FROM files WHERE (analysis_status = 'Queued' OR (analysis_status = 'InProgress' AND created_at < (NOW() - INTERVAL 10 MINUTE))) AND pending_analysis = TRUE LIMIT 1"
        )
        .fetch_optional(&self.pool)
        .await?
        else {
            return Ok(None);
        };
        use sqlx::Row;
        let id: String = row.get("id");
        // Mark as in-progress so subsequent fetches skip this file.
        sqlx::query("UPDATE files SET analysis_status = 'InProgress' WHERE id = ?")
            .bind(&id)
            .execute(&self.pool)
            .await?;
        Ok(Some(id))
    }

    /// Run the analysis stages for one claimed file:
    /// 1) flash-model description, 2) pro-model vector-graph text,
    /// 3) embed + upsert into Qdrant, then mark the file searchable.
    async fn process_file(&self, file_id: &str) -> Result<()> {
        use sqlx::Row;
        let row = sqlx::query("SELECT filename FROM files WHERE id = ?")
            .bind(file_id)
            .fetch_one(&self.pool)
            .await?;
        let filename: String = row.get("filename");
        // Stage 1: fast model for the human-readable description.
        // NOTE(review): GEMINI_MODEL is process-global env state; concurrent
        // workers could race between set_var and the request. Prefer passing
        // the model name to generate_text directly.
        std::env::set_var("GEMINI_MODEL", "gemini-1.5-flash");
        // Bug fix: the prompt previously hard-coded a placeholder instead of
        // interpolating the actual file name.
        let desc = generate_text(&format!(
            "Describe the file '{filename}' and extract all key components, keywords, and details for later vectorization. Be comprehensive and factual."
        ))
        .await
        .unwrap_or_else(|e| format!("[desc error: {}]", e));
        sqlx::query("UPDATE files SET description = ?, analysis_status = 'InProgress' WHERE id = ?")
            .bind(&desc)
            .bind(file_id)
            .execute(&self.pool)
            .await?;
        // Stage 2: stronger model for deep vector-graph data.
        std::env::set_var("GEMINI_MODEL", "gemini-1.5-pro");
        let vector_graph = generate_text(&format!(
            "Given the file '{filename}' and its description: {desc}\nGenerate a set of vector graph data (keywords, use cases, relationships) that can be used for broad and precise search. Only include what is directly supported by the file."
        ))
        .await
        .unwrap_or_else(|e| format!("[vector error: {}]", e));
        // Stage 3: embed the graph text and upsert to Qdrant under the file id.
        let emb = demo_text_embedding(&vector_graph).await?;
        self.qdrant.upsert_point(file_id, emb).await?;
        // Mark file as ready for search.
        sqlx::query("UPDATE files SET pending_analysis = FALSE, analysis_status = 'Completed' WHERE id = ?")
            .bind(file_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
}

View file

@ -1,5 +1,7 @@
use anyhow::Result;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_json::json;
use reqwest::Client;
// NOTE: This is a small stub to represent where you'd call the Gemini API.
// Replace with real API call and proper auth handling for production.
@ -35,3 +37,52 @@ pub async fn demo_text_embedding(text: &str) -> Result<Vec<f32>> {
}
Ok(v)
}
/// Generate text with Gemini (Generative Language API). Falls back to a demo string if GEMINI_API_KEY is not set.
pub async fn generate_text(prompt: &str) -> Result<String> {
let api_key = match std::env::var("GEMINI_API_KEY") {
Ok(k) if !k.is_empty() => k,
_ => {
return Ok(format!("[demo] Gemini not configured. Prompt preview: {}", truncate(prompt, 240)));
}
};
let model = std::env::var("GEMINI_MODEL").unwrap_or_else(|_| "gemini-1.5-pro".to_string());
let url = format!(
"https://generativelanguage.googleapis.com/v1beta/models/{}:generateContent?key={}",
model, api_key
);
let body = json!({
"contents": [ { "parts": [ { "text": prompt } ] } ]
});
let client = Client::new();
let resp = client.post(&url).json(&body).send().await?;
let status = resp.status();
let txt = resp.text().await?;
if !status.is_success() {
return Ok(format!("[demo] Gemini error {}: {}", status, truncate(&txt, 240)));
}
#[derive(Deserialize)]
struct Part { text: Option<String> }
#[derive(Deserialize)]
struct Content { parts: Vec<Part> }
#[derive(Deserialize)]
struct Candidate { content: Content }
#[derive(Deserialize)]
struct Response { candidates: Option<Vec<Candidate>> }
let data: Response = serde_json::from_str(&txt).unwrap_or(Response { candidates: None });
let out = data
.candidates
.and_then(|mut v| v.pop())
.and_then(|c| c.content.parts.into_iter().find_map(|p| p.text))
.unwrap_or_else(|| "[demo] Gemini returned empty response".to_string());
Ok(out)
}
/// Truncate `s` to at most `max` bytes without splitting a UTF-8 character.
///
/// Bug fix: the previous `&s[..max]` panics when `max` lands inside a
/// multi-byte character; we now back off to the nearest char boundary.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    let mut end = max;
    // is_char_boundary(0) is always true, so this terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    s[..end].to_string()
}

View file

@ -1,3 +1,4 @@
mod file_worker;
mod api;
mod db;
mod gemini_client;
@ -31,10 +32,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize DB
let pool = db::init_db(&database_url).await.map_err(|e| -> Box<dyn Error> { Box::new(e) })?;
// Spawn worker
// Spawn query worker
let worker = worker::Worker::new(pool.clone());
tokio::spawn(async move { worker.run().await });
// Spawn file analysis worker
let file_worker = file_worker::FileWorker::new(pool.clone());
tokio::spawn(async move { file_worker.run().await });
// API routes
let api_routes = api::routes(pool.clone())
.with(warp::cors()

View file

@ -9,6 +9,8 @@ pub struct FileRecord {
pub path: String,
pub description: Option<String>,
pub created_at: Option<DateTime<Utc>>,
pub pending_analysis: bool, // true if file is not yet ready for search
pub analysis_status: String, // 'Queued', 'InProgress', 'Completed', 'Failed'
}
impl FileRecord {
@ -19,6 +21,8 @@ impl FileRecord {
path: path.into(),
description,
created_at: None,
pending_analysis: true,
analysis_status: "Queued".to_string(),
}
}
}

View file

@ -10,6 +10,22 @@ pub struct QdrantClient {
}
impl QdrantClient {
/// Delete a point from collection 'files' by id
pub async fn delete_point(&self, id: &str) -> Result<()> {
    let endpoint = format!("{}/collections/files/points/delete", self.base);
    let payload = json!({
        "points": [id]
    });
    let resp = self.client.post(&endpoint).json(&payload).send().await?;
    // Status must be captured before text() consumes the response.
    let status = resp.status();
    if !status.is_success() {
        let detail = resp.text().await.unwrap_or_default();
        return Err(anyhow::anyhow!("qdrant delete failed: {} - {}", status, detail));
    }
    Ok(())
}
pub fn new(base: &str) -> Self {
Self {
base: base.trim_end_matches('/').to_string(),
@ -55,8 +71,8 @@ impl QdrantClient {
}
}
/// Search top-k nearest points from 'files'
pub async fn search_top_k(&self, vector: Vec<f32>, k: usize) -> Result<Vec<String>> {
/// Search top-k nearest points from 'files', return (id, score)
pub async fn search_top_k(&self, vector: Vec<f32>, k: usize) -> Result<Vec<(String, f32)>> {
let url = format!("{}/collections/files/points/search", self.base);
let body = json!({
"vector": vector,
@ -69,19 +85,19 @@ impl QdrantClient {
return Err(anyhow::anyhow!("qdrant search failed: {} - {}", status, t));
}
#[derive(Deserialize)]
struct Hit { id: serde_json::Value }
struct Hit { id: serde_json::Value, score: f32 }
#[derive(Deserialize)]
struct Data { result: Vec<Hit> }
let data: Data = resp.json().await?;
let mut ids = Vec::new();
let mut out = Vec::new();
for h in data.result {
// id can be string or number; handle string
if let Some(s) = h.id.as_str() {
ids.push(s.to_string());
out.push((s.to_string(), h.score));
} else {
ids.push(h.id.to_string());
out.push((h.id.to_string(), h.score));
}
}
Ok(ids)
Ok(out)
}
}

View file

@ -1,4 +1,4 @@
use crate::gemini_client::{demo_text_embedding, DEMO_EMBED_DIM};
use crate::gemini_client::{demo_text_embedding, DEMO_EMBED_DIM, generate_text};
use crate::models::{QueryRecord, QueryStatus};
use crate::vector_db::QdrantClient;
use anyhow::Result;
@ -85,20 +85,50 @@ impl Worker {
// Stage 2: embed query text
let text = q.payload.get("q").and_then(|v| v.as_str()).unwrap_or("");
let emb = demo_text_embedding(text).await?;
let top_k = q.payload.get("top_k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
// Check cancellation
if self.is_cancelled(&q.id).await? { return Ok(()); }
// Stage 3: search top-K in Qdrant
let top_ids = self.qdrant.search_top_k(emb, 5).await.unwrap_or_default();
let hits = self.qdrant.search_top_k(emb, top_k).await.unwrap_or_default();
let top_ids: Vec<String> = hits.iter().map(|(id, _)| id.clone()).collect();
// Check cancellation
if self.is_cancelled(&q.id).await? { return Ok(()); }
// Stage 4: persist results
// Stage 4: fetch file metadata for IDs
let mut files_json = Vec::new();
for (fid, score) in hits {
if let Some(row) = sqlx::query("SELECT id, filename, path, description FROM files WHERE id = ? AND pending_analysis = FALSE")
.bind(&fid)
.fetch_optional(&self.pool)
.await? {
use sqlx::Row;
let id: String = row.get("id");
let filename: String = row.get("filename");
let path: String = row.get("path");
let description: Option<String> = row.get("description");
files_json.push(serde_json::json!({
"id": id, "filename": filename, "path": path, "description": description, "score": score
}));
}
}
// Stage 5: call Gemini to analyze relationships and propose follow-up details strictly from provided files
let relationships_prompt = build_relationships_prompt(text, &files_json);
let relationships = generate_text(&relationships_prompt).await.unwrap_or_else(|e| format!("[demo] relationships error: {}", e));
// Stage 6: final answer synthesis with strict constraints (no speculation; say unknown when insufficient)
let final_prompt = build_final_answer_prompt(text, &files_json, &relationships);
let final_answer = generate_text(&final_prompt).await.unwrap_or_else(|e| format!("[demo] final answer error: {}", e));
// Stage 7: persist results
let result = serde_json::json!({
"summary": format!("Found {} related files", top_ids.len()),
"related_file_ids": top_ids,
"summary": format!("Found {} related files", files_json.len()),
"related_files": files_json,
"relationships": relationships,
"final_answer": final_answer,
});
sqlx::query("UPDATE queries SET status = 'Completed', result = ? WHERE id = ?")
.bind(result)
@ -158,3 +188,47 @@ impl Worker {
Ok(false)
}
}
/// Render the relationship-analysis prompt from the user query and the
/// matched-file JSON objects (each carries "id", "filename", "path",
/// "description" string fields; anything missing renders as empty).
///
/// Takes a slice instead of `&Vec<_>` (clippy `ptr_arg`); existing
/// `&files_json` call sites coerce unchanged.
fn build_relationships_prompt(query: &str, files: &[serde_json::Value]) -> String {
    let files_snippets: Vec<String> = files.iter().map(|f| format!(
        "- id: {id}, filename: {name}, path: {path}, desc: {desc}",
        id=f.get("id").and_then(|v| v.as_str()).unwrap_or(""),
        name=f.get("filename").and_then(|v| v.as_str()).unwrap_or(""),
        path=f.get("path").and_then(|v| v.as_str()).unwrap_or(""),
        desc=f.get("description").and_then(|v| v.as_str()).unwrap_or("")
    )).collect();
    format!(
        "You are an assistant analyzing relationships STRICTLY within the provided files.\n\
        Query: {query}\n\
        Files:\n{files}\n\
        Tasks:\n\
        1) Summarize key details from the files relevant to the query.\n\
        2) Describe relationships and linkages strictly supported by these files.\n\
        3) List important follow-up questions that could be answered only using the provided files.\n\
        Rules: Do NOT guess or invent. If information is insufficient in the files, explicitly state that.",
        query=query,
        files=files_snippets.join("\n")
    )
}
/// Render the final-answer synthesis prompt from the user query, the short
/// file list (filename + id per line), and the earlier relationship analysis.
///
/// Takes a slice instead of `&Vec<_>` (clippy `ptr_arg`); existing
/// `&files_json` call sites coerce unchanged.
fn build_final_answer_prompt(query: &str, files: &[serde_json::Value], relationships: &str) -> String {
    let files_short: Vec<String> = files.iter().map(|f| format!(
        "- {name} ({id})",
        id=f.get("id").and_then(|v| v.as_str()).unwrap_or(""),
        name=f.get("filename").and_then(|v| v.as_str()).unwrap_or("")
    )).collect();
    format!(
        "You are to compose a final answer to the user query using only the information from the files.\n\
        Query: {query}\n\
        Files considered:\n{files}\n\
        Relationship analysis:\n{rels}\n\
        Requirements:\n\
        - Use only information present in the files and analysis above.\n\
        - If the answer is uncertain or cannot be determined from the files, clearly state that limitation.\n\
        - Avoid speculation or assumptions.\n\
        Provide a concise, structured answer.",
        query=query,
        files=files_short.join("\n"),
        rels=relationships
    )
}