Prep worker management corrections and debug

This commit is contained in:
Christbru 2025-10-19 09:40:59 -05:00
commit 606c1dff53
11 changed files with 225 additions and 89 deletions

View file

@ -8,6 +8,8 @@ services:
- DATABASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql:3306/${MYSQL_DATABASE} - DATABASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql:3306/${MYSQL_DATABASE}
- RUST_ENGINE_URL=http://rust-engine:8000 - RUST_ENGINE_URL=http://rust-engine:8000
- GEMINI_API_KEY=${GEMINI_API_KEY} - GEMINI_API_KEY=${GEMINI_API_KEY}
volumes:
- rust-storage:/app/storage:ro
depends_on: depends_on:
- mysql - mysql
- rust-engine - rust-engine

View file

@ -13,6 +13,8 @@ services:
- DATABASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql:3306/${MYSQL_DATABASE} - DATABASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql:3306/${MYSQL_DATABASE}
- RUST_ENGINE_URL=http://rust-engine:8000 - RUST_ENGINE_URL=http://rust-engine:8000
- GEMINI_API_KEY=${GEMINI_API_KEY} - GEMINI_API_KEY=${GEMINI_API_KEY}
volumes:
- rust-storage:/app/storage:ro
depends_on: depends_on:
- mysql # <-- Updated dependency - mysql # <-- Updated dependency
- rust-engine - rust-engine

View file

@ -11,6 +11,8 @@
- DATABASE_URL: mysql://USER:PASS@HOST:3306/DB - DATABASE_URL: mysql://USER:PASS@HOST:3306/DB
- QDRANT_URL: default <http://qdrant:6333> - QDRANT_URL: default <http://qdrant:6333>
- GEMINI_API_KEY: used for Gemini content generation (optional in demo) - GEMINI_API_KEY: used for Gemini content generation (optional in demo)
- DEMO_DATA_DIR: path to the folder containing PDF demo data (default resolves to `demo-data` under the repo or `/app/demo-data` in containers)
- ASTRA_STORAGE: directory for uploaded file blobs (default `/app/storage`)
## Endpoints (JSON) ## Endpoints (JSON)
@ -19,7 +21,12 @@
- Response: {"success": true} - Response: {"success": true}
- GET /api/files/list - GET /api/files/list
- Response: {"files": [{"id","filename","path","description"}]} - Response: {"files": [{"id","filename","path","storage_url","description"}]}
- POST /api/files/import-demo[?force=1]
- Copies PDFs from the demo directory into storage and queues them for analysis.
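- Response: {"imported": N, "skipped": M, "files_found": K, "source_dir": "...", "attempted_paths": [...], "force": bool}
- If no demo directory can be resolved, the response is instead {"imported": 0, "skipped": 0, "files_found": 0, "attempted_paths": [...], "error": "..."} (without `source_dir` or `force`).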
- `force=1` (or `force=true`, case-insensitive) deletes prior records with the same filename before re-importing.
- GET /api/files/delete?id=<file_id> - GET /api/files/delete?id=<file_id>
- Response: {"deleted": true|false} - Response: {"deleted": true|false}
@ -67,10 +74,10 @@
2. set env DATABASE_URL and QDRANT_URL 2. set env DATABASE_URL and QDRANT_URL
3. cargo run 3. cargo run
4. (optional) import demo PDFs 4. (optional) import demo PDFs
- Ensure demo files are located in `rust-engine/demo-data` (default) or set `DEMO_DATA_DIR` env var to a folder containing PDFs. - Populate a folder with PDFs under `rust-engine/demo-data` (or point `DEMO_DATA_DIR` to a custom path). The server auto-resolves common locations such as the repo root, `/app/demo-data`, and the working directory when running in Docker.
- Call the endpoint: - Call the endpoint:
- POST <http://localhost:8000/api/files/import-demo> - POST <http://localhost:8000/api/files/import-demo>
- Optional query `?force=1` to overwrite existing by filename - Optional query `?force=1` to overwrite existing by filename. The JSON response also echoes where the engine looked (`source_dir`, `attempted_paths`) and how many PDFs were detected (`files_found`) so misconfigurations are easy to spot. Imported files are written to the shared `/app/storage` volume; the web-app container mounts this volume read-only and serves the contents at `/storage/<filename>`.
- Or run the PowerShell helper: - Or run the PowerShell helper:
- `./scripts/import_demo.ps1` (adds all PDFs in demo-data) - `./scripts/import_demo.ps1` (adds all PDFs in demo-data)
- `./scripts/import_demo.ps1 -Force` (overwrite existing) - `./scripts/import_demo.ps1 -Force` (overwrite existing)

View file

@ -1,10 +1,11 @@
use crate::vector_db::QdrantClient;
use crate::storage; use crate::storage;
use crate::vector_db::QdrantClient;
use anyhow::Result; use anyhow::Result;
use bytes::Buf; use bytes::Buf;
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use serde::Deserialize; use serde::Deserialize;
use sqlx::{MySqlPool, Row}; use sqlx::{MySqlPool, Row};
use tracing::info;
use warp::{multipart::FormData, Filter, Rejection, Reply}; use warp::{multipart::FormData, Filter, Rejection, Reply};
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@ -21,7 +22,7 @@ pub fn routes(pool: MySqlPool) -> impl Filter<Extract = impl Reply, Error = Reje
.and( .and(
warp::query::<std::collections::HashMap<String, String>>() warp::query::<std::collections::HashMap<String, String>>()
.or(warp::any().map(|| std::collections::HashMap::new())) .or(warp::any().map(|| std::collections::HashMap::new()))
.unify() .unify(),
) )
.and(pool_filter.clone()) .and(pool_filter.clone())
.and_then(handle_import_demo); .and_then(handle_import_demo);
@ -74,14 +75,21 @@ pub fn routes(pool: MySqlPool) -> impl Filter<Extract = impl Reply, Error = Reje
.and(pool_filter.clone()) .and(pool_filter.clone())
.and_then(handle_cancel_query); .and_then(handle_cancel_query);
let api = upload.or(import_demo).or(delete).or(list).or(create_q).or(status).or(result).or(cancel); let api = upload
.or(import_demo)
.or(delete)
.or(list)
.or(create_q)
.or(status)
.or(result)
.or(cancel);
warp::path("api").and(api) warp::path("api").and(api)
} }
async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result<impl Reply, Rejection> { async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result<impl Reply, Rejection> {
let mut created_files = Vec::new(); let mut created_files = Vec::new();
while let Some(field) = form.try_next().await.map_err(|_| warp::reject())? { while let Some(field) = form.try_next().await.map_err(|_| warp::reject())? {
let _name = field.name().to_string(); let _name = field.name().to_string();
let filename = field let filename = field
.filename() .filename()
.map(|s| s.to_string()) .map(|s| s.to_string())
@ -138,13 +146,22 @@ async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result<impl Reply
}))) })))
} }
async fn handle_import_demo(params: std::collections::HashMap<String, String>, pool: MySqlPool) -> Result<impl Reply, Rejection> { async fn handle_import_demo(
params: std::collections::HashMap<String, String>,
pool: MySqlPool,
) -> Result<impl Reply, Rejection> {
use std::fs; use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
let force = params.get("force").map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(false); let force = params
let demo_dir_setting = std::env::var("DEMO_DATA_DIR").unwrap_or_else(|_| "demo-data".to_string()); .get("force")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
let demo_dir_setting =
std::env::var("DEMO_DATA_DIR").unwrap_or_else(|_| "demo-data".to_string());
let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
info!(force, requested_dir = %demo_dir_setting, "demo import requested");
// Build a list of plausible demo-data locations so local runs and containers both work. // Build a list of plausible demo-data locations so local runs and containers both work.
let mut candidates: Vec<PathBuf> = Vec::new(); let mut candidates: Vec<PathBuf> = Vec::new();
let configured = PathBuf::from(&demo_dir_setting); let configured = PathBuf::from(&demo_dir_setting);
@ -171,33 +188,45 @@ async fn handle_import_demo(params: std::collections::HashMap<String, String>, p
let mut resolved_dir: Option<PathBuf> = None; let mut resolved_dir: Option<PathBuf> = None;
for candidate in candidates { for candidate in candidates {
if candidate.exists() && candidate.is_dir() { if candidate.exists() && candidate.is_dir() {
resolved_dir = Some(candidate); resolved_dir = Some(candidate.clone());
break; break;
} }
attempted.push(candidate); attempted.push(candidate);
} }
let attempted_paths: Vec<String> = attempted.iter().map(|p| p.display().to_string()).collect();
let src_dir = match resolved_dir { let src_dir = match resolved_dir {
Some(path) => path, Some(path) => path,
None => { None => {
let attempted_paths: Vec<String> = attempted
.into_iter()
.map(|p| p.display().to_string())
.collect();
return Ok(warp::reply::json(&serde_json::json!({ return Ok(warp::reply::json(&serde_json::json!({
"imported": 0, "imported": 0,
"skipped": 0, "skipped": 0,
"error": format!("demo dir not found (checked: {})", attempted_paths.join(", ")) "files_found": 0,
"attempted_paths": attempted_paths,
"error": format!("demo dir not found; set DEMO_DATA_DIR or bind mount demo PDFs")
}))); })));
} }
}; };
let src_dir_display = src_dir.display().to_string();
info!(source = %src_dir_display, attempted_paths = ?attempted_paths, "demo import source resolved");
let mut imported = 0; let mut imported = 0;
let mut skipped = 0; let mut skipped = 0;
let mut files_found = 0;
for entry in fs::read_dir(&src_dir).map_err(|_| warp::reject())? { for entry in fs::read_dir(&src_dir).map_err(|_| warp::reject())? {
let entry = entry.map_err(|_| warp::reject())?; let entry = entry.map_err(|_| warp::reject())?;
let path = entry.path(); let path = entry.path();
if path.extension().and_then(|e| e.to_str()).map(|e| e.eq_ignore_ascii_case("pdf")).unwrap_or(false) { if path
let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown.pdf").to_string(); .extension()
.and_then(|e| e.to_str())
.map(|e| e.eq_ignore_ascii_case("pdf"))
.unwrap_or(false)
{
files_found += 1;
let filename = path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown.pdf")
.to_string();
// check if exists // check if exists
if !force { if !force {
@ -205,8 +234,10 @@ async fn handle_import_demo(params: std::collections::HashMap<String, String>, p
.bind(&filename) .bind(&filename)
.fetch_optional(&pool) .fetch_optional(&pool)
.await .await
.map_err(|_| warp::reject())? { .map_err(|_| warp::reject())?
{
skipped += 1; skipped += 1;
info!(%filename, "skipping demo import; already present");
continue; continue;
} }
} }
@ -214,6 +245,7 @@ async fn handle_import_demo(params: std::collections::HashMap<String, String>, p
// read and save to storage // read and save to storage
let data = fs::read(&path).map_err(|_| warp::reject())?; let data = fs::read(&path).map_err(|_| warp::reject())?;
let stored_path = storage::save_file(&filename, &data).map_err(|_| warp::reject())?; let stored_path = storage::save_file(&filename, &data).map_err(|_| warp::reject())?;
info!(%filename, dest = %stored_path.to_string_lossy(), "demo file copied to storage");
// insert or upsert db record // insert or upsert db record
let id = uuid::Uuid::new_v4().to_string(); let id = uuid::Uuid::new_v4().to_string();
@ -222,11 +254,12 @@ async fn handle_import_demo(params: std::collections::HashMap<String, String>, p
.bind(&filename) .bind(&filename)
.execute(&pool) .execute(&pool)
.await; .await;
info!(%filename, "existing file records removed due to force import");
} }
sqlx::query("INSERT INTO files (id, filename, path, description, pending_analysis, analysis_status) VALUES (?, ?, ?, ?, ?, 'Queued')") sqlx::query("INSERT INTO files (id, filename, path, description, pending_analysis, analysis_status) VALUES (?, ?, ?, ?, ?, 'Queued')")
.bind(&id) .bind(&id)
.bind(&filename) .bind(&filename)
.bind(stored_path.to_str().unwrap()) .bind(stored_path.to_string_lossy().to_string())
.bind(Option::<String>::None) .bind(Option::<String>::None)
.bind(true) .bind(true)
.execute(&pool) .execute(&pool)
@ -235,10 +268,36 @@ async fn handle_import_demo(params: std::collections::HashMap<String, String>, p
tracing::error!("DB insert error: {}", e); tracing::error!("DB insert error: {}", e);
warp::reject() warp::reject()
})?; })?;
info!(%filename, file_id = %id, "demo file inserted into database");
// queue for worker
sqlx::query("INSERT INTO file_jobs (file_id, status) VALUES (?, 'Queued')")
.bind(&id)
.execute(&pool)
.await
.map_err(|_| warp::reject())?;
info!(%filename, file_id = %id, "demo file queued for analysis");
imported += 1; imported += 1;
} }
} }
Ok(warp::reply::json(&serde_json::json!({ "imported": imported, "skipped": skipped }))) info!(
source = %src_dir_display,
files_found,
attempted_paths = ?attempted_paths,
imported,
skipped,
force,
"demo import completed"
);
Ok(warp::reply::json(&serde_json::json!({
"imported": imported,
"skipped": skipped,
"files_found": files_found,
"source_dir": src_dir_display,
"attempted_paths": attempted_paths,
"force": force
})))
} }
async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Rejection> { async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Rejection> {
@ -251,10 +310,14 @@ async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Re
let path: String = row.get("path"); let path: String = row.get("path");
let _ = storage::delete_file(std::path::Path::new(&path)); let _ = storage::delete_file(std::path::Path::new(&path));
// Remove from Qdrant // Remove from Qdrant
let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string()); let qdrant_url =
std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
let qdrant = QdrantClient::new(&qdrant_url); let qdrant = QdrantClient::new(&qdrant_url);
let _ = qdrant.delete_point(&q.id).await; let _ = qdrant.delete_point(&q.id).await;
let _ = sqlx::query("DELETE FROM files WHERE id = ?").bind(&q.id).execute(&pool).await; let _ = sqlx::query("DELETE FROM files WHERE id = ?")
.bind(&q.id)
.execute(&pool)
.await;
return Ok(warp::reply::json(&serde_json::json!({"deleted": true}))); return Ok(warp::reply::json(&serde_json::json!({"deleted": true})));
} }
Ok(warp::reply::json(&serde_json::json!({"deleted": false}))) Ok(warp::reply::json(&serde_json::json!({"deleted": false})))
@ -268,7 +331,6 @@ async fn handle_list(pool: MySqlPool) -> Result<impl Reply, Rejection> {
tracing::error!("DB list error: {}", e); tracing::error!("DB list error: {}", e);
warp::reject() warp::reject()
})?; })?;
let files: Vec<serde_json::Value> = rows let files: Vec<serde_json::Value> = rows
.into_iter() .into_iter()
.map(|r| { .map(|r| {
@ -278,10 +340,12 @@ async fn handle_list(pool: MySqlPool) -> Result<impl Reply, Rejection> {
let description: Option<String> = r.get("description"); let description: Option<String> = r.get("description");
let pending: bool = r.get("pending_analysis"); let pending: bool = r.get("pending_analysis");
let status: Option<String> = r.try_get("analysis_status").ok(); let status: Option<String> = r.try_get("analysis_status").ok();
let storage_url = format!("/storage/{}", filename);
serde_json::json!({ serde_json::json!({
"id": id, "id": id,
"filename": filename, "filename": filename,
"path": path, "path": path,
"storage_url": storage_url,
"description": description, "description": description,
"pending_analysis": pending, "pending_analysis": pending,
"analysis_status": status "analysis_status": status
@ -292,7 +356,10 @@ async fn handle_list(pool: MySqlPool) -> Result<impl Reply, Rejection> {
Ok(warp::reply::json(&serde_json::json!({"files": files}))) Ok(warp::reply::json(&serde_json::json!({"files": files})))
} }
async fn handle_create_query(body: serde_json::Value, pool: MySqlPool) -> Result<impl Reply, Rejection> { async fn handle_create_query(
body: serde_json::Value,
pool: MySqlPool,
) -> Result<impl Reply, Rejection> {
// Insert query as queued, worker will pick it up // Insert query as queued, worker will pick it up
let id = uuid::Uuid::new_v4().to_string(); let id = uuid::Uuid::new_v4().to_string();
let payload = body; let payload = body;
@ -317,9 +384,11 @@ async fn handle_query_status(q: DeleteQuery, pool: MySqlPool) -> Result<impl Rep
.map_err(|_| warp::reject())? .map_err(|_| warp::reject())?
{ {
let status: String = row.get("status"); let status: String = row.get("status");
return Ok(warp::reply::json(&serde_json::json!({"status": status}))); return Ok(warp::reply::json(&serde_json::json!({"status": status})));
} }
Ok(warp::reply::json(&serde_json::json!({"status": "not_found"}))) Ok(warp::reply::json(
&serde_json::json!({"status": "not_found"}),
))
} }
async fn handle_query_result(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Rejection> { async fn handle_query_result(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Rejection> {

View file

@ -1,9 +1,9 @@
use crate::gemini_client::{demo_text_embedding, generate_text_with_model, DEMO_EMBED_DIM}; use crate::gemini_client::{demo_text_embedding, generate_text_with_model, DEMO_EMBED_DIM};
use crate::vector; use crate::vector;
use crate::vector_db::QdrantClient; use crate::vector_db::QdrantClient;
use sqlx::MySqlPool;
use anyhow::Result; use anyhow::Result;
use tracing::{info, error}; use sqlx::MySqlPool;
use tracing::{error, info};
pub struct FileWorker { pub struct FileWorker {
pool: MySqlPool, pool: MySqlPool,
@ -12,7 +12,8 @@ pub struct FileWorker {
impl FileWorker { impl FileWorker {
pub fn new(pool: MySqlPool) -> Self { pub fn new(pool: MySqlPool) -> Self {
let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string()); let qdrant_url =
std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
let qdrant = QdrantClient::new(&qdrant_url); let qdrant = QdrantClient::new(&qdrant_url);
Self { pool, qdrant } Self { pool, qdrant }
} }
@ -70,8 +71,8 @@ impl FileWorker {
.bind(file_id) .bind(file_id)
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await?; .await?;
let filename: String = row.get("filename"); let filename: String = row.get("filename");
let _path: String = row.get("path"); let _path: String = row.get("path");
// Stage 1: Gemini 2.5 Flash for description // Stage 1: Gemini 2.5 Flash for description
let desc = generate_text_with_model( let desc = generate_text_with_model(
@ -82,11 +83,13 @@ impl FileWorker {
) )
.await .await
.unwrap_or_else(|e| format!("[desc error: {}]", e)); .unwrap_or_else(|e| format!("[desc error: {}]", e));
sqlx::query("UPDATE files SET description = ?, analysis_status = 'InProgress' WHERE id = ?") sqlx::query(
.bind(&desc) "UPDATE files SET description = ?, analysis_status = 'InProgress' WHERE id = ?",
.bind(file_id) )
.execute(&self.pool) .bind(&desc)
.await?; .bind(file_id)
.execute(&self.pool)
.await?;
// Stage 2: Gemini 2.5 Pro for deep vector graph data // Stage 2: Gemini 2.5 Pro for deep vector graph data
let vector_graph = generate_text_with_model( let vector_graph = generate_text_with_model(
@ -111,18 +114,22 @@ impl FileWorker {
} }
// Mark file as ready // Mark file as ready
sqlx::query("UPDATE files SET pending_analysis = FALSE, analysis_status = 'Completed' WHERE id = ?") sqlx::query(
.bind(file_id) "UPDATE files SET pending_analysis = FALSE, analysis_status = 'Completed' WHERE id = ?",
.execute(&self.pool) )
.await?; .bind(file_id)
.execute(&self.pool)
.await?;
Ok(()) Ok(())
} }
async fn mark_failed(&self, file_id: &str, reason: &str) -> Result<()> { async fn mark_failed(&self, file_id: &str, reason: &str) -> Result<()> {
sqlx::query("UPDATE files SET analysis_status = 'Failed', pending_analysis = TRUE WHERE id = ?") sqlx::query(
.bind(file_id) "UPDATE files SET analysis_status = 'Failed', pending_analysis = TRUE WHERE id = ?",
.execute(&self.pool) )
.await?; .bind(file_id)
.execute(&self.pool)
.await?;
sqlx::query("UPDATE files SET description = COALESCE(description, ?) WHERE id = ?") sqlx::query("UPDATE files SET description = COALESCE(description, ?) WHERE id = ?")
.bind(format!("[analysis failed: {}]", reason)) .bind(format!("[analysis failed: {}]", reason))
.bind(file_id) .bind(file_id)

View file

@ -63,13 +63,21 @@ pub async fn generate_text_with_model(model: &str, prompt: &str) -> Result<Strin
} }
#[derive(Deserialize)] #[derive(Deserialize)]
struct Part { text: Option<String> } struct Part {
text: Option<String>,
}
#[derive(Deserialize)] #[derive(Deserialize)]
struct Content { parts: Vec<Part> } struct Content {
parts: Vec<Part>,
}
#[derive(Deserialize)] #[derive(Deserialize)]
struct Candidate { content: Content } struct Candidate {
content: Content,
}
#[derive(Deserialize)] #[derive(Deserialize)]
struct Response { candidates: Option<Vec<Candidate>> } struct Response {
candidates: Option<Vec<Candidate>>,
}
let data: Response = serde_json::from_str(&txt).unwrap_or(Response { candidates: None }); let data: Response = serde_json::from_str(&txt).unwrap_or(Response { candidates: None });
let out = data let out = data

View file

@ -1,12 +1,12 @@
mod file_worker;
mod api; mod api;
mod db; mod db;
mod file_worker;
mod gemini_client; mod gemini_client;
mod models; mod models;
mod storage; mod storage;
mod vector; mod vector;
mod worker;
mod vector_db; mod vector_db;
mod worker;
use std::env; use std::env;
use std::error::Error; use std::error::Error;
@ -30,7 +30,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
storage::ensure_storage_dir().expect("storage dir"); storage::ensure_storage_dir().expect("storage dir");
// Initialize DB // Initialize DB
let pool = db::init_db(&database_url).await.map_err(|e| -> Box<dyn Error> { Box::new(e) })?; let pool = db::init_db(&database_url)
.await
.map_err(|e| -> Box<dyn Error> { Box::new(e) })?;
// Spawn query worker // Spawn query worker
let worker = worker::Worker::new(pool.clone()); let worker = worker::Worker::new(pool.clone());
@ -42,17 +44,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// API routes // API routes
let api_routes = api::routes(pool.clone()) let api_routes = api::routes(pool.clone())
.with(warp::cors() .with(
.allow_any_origin() warp::cors()
.allow_headers(vec!["content-type", "authorization"]) .allow_any_origin()
.allow_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])) .allow_headers(vec!["content-type", "authorization"])
.allow_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"]),
)
.with(warp::log("rust_engine")); .with(warp::log("rust_engine"));
info!("Rust Engine started on http://0.0.0.0:8000"); info!("Rust Engine started on http://0.0.0.0:8000");
warp::serve(api_routes) warp::serve(api_routes).run(([0, 0, 0, 0], 8000)).await;
.run(([0, 0, 0, 0], 8000))
.await;
Ok(()) Ok(())
} }

View file

@ -15,7 +15,11 @@ pub struct FileRecord {
impl FileRecord { impl FileRecord {
#[allow(dead_code)] #[allow(dead_code)]
pub fn new(filename: impl Into<String>, path: impl Into<String>, description: Option<String>) -> Self { pub fn new(
filename: impl Into<String>,
path: impl Into<String>,
description: Option<String>,
) -> Self {
Self { Self {
id: Uuid::new_v4().to_string(), id: Uuid::new_v4().to_string(),
filename: filename.into(), filename: filename.into(),

View file

@ -1,7 +1,7 @@
use anyhow::Result; use anyhow::Result;
use reqwest::Client; use reqwest::Client;
use serde_json::json;
use serde::Deserialize; use serde::Deserialize;
use serde_json::json;
#[derive(Clone)] #[derive(Clone)]
pub struct QdrantClient { pub struct QdrantClient {
@ -10,7 +10,6 @@ pub struct QdrantClient {
} }
impl QdrantClient { impl QdrantClient {
/// Delete a point from collection 'files' by id /// Delete a point from collection 'files' by id
pub async fn delete_point(&self, id: &str) -> Result<()> { pub async fn delete_point(&self, id: &str) -> Result<()> {
let url = format!("{}/collections/files/points/delete", self.base); let url = format!("{}/collections/files/points/delete", self.base);
@ -67,7 +66,11 @@ impl QdrantClient {
} else { } else {
let status = resp.status(); let status = resp.status();
let t = resp.text().await.unwrap_or_default(); let t = resp.text().await.unwrap_or_default();
Err(anyhow::anyhow!("qdrant ensure collection failed: {} - {}", status, t)) Err(anyhow::anyhow!(
"qdrant ensure collection failed: {} - {}",
status,
t
))
} }
} }
@ -85,9 +88,14 @@ impl QdrantClient {
return Err(anyhow::anyhow!("qdrant search failed: {} - {}", status, t)); return Err(anyhow::anyhow!("qdrant search failed: {} - {}", status, t));
} }
#[derive(Deserialize)] #[derive(Deserialize)]
struct Hit { id: serde_json::Value, score: f32 } struct Hit {
id: serde_json::Value,
score: f32,
}
#[derive(Deserialize)] #[derive(Deserialize)]
struct Data { result: Vec<Hit> } struct Data {
result: Vec<Hit>,
}
let data: Data = resp.json().await?; let data: Data = resp.json().await?;
let mut out = Vec::new(); let mut out = Vec::new();
for h in data.result { for h in data.result {

View file

@ -14,7 +14,8 @@ pub struct Worker {
impl Worker { impl Worker {
pub fn new(pool: MySqlPool) -> Self { pub fn new(pool: MySqlPool) -> Self {
let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string()); let qdrant_url =
std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
let qdrant = QdrantClient::new(&qdrant_url); let qdrant = QdrantClient::new(&qdrant_url);
Self { pool, qdrant } Self { pool, qdrant }
} }
@ -56,18 +57,22 @@ impl Worker {
async fn fetch_and_claim(&self) -> Result<Option<QueryRecord>> { async fn fetch_and_claim(&self) -> Result<Option<QueryRecord>> {
// Note: MySQL transactional SELECT FOR UPDATE handling is more complex; for this hackathon scaffold // Note: MySQL transactional SELECT FOR UPDATE handling is more complex; for this hackathon scaffold
// we do a simple two-step: select one queued id, then update it to InProgress if it is still queued. // we do a simple two-step: select one queued id, then update it to InProgress if it is still queued.
if let Some(row) = sqlx::query("SELECT id, payload FROM queries WHERE status = 'Queued' ORDER BY created_at LIMIT 1") if let Some(row) = sqlx::query(
.fetch_optional(&self.pool) "SELECT id, payload FROM queries WHERE status = 'Queued' ORDER BY created_at LIMIT 1",
.await? )
.fetch_optional(&self.pool)
.await?
{ {
use sqlx::Row; use sqlx::Row;
let id: String = row.get("id"); let id: String = row.get("id");
let payload: serde_json::Value = row.get("payload"); let payload: serde_json::Value = row.get("payload");
let updated = sqlx::query("UPDATE queries SET status = 'InProgress' WHERE id = ? AND status = 'Queued'") let updated = sqlx::query(
.bind(&id) "UPDATE queries SET status = 'InProgress' WHERE id = ? AND status = 'Queued'",
.execute(&self.pool) )
.await?; .bind(&id)
.execute(&self.pool)
.await?;
if updated.rows_affected() == 1 { if updated.rows_affected() == 1 {
let mut q = QueryRecord::new(payload); let mut q = QueryRecord::new(payload);
@ -90,7 +95,9 @@ impl Worker {
let top_k = top_k.max(1).min(20); let top_k = top_k.max(1).min(20);
// Check cancellation // Check cancellation
if self.is_cancelled(&q.id).await? { return Ok(()); } if self.is_cancelled(&q.id).await? {
return Ok(());
}
// Stage 3: search top-K in Qdrant // Stage 3: search top-K in Qdrant
let hits = match self.qdrant.search_top_k(emb.clone(), top_k).await { let hits = match self.qdrant.search_top_k(emb.clone(), top_k).await {
@ -115,7 +122,9 @@ impl Worker {
}; };
// Check cancellation // Check cancellation
if self.is_cancelled(&q.id).await? { return Ok(()); } if self.is_cancelled(&q.id).await? {
return Ok(());
}
// Stage 4: fetch file metadata for IDs // Stage 4: fetch file metadata for IDs
let mut files_json = Vec::new(); let mut files_json = Vec::new();
@ -222,13 +231,18 @@ impl Worker {
} }
fn build_relationships_prompt(query: &str, files: &Vec<serde_json::Value>) -> String { fn build_relationships_prompt(query: &str, files: &Vec<serde_json::Value>) -> String {
let files_snippets: Vec<String> = files.iter().map(|f| format!( let files_snippets: Vec<String> = files
"- id: {id}, filename: {name}, path: {path}, desc: {desc}", .iter()
id=f.get("id").and_then(|v| v.as_str()).unwrap_or(""), .map(|f| {
name=f.get("filename").and_then(|v| v.as_str()).unwrap_or(""), format!(
path=f.get("path").and_then(|v| v.as_str()).unwrap_or(""), "- id: {id}, filename: {name}, path: {path}, desc: {desc}",
desc=f.get("description").and_then(|v| v.as_str()).unwrap_or("") id = f.get("id").and_then(|v| v.as_str()).unwrap_or(""),
)).collect(); name = f.get("filename").and_then(|v| v.as_str()).unwrap_or(""),
path = f.get("path").and_then(|v| v.as_str()).unwrap_or(""),
desc = f.get("description").and_then(|v| v.as_str()).unwrap_or("")
)
})
.collect();
format!( format!(
"You are an assistant analyzing relationships STRICTLY within the provided files.\n\ "You are an assistant analyzing relationships STRICTLY within the provided files.\n\
Query: {query}\n\ Query: {query}\n\
@ -243,12 +257,21 @@ fn build_relationships_prompt(query: &str, files: &Vec<serde_json::Value>) -> St
) )
} }
fn build_final_answer_prompt(query: &str, files: &Vec<serde_json::Value>, relationships: &str) -> String { fn build_final_answer_prompt(
let files_short: Vec<String> = files.iter().map(|f| format!( query: &str,
"- {name} ({id})", files: &Vec<serde_json::Value>,
id=f.get("id").and_then(|v| v.as_str()).unwrap_or(""), relationships: &str,
name=f.get("filename").and_then(|v| v.as_str()).unwrap_or("") ) -> String {
)).collect(); let files_short: Vec<String> = files
.iter()
.map(|f| {
format!(
"- {name} ({id})",
id = f.get("id").and_then(|v| v.as_str()).unwrap_or(""),
name = f.get("filename").and_then(|v| v.as_str()).unwrap_or("")
)
})
.collect();
format!( format!(
"You are to compose a final answer to the user query using only the information from the files.\n\ "You are to compose a final answer to the user query using only the information from the files.\n\
Query: {query}\n\ Query: {query}\n\

View file

@ -15,6 +15,7 @@ const RUST_ENGINE_BASE =
process.env.RUST_ENGINE_BASE || process.env.RUST_ENGINE_BASE ||
process.env.RUST_ENGINE_URL || process.env.RUST_ENGINE_URL ||
'http://rust-engine:8000'; 'http://rust-engine:8000';
const STORAGE_DIR = path.resolve(process.env.ASTRA_STORAGE || '/app/storage');
app.set('trust proxy', true); app.set('trust proxy', true);
app.use(helmet({ contentSecurityPolicy: false })); app.use(helmet({ contentSecurityPolicy: false }));
@ -43,6 +44,9 @@ app.post('/api/files/import-demo', async (req, res) => {
const distDir = path.resolve(__dirname, 'dist'); const distDir = path.resolve(__dirname, 'dist');
app.use(express.static(distDir)); app.use(express.static(distDir));
// Expose imported files for the UI (read-only)
app.use('/storage', express.static(STORAGE_DIR));
// SPA fallback (Express 5 requires middleware instead of bare '*') // SPA fallback (Express 5 requires middleware instead of bare '*')
app.use((req, res) => { app.use((req, res) => {
res.sendFile(path.join(distDir, 'index.html')); res.sendFile(path.join(distDir, 'index.html'));