Correct demo data importing. Add significant debugging.

This commit is contained in:
Christbru 2025-10-19 10:04:34 -05:00
commit 18b96382f2
4 changed files with 224 additions and 167 deletions

View file

@ -2,11 +2,10 @@
# rust-engine/Dockerfile
# --- Stage 1: Builder ---
# Use a stable Rust version
# (No changes needed in this stage)
FROM rust:slim AS builder
WORKDIR /usr/src/app
# Install build dependencies needed for sqlx
RUN apt-get update && apt-get install -y --no-install-recommends \
pkg-config \
libssl-dev \
@ -15,17 +14,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# Allow optional override of toolchain (e.g., nightly or a pinned version). Leave empty to use image default.
ARG RUSTUP_TOOLCHAIN=
# Use rustup and cargo from the official Rust image location
ENV PATH="/usr/local/cargo/bin:${PATH}"
# Copy manifest files first to leverage Docker layer caching for dependencies
COPY Cargo.toml Cargo.lock rust-toolchain.toml ./
# Ensure the pinned toolchain from rust-toolchain.toml (or provided ARG) is installed only if missing
RUN set -eux; \
if [ -n "${RUSTUP_TOOLCHAIN}" ]; then \
if ! rustup toolchain list | grep -q "^${RUSTUP_TOOLCHAIN}"; then \
@ -45,46 +38,38 @@ RUN set -eux; \
fi; \
rustup show active-toolchain || true
# Create a dummy src to allow cargo to download dependencies into the cache layer
RUN mkdir -p src && echo "fn main() { println!(\"cargo cache build\"); }" > src/main.rs
# Warm up dependency caches without compiling a dummy binary
RUN --mount=type=cache,target=/usr/local/cargo/registry,sharing=locked \
--mount=type=cache,target=/usr/local/cargo/git,sharing=locked \
cargo fetch
# Remove dummy main.rs before copying the real source
RUN rm -f src/main.rs
COPY src ./src
# Build the real binary
RUN cargo build --release --locked
# --- Stage 2: Final, small image ---
FROM debian:bookworm-slim
# Install only necessary runtime dependencies (no upgrade, just ca-certificates)
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/*
# Add a non-root user for security
RUN useradd --system --uid 10001 --no-create-home --shell /usr/sbin/nologin appuser
# Copy the compiled binary from the builder stage
COPY --from=builder /usr/src/app/target/release/rust-engine /usr/local/bin/rust-engine
# Create writable storage and logs directories for appuser
# --- THIS IS THE FIX ---
# **1. Copy the demo data files from your local machine into the image.**
COPY demo-data /app/demo-data
# **2. Create other directories and set permissions on everything.**
RUN chown appuser:appuser /usr/local/bin/rust-engine \
&& mkdir -p /var/log /app/storage /app/demo-data \
&& mkdir -p /var/log /app/storage \
&& touch /var/log/astra-errors.log \
&& chown -R appuser:appuser /var/log /app
# Set working directory to a writable location
WORKDIR /app
# Switch to non-root user
USER appuser
EXPOSE 8000
# Redirect all output to /var/log/astra-errors.log for easy monitoring
ENTRYPOINT ["/bin/sh", "-c", "/usr/local/bin/rust-engine >> /var/log/astra-errors.log 2>&1"]

View file

@ -13,6 +13,7 @@
- GEMINI_API_KEY: used for Gemini content generation (optional in demo)
- DEMO_DATA_DIR: path to the folder containing PDF demo data (default resolves to `demo-data` under the repo or `/app/demo-data` in containers)
- ASTRA_STORAGE: directory for uploaded file blobs (default `/app/storage`)
- AUTO_IMPORT_DEMO: set to `false`, `0`, `off`, or `no` to disable automatic demo import at startup (defaults to `true`)
## Endpoints (JSON)
@ -74,7 +75,7 @@
2. set env DATABASE_URL and QDRANT_URL
3. cargo run
4. (optional) import demo PDFs
- Populate a folder with PDFs under `rust-engine/demo-data` (or point `DEMO_DATA_DIR` to a custom path). The server auto-resolves common locations such as the repo root, `/app/demo-data`, and the working directory when running in Docker.
- Populate a folder with PDFs under `rust-engine/demo-data` (or point `DEMO_DATA_DIR` to a custom path). The server auto-resolves common locations such as the repo root, `/app/demo-data`, and the working directory when running in Docker. When the engine boots it automatically attempts this import (can be disabled by setting `AUTO_IMPORT_DEMO=false`).
- Call the endpoint:
- POST <http://localhost:8000/api/files/import-demo>
- Optional query `?force=1` to overwrite existing by filename. The JSON response also echoes where the engine looked (`source_dir`, `attempted_paths`) and how many PDFs were detected (`files_found`) so misconfigurations are easy to spot. Imported files are written to the shared `/app/storage` volume; the web-app container mounts this volume read-only and serves the contents at `/storage/<filename>`.

View file

@ -3,9 +3,9 @@ use crate::vector_db::QdrantClient;
use anyhow::Result;
use bytes::Buf;
use futures_util::TryStreamExt;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use sqlx::{MySqlPool, Row};
use tracing::info;
use tracing::{debug, info, warn};
use warp::{multipart::FormData, Filter, Rejection, Reply};
#[derive(Debug, Deserialize)]
@ -13,6 +13,177 @@ struct DeleteQuery {
id: String,
}
#[derive(Debug, Serialize)]
pub struct DemoImportSummary {
pub imported: usize,
pub skipped: usize,
pub files_found: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub source_dir: Option<String>,
pub attempted_paths: Vec<String>,
pub force: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
pub async fn perform_demo_import(force: bool, pool: &MySqlPool) -> Result<DemoImportSummary> {
use anyhow::Context;
use std::fs;
use std::path::PathBuf;
let demo_dir_setting =
std::env::var("DEMO_DATA_DIR").unwrap_or_else(|_| "demo-data".to_string());
info!(force, requested_dir = %demo_dir_setting, "demo import requested");
let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
// Build a list of plausible demo-data locations so local runs and containers both work.
let mut candidates: Vec<PathBuf> = Vec::new();
let configured = PathBuf::from(&demo_dir_setting);
let mut push_candidate = |path: PathBuf| {
if !candidates.iter().any(|existing| existing == &path) {
candidates.push(path);
}
};
push_candidate(base.join(&configured));
push_candidate(PathBuf::from(&demo_dir_setting));
push_candidate(base.join("rust-engine").join(&configured));
push_candidate(base.join("rust-engine").join("demo-data"));
push_candidate(base.join("demo-data"));
if let Ok(exe_path) = std::env::current_exe() {
if let Some(exe_dir) = exe_path.parent() {
push_candidate(exe_dir.join(&configured));
push_candidate(exe_dir.join("demo-data"));
push_candidate(exe_dir.join("rust-engine").join(&configured));
}
}
let mut attempted: Vec<PathBuf> = Vec::new();
let mut resolved_dir: Option<PathBuf> = None;
for candidate in candidates {
debug!(candidate = %candidate.display(), "evaluating demo import path candidate");
if candidate.exists() && candidate.is_dir() {
resolved_dir = Some(candidate.clone());
break;
}
attempted.push(candidate);
}
let attempted_paths: Vec<String> = attempted.iter().map(|p| p.display().to_string()).collect();
let mut summary = DemoImportSummary {
imported: 0,
skipped: 0,
files_found: 0,
source_dir: resolved_dir.as_ref().map(|p| p.display().to_string()),
attempted_paths,
force,
error: None,
};
let src_dir = match resolved_dir {
Some(path) => path,
None => {
summary.error =
Some("demo dir not found; set DEMO_DATA_DIR or bind mount demo PDFs".to_string());
warn!(
attempted_paths = ?summary.attempted_paths,
"demo import skipped; source directory not found"
);
return Ok(summary);
}
};
summary.source_dir = Some(src_dir.display().to_string());
info!(
source = %summary.source_dir.as_deref().unwrap_or_default(),
attempted_paths = ?summary.attempted_paths,
"demo import source resolved"
);
for entry in fs::read_dir(&src_dir).with_context(|| format!("reading {:?}", src_dir))? {
let entry = entry.with_context(|| format!("reading entry in {:?}", src_dir))?;
let path = entry.path();
if path
.extension()
.and_then(|e| e.to_str())
.map(|e| e.eq_ignore_ascii_case("pdf"))
.unwrap_or(false)
{
summary.files_found += 1;
let filename = path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown.pdf")
.to_string();
if !force {
if let Some(_) = sqlx::query("SELECT id FROM files WHERE filename = ?")
.bind(&filename)
.fetch_optional(pool)
.await?
{
summary.skipped += 1;
info!(%filename, "skipping demo import; already present");
continue;
}
}
let data = fs::read(&path).with_context(|| format!("reading {:?}", path))?;
let stored_path = storage::save_file(&filename, &data)?;
info!(%filename, dest = %stored_path.to_string_lossy(), "demo file copied to storage");
let id = uuid::Uuid::new_v4().to_string();
if force {
sqlx::query("DELETE FROM files WHERE filename = ?")
.bind(&filename)
.execute(pool)
.await?;
info!(%filename, "existing file records removed due to force import");
}
sqlx::query("INSERT INTO files (id, filename, path, description, pending_analysis, analysis_status) VALUES (?, ?, ?, ?, ?, 'Queued')")
.bind(&id)
.bind(&filename)
.bind(stored_path.to_string_lossy().to_string())
.bind(Option::<String>::None)
.bind(true)
.execute(pool)
.await?;
info!(%filename, file_id = %id, "demo file inserted into database");
sqlx::query("INSERT INTO file_jobs (file_id, status) VALUES (?, 'Queued')")
.bind(&id)
.execute(pool)
.await?;
info!(%filename, file_id = %id, "demo file queued for analysis");
summary.imported += 1;
} else {
debug!(path = %path.display(), "skipping non-PDF entry during demo import");
}
}
let source_label = summary.source_dir.as_deref().unwrap_or("unknown");
if summary.files_found == 0 {
warn!(source = %source_label, "demo import located zero PDFs");
}
info!(
source = %source_label,
files_found = summary.files_found,
attempted_paths = ?summary.attempted_paths,
imported = summary.imported,
skipped = summary.skipped,
force,
"demo import completed"
);
Ok(summary)
}
pub fn routes(pool: MySqlPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
let pool_filter = warp::any().map(move || pool.clone());
@ -150,154 +321,27 @@ async fn handle_import_demo(
params: std::collections::HashMap<String, String>,
pool: MySqlPool,
) -> Result<impl Reply, Rejection> {
use std::fs;
use std::path::PathBuf;
let force = params
.get("force")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
let demo_dir_setting =
std::env::var("DEMO_DATA_DIR").unwrap_or_else(|_| "demo-data".to_string());
let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
info!(force, requested_dir = %demo_dir_setting, "demo import requested");
// Build a list of plausible demo-data locations so local runs and containers both work.
let mut candidates: Vec<PathBuf> = Vec::new();
let configured = PathBuf::from(&demo_dir_setting);
let mut push_candidate = |path: PathBuf| {
if !candidates.iter().any(|existing| existing == &path) {
candidates.push(path);
}
};
push_candidate(base.join(&configured));
push_candidate(PathBuf::from(&demo_dir_setting));
push_candidate(base.join("rust-engine").join(&configured));
push_candidate(base.join("rust-engine").join("demo-data"));
push_candidate(base.join("demo-data"));
if let Ok(exe_path) = std::env::current_exe() {
if let Some(exe_dir) = exe_path.parent() {
push_candidate(exe_dir.join(&configured));
push_candidate(exe_dir.join("demo-data"));
push_candidate(exe_dir.join("rust-engine").join(&configured));
match perform_demo_import(force, &pool).await {
Ok(summary) => Ok(warp::reply::json(&summary)),
Err(err) => {
tracing::error!(error = %err, "demo import failed");
let fallback = DemoImportSummary {
imported: 0,
skipped: 0,
files_found: 0,
source_dir: None,
attempted_paths: Vec::new(),
force,
error: Some(err.to_string()),
};
Ok(warp::reply::json(&fallback))
}
}
let mut attempted: Vec<PathBuf> = Vec::new();
let mut resolved_dir: Option<PathBuf> = None;
for candidate in candidates {
if candidate.exists() && candidate.is_dir() {
resolved_dir = Some(candidate.clone());
break;
}
attempted.push(candidate);
}
let attempted_paths: Vec<String> = attempted.iter().map(|p| p.display().to_string()).collect();
let src_dir = match resolved_dir {
Some(path) => path,
None => {
return Ok(warp::reply::json(&serde_json::json!({
"imported": 0,
"skipped": 0,
"files_found": 0,
"attempted_paths": attempted_paths,
"error": format!("demo dir not found; set DEMO_DATA_DIR or bind mount demo PDFs")
})));
}
};
let src_dir_display = src_dir.display().to_string();
info!(source = %src_dir_display, attempted_paths = ?attempted_paths, "demo import source resolved");
let mut imported = 0;
let mut skipped = 0;
let mut files_found = 0;
for entry in fs::read_dir(&src_dir).map_err(|_| warp::reject())? {
let entry = entry.map_err(|_| warp::reject())?;
let path = entry.path();
if path
.extension()
.and_then(|e| e.to_str())
.map(|e| e.eq_ignore_ascii_case("pdf"))
.unwrap_or(false)
{
files_found += 1;
let filename = path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown.pdf")
.to_string();
// check if exists
if !force {
if let Some(_) = sqlx::query("SELECT id FROM files WHERE filename = ?")
.bind(&filename)
.fetch_optional(&pool)
.await
.map_err(|_| warp::reject())?
{
skipped += 1;
info!(%filename, "skipping demo import; already present");
continue;
}
}
// read and save to storage
let data = fs::read(&path).map_err(|_| warp::reject())?;
let stored_path = storage::save_file(&filename, &data).map_err(|_| warp::reject())?;
info!(%filename, dest = %stored_path.to_string_lossy(), "demo file copied to storage");
// insert or upsert db record
let id = uuid::Uuid::new_v4().to_string();
if force {
let _ = sqlx::query("DELETE FROM files WHERE filename = ?")
.bind(&filename)
.execute(&pool)
.await;
info!(%filename, "existing file records removed due to force import");
}
sqlx::query("INSERT INTO files (id, filename, path, description, pending_analysis, analysis_status) VALUES (?, ?, ?, ?, ?, 'Queued')")
.bind(&id)
.bind(&filename)
.bind(stored_path.to_string_lossy().to_string())
.bind(Option::<String>::None)
.bind(true)
.execute(&pool)
.await
.map_err(|e| {
tracing::error!("DB insert error: {}", e);
warp::reject()
})?;
info!(%filename, file_id = %id, "demo file inserted into database");
// queue for worker
sqlx::query("INSERT INTO file_jobs (file_id, status) VALUES (?, 'Queued')")
.bind(&id)
.execute(&pool)
.await
.map_err(|_| warp::reject())?;
info!(%filename, file_id = %id, "demo file queued for analysis");
imported += 1;
}
}
info!(
source = %src_dir_display,
files_found,
attempted_paths = ?attempted_paths,
imported,
skipped,
force,
"demo import completed"
);
Ok(warp::reply::json(&serde_json::json!({
"imported": imported,
"skipped": skipped,
"files_found": files_found,
"source_dir": src_dir_display,
"attempted_paths": attempted_paths,
"force": force
})))
}
async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Rejection> {

View file

@ -10,7 +10,7 @@ mod worker;
use std::env;
use std::error::Error;
use tracing::info;
use tracing::{error, info, warn};
use warp::Filter;
#[tokio::main]
@ -34,6 +34,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await
.map_err(|e| -> Box<dyn Error> { Box::new(e) })?;
let auto_import_setting = env::var("AUTO_IMPORT_DEMO").unwrap_or_else(|_| "true".to_string());
let auto_import = !matches!(
auto_import_setting.trim().to_ascii_lowercase().as_str(),
"0" | "false" | "off" | "no"
);
if auto_import {
match api::perform_demo_import(false, &pool).await {
Ok(summary) => {
if let Some(err_msg) = summary.error.as_ref() {
warn!(error = %err_msg, "startup demo import completed with warnings");
}
info!(
imported = summary.imported,
skipped = summary.skipped,
files_found = summary.files_found,
source = summary.source_dir.as_deref().unwrap_or("unknown"),
"startup demo import completed"
);
}
Err(err) => {
error!(error = %err, "startup demo import failed");
}
}
} else {
info!("AUTO_IMPORT_DEMO disabled; skipping startup demo import");
}
// Spawn query worker
let worker = worker::Worker::new(pool.clone());
tokio::spawn(async move { worker.run().await });