Merge branch 'rust-dev'
This commit is contained in:
commit
b0d4eb86c1
15 changed files with 492 additions and 201 deletions
101
.github/workflows/build-and-deploy-fallback.yml
vendored
Normal file
101
.github/workflows/build-and-deploy-fallback.yml
vendored
Normal file
|
|
@ -0,0 +1,101 @@
|
||||||
|
# .github/workflows/build-and-deploy-fallback.yml
#
# On pushes to the "gemini" branch: build and push the web-app image to GHCR,
# upload the compose files to the server, then restart the stack over SSH.

name: Build and Deploy

on:
  push:
    branches: ["gemini"]

jobs:
  build-and-deploy:
    permissions:
      contents: read
      packages: write

    name: Build Images and Deploy to Server
    runs-on: ubuntu-latest

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set repo name to lowercase
        id: repo_name
        run: echo "name=$(echo '${{ github.repository }}' | tr '[:upper:]' '[:lower:]')" >> "$GITHUB_OUTPUT"

      - name: Log in to GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Create web-app .env file
        run: echo 'GEMINI_API_KEY=${{ secrets.GEMINI_API_KEY }}' > web-app/.env

      - name: Build and push web-app image 🚀
        uses: docker/build-push-action@v6
        with:
          context: ./web-app
          push: true
          tags: ghcr.io/${{ steps.repo_name.outputs.name }}/web-app:${{ github.sha }}
          cache-from: type=gha,scope=web-app
          cache-to: type=gha,mode=max,scope=web-app

      - name: Ensure remote deploy directory exists
        uses: appleboy/ssh-action@v1.0.3
        with:
          host: ${{ secrets.SERVER_HOST }}
          username: ${{ secrets.SERVER_USERNAME }}
          key: ${{ secrets.SSH_PRIVATE_KEY }}
          script: |
            mkdir -p /home/github-actions/codered-astra

      - name: Upload compose files to server
        uses: appleboy/scp-action@v0.1.7
        with:
          host: ${{ secrets.SERVER_HOST }}
          username: ${{ secrets.SERVER_USERNAME }}
          key: ${{ secrets.SSH_PRIVATE_KEY }}
          source: "docker-compose.yml,docker-compose.prod.yml"
          target: "/home/github-actions/codered-astra/"

      - name: Deploy to server via SSH ☁️
        uses: appleboy/ssh-action@v1.0.3
        env:
          RUNNER_GH_ACTOR: ${{ github.actor }}
          RUNNER_GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          host: ${{ secrets.SERVER_HOST }}
          username: ${{ secrets.SERVER_USERNAME }}
          key: ${{ secrets.SSH_PRIVATE_KEY }}
          # pass selected env vars to the remote shell so docker login works
          envs: RUNNER_GITHUB_TOKEN,RUNNER_GH_ACTOR
          debug: true
          script: |
            cd /home/github-actions/codered-astra
            # This workflow uploads only the compose files, so the demo-data
            # directory may be absent on the server; only fix permissions if it exists.
            if [ -d rust-engine/demo-data ]; then
              chmod -R o+rX rust-engine/demo-data
            fi
            # Wrapper to support both Docker Compose v2 and legacy v1.
            # Pick the implementation once by probing for the v2 plugin, so a
            # failing v2 command is not silently re-run through v1.
            compose() {
              if docker compose version >/dev/null 2>&1; then
                docker compose "$@"
              else
                docker-compose "$@"
              fi
            }
            # Log in to GHCR using the run's GITHUB_TOKEN so compose can pull images.
            if [ -n "$RUNNER_GITHUB_TOKEN" ] && [ -n "$RUNNER_GH_ACTOR" ]; then
              echo "$RUNNER_GITHUB_TOKEN" | docker login ghcr.io -u "$RUNNER_GH_ACTOR" --password-stdin || true
            fi
            export REPO_NAME_LOWER='${{ steps.repo_name.outputs.name }}'
            export GEMINI_API_KEY='${{ secrets.GEMINI_API_KEY }}'
            export MYSQL_DATABASE='${{ secrets.MYSQL_DATABASE }}'
            export MYSQL_USER='${{ secrets.MYSQL_USER }}'
            export MYSQL_PASSWORD='${{ secrets.MYSQL_PASSWORD }}'
            export MYSQL_ROOT_PASSWORD='${{ secrets.MYSQL_ROOT_PASSWORD }}'
            export IMAGE_TAG=${{ github.sha }}
            # Stop and remove old containers before pulling new images
            compose -f docker-compose.prod.yml down
            # Clear previous logs for a clean deployment log (best effort);
            # create the log directory first so the truncation has a target.
            mkdir -p ~/astra-logs || true
            : > ~/astra-logs/astra-errors.log || true
            compose -f docker-compose.prod.yml pull
            compose -f docker-compose.prod.yml up -d
            # Security hygiene: remove GHCR credentials after pulling
            docker logout ghcr.io || true
|
||||||
3
.github/workflows/build-and-deploy.yml
vendored
3
.github/workflows/build-and-deploy.yml
vendored
|
|
@ -69,7 +69,7 @@ jobs:
|
||||||
host: ${{ secrets.SERVER_HOST }}
|
host: ${{ secrets.SERVER_HOST }}
|
||||||
username: ${{ secrets.SERVER_USERNAME }}
|
username: ${{ secrets.SERVER_USERNAME }}
|
||||||
key: ${{ secrets.SSH_PRIVATE_KEY }}
|
key: ${{ secrets.SSH_PRIVATE_KEY }}
|
||||||
source: "docker-compose.yml,docker-compose.prod.yml"
|
source: "docker-compose.yml,docker-compose.prod.yml,rust-engine/demo-data"
|
||||||
target: "/home/github-actions/codered-astra/"
|
target: "/home/github-actions/codered-astra/"
|
||||||
|
|
||||||
- name: Deploy to server via SSH ☁️
|
- name: Deploy to server via SSH ☁️
|
||||||
|
|
@ -86,6 +86,7 @@ jobs:
|
||||||
debug: true
|
debug: true
|
||||||
script: |
|
script: |
|
||||||
cd /home/github-actions/codered-astra
|
cd /home/github-actions/codered-astra
|
||||||
|
chmod -R o+rX rust-engine/demo-data
|
||||||
# wrapper to support both Docker Compose v2 and legacy v1
|
# wrapper to support both Docker Compose v2 and legacy v1
|
||||||
compose() { docker compose "$@" || docker-compose "$@"; }
|
compose() { docker compose "$@" || docker-compose "$@"; }
|
||||||
# Log in to GHCR using the run's GITHUB_TOKEN so compose can pull images.
|
# Log in to GHCR using the run's GITHUB_TOKEN so compose can pull images.
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,8 @@ services:
|
||||||
- DATABASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql:3306/${MYSQL_DATABASE}
|
- DATABASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql:3306/${MYSQL_DATABASE}
|
||||||
- RUST_ENGINE_URL=http://rust-engine:8000
|
- RUST_ENGINE_URL=http://rust-engine:8000
|
||||||
- GEMINI_API_KEY=${GEMINI_API_KEY}
|
- GEMINI_API_KEY=${GEMINI_API_KEY}
|
||||||
|
volumes:
|
||||||
|
- /home/github-actions/codered-astra/rust-engine/demo-data:/app/storage:ro
|
||||||
depends_on:
|
depends_on:
|
||||||
- mysql
|
- mysql
|
||||||
- rust-engine
|
- rust-engine
|
||||||
|
|
@ -28,7 +30,7 @@ services:
|
||||||
volumes:
|
volumes:
|
||||||
- ~/astra-logs:/var/log
|
- ~/astra-logs:/var/log
|
||||||
- rust-storage:/app/storage
|
- rust-storage:/app/storage
|
||||||
- /var/www/codered-astra/rust-engine/demo-data:/app/demo-data:ro
|
- /home/github-actions/codered-astra/rust-engine/demo-data:/app/demo-data:ro
|
||||||
|
|
||||||
mysql:
|
mysql:
|
||||||
image: mysql:8.0
|
image: mysql:8.0
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,8 @@ services:
|
||||||
- DATABASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql:3306/${MYSQL_DATABASE}
|
- DATABASE_URL=mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mysql:3306/${MYSQL_DATABASE}
|
||||||
- RUST_ENGINE_URL=http://rust-engine:8000
|
- RUST_ENGINE_URL=http://rust-engine:8000
|
||||||
- GEMINI_API_KEY=${GEMINI_API_KEY}
|
- GEMINI_API_KEY=${GEMINI_API_KEY}
|
||||||
|
volumes:
|
||||||
|
- rust-storage:/app/storage:ro
|
||||||
depends_on:
|
depends_on:
|
||||||
- mysql # <-- Updated dependency
|
- mysql # <-- Updated dependency
|
||||||
- rust-engine
|
- rust-engine
|
||||||
|
|
|
||||||
|
|
@ -2,11 +2,10 @@
|
||||||
# rust-engine/Dockerfile
|
# rust-engine/Dockerfile
|
||||||
|
|
||||||
# --- Stage 1: Builder ---
|
# --- Stage 1: Builder ---
|
||||||
# Use a stable Rust version
|
# Builder uses the rust:slim image; set RUSTUP_TOOLCHAIN below to override the toolchain.
|
||||||
FROM rust:slim AS builder
|
FROM rust:slim AS builder
|
||||||
WORKDIR /usr/src/app
|
WORKDIR /usr/src/app
|
||||||
|
|
||||||
# Install build dependencies needed for sqlx
|
|
||||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
pkg-config \
|
pkg-config \
|
||||||
libssl-dev \
|
libssl-dev \
|
||||||
|
|
@ -15,17 +14,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
ca-certificates \
|
ca-certificates \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
|
||||||
# Allow optional override of toolchain (e.g., nightly or a pinned version). Leave empty to use image default.
|
|
||||||
ARG RUSTUP_TOOLCHAIN=
|
ARG RUSTUP_TOOLCHAIN=
|
||||||
|
|
||||||
# Use rustup and cargo from the official Rust image location
|
|
||||||
ENV PATH="/usr/local/cargo/bin:${PATH}"
|
ENV PATH="/usr/local/cargo/bin:${PATH}"
|
||||||
|
|
||||||
# Copy manifest files first to leverage Docker layer caching for dependencies
|
|
||||||
COPY Cargo.toml Cargo.lock rust-toolchain.toml ./
|
COPY Cargo.toml Cargo.lock rust-toolchain.toml ./
|
||||||
|
|
||||||
# Ensure the pinned toolchain from rust-toolchain.toml (or provided ARG) is installed only if missing
|
|
||||||
RUN set -eux; \
|
RUN set -eux; \
|
||||||
if [ -n "${RUSTUP_TOOLCHAIN}" ]; then \
|
if [ -n "${RUSTUP_TOOLCHAIN}" ]; then \
|
||||||
if ! rustup toolchain list | grep -q "^${RUSTUP_TOOLCHAIN}"; then \
|
if ! rustup toolchain list | grep -q "^${RUSTUP_TOOLCHAIN}"; then \
|
||||||
|
|
@ -45,46 +38,38 @@ RUN set -eux; \
|
||||||
fi; \
|
fi; \
|
||||||
rustup show active-toolchain || true
|
rustup show active-toolchain || true
|
||||||
|
|
||||||
# Create a dummy src to allow cargo to download dependencies into the cache layer
|
|
||||||
RUN mkdir -p src && echo "fn main() { println!(\"cargo cache build\"); }" > src/main.rs
|
RUN mkdir -p src && echo "fn main() { println!(\"cargo cache build\"); }" > src/main.rs
|
||||||
|
|
||||||
# Warm up dependency caches without compiling a dummy binary
|
|
||||||
RUN --mount=type=cache,target=/usr/local/cargo/registry,sharing=locked \
|
RUN --mount=type=cache,target=/usr/local/cargo/registry,sharing=locked \
|
||||||
--mount=type=cache,target=/usr/local/cargo/git,sharing=locked \
|
--mount=type=cache,target=/usr/local/cargo/git,sharing=locked \
|
||||||
cargo fetch
|
cargo fetch
|
||||||
|
|
||||||
|
|
||||||
# Remove dummy main.rs before copying the real source
|
|
||||||
RUN rm -f src/main.rs
|
RUN rm -f src/main.rs
|
||||||
COPY src ./src
|
COPY src ./src
|
||||||
# Build the real binary
|
|
||||||
RUN cargo build --release --locked
|
RUN cargo build --release --locked
|
||||||
|
|
||||||
# --- Stage 2: Final, small image ---
|
# --- Stage 2: Final, small image ---
|
||||||
|
|
||||||
FROM debian:bookworm-slim
|
FROM debian:bookworm-slim
|
||||||
# Install only necessary runtime dependencies (no upgrade, just ca-certificates)
|
|
||||||
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/*
|
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
# Add a non-root user for security
|
RUN useradd --system --uid 1004 --no-create-home --shell /usr/sbin/nologin appuser
|
||||||
RUN useradd --system --uid 10001 --no-create-home --shell /usr/sbin/nologin appuser
|
|
||||||
|
|
||||||
# Copy the compiled binary from the builder stage
|
# Copy the compiled binary from the builder stage
|
||||||
|
|
||||||
COPY --from=builder /usr/src/app/target/release/rust-engine /usr/local/bin/rust-engine
|
COPY --from=builder /usr/src/app/target/release/rust-engine /usr/local/bin/rust-engine
|
||||||
|
|
||||||
# Create writable storage and logs directories for appuser
|
# --- Demo data and runtime directories ---
|
||||||
|
# 1. Copy the demo data files from the build context into the image.
|
||||||
|
COPY demo-data /app/demo-data
|
||||||
|
|
||||||
|
# 2. Create the log and storage directories and give appuser ownership of them and the binary.
|
||||||
RUN chown appuser:appuser /usr/local/bin/rust-engine \
|
RUN chown appuser:appuser /usr/local/bin/rust-engine \
|
||||||
&& mkdir -p /var/log /app/storage /app/demo-data \
|
&& mkdir -p /var/log /app/storage \
|
||||||
&& touch /var/log/astra-errors.log \
|
&& touch /var/log/astra-errors.log \
|
||||||
&& chown -R appuser:appuser /var/log /app
|
&& chown -R appuser:appuser /var/log /app
|
||||||
|
|
||||||
# Set working directory to a writable location
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
||||||
# Switch to non-root user
|
|
||||||
USER appuser
|
USER appuser
|
||||||
|
|
||||||
EXPOSE 8000
|
EXPOSE 8000
|
||||||
# Redirect all output to /var/log/astra-errors.log for easy monitoring
|
|
||||||
ENTRYPOINT ["/bin/sh", "-c", "/usr/local/bin/rust-engine >> /var/log/astra-errors.log 2>&1"]
|
ENTRYPOINT ["/bin/sh", "-c", "/usr/local/bin/rust-engine >> /var/log/astra-errors.log 2>&1"]
|
||||||
|
|
@ -11,6 +11,9 @@
|
||||||
- DATABASE_URL: mysql://USER:PASS@HOST:3306/DB
|
- DATABASE_URL: mysql://USER:PASS@HOST:3306/DB
|
||||||
- QDRANT_URL: default <http://qdrant:6333>
|
- QDRANT_URL: default <http://qdrant:6333>
|
||||||
- GEMINI_API_KEY: used for Gemini content generation (optional in demo)
|
- GEMINI_API_KEY: used for Gemini content generation (optional in demo)
|
||||||
|
- DEMO_DATA_DIR: path to the folder containing PDF demo data (default resolves to `demo-data` under the repo or `/app/demo-data` in containers)
|
||||||
|
- ASTRA_STORAGE: directory for uploaded file blobs (default `/app/storage`)
|
||||||
|
- AUTO_IMPORT_DEMO: set to `false`, `0`, `off`, or `no` to disable automatic demo import at startup (defaults to `true`)
|
||||||
|
|
||||||
## Endpoints (JSON)
|
## Endpoints (JSON)
|
||||||
|
|
||||||
|
|
@ -19,7 +22,12 @@
|
||||||
- Response: {"success": true}
|
- Response: {"success": true}
|
||||||
|
|
||||||
- GET /api/files/list
|
- GET /api/files/list
|
||||||
- Response: {"files": [{"id","filename","path","description"}]}
|
- Response: {"files": [{"id","filename","path","storage_url","description"}]}
|
||||||
|
|
||||||
|
- POST /api/files/import-demo[?force=1]
|
||||||
|
- Copies PDFs from the demo directory into storage and queues them for analysis.
|
||||||
|
- Response: {"imported": N, "skipped": M, "files_found": K, "source_dir": "...", "attempted_paths": [...], "force": bool}
|
||||||
|
- `force=1` deletes prior records with the same filename before re-importing.
|
||||||
|
|
||||||
- GET /api/files/delete?id=<file_id>
|
- GET /api/files/delete?id=<file_id>
|
||||||
- Response: {"deleted": true|false}
|
- Response: {"deleted": true|false}
|
||||||
|
|
@ -67,10 +75,10 @@
|
||||||
2. set env DATABASE_URL and QDRANT_URL
|
2. set env DATABASE_URL and QDRANT_URL
|
||||||
3. cargo run
|
3. cargo run
|
||||||
4. (optional) import demo PDFs
|
4. (optional) import demo PDFs
|
||||||
- Ensure demo files are located in `rust-engine/demo-data` (default) or set `DEMO_DATA_DIR` env var to a folder containing PDFs.
|
- Populate a folder with PDFs under `rust-engine/demo-data` (or point `DEMO_DATA_DIR` to a custom path). The server auto-resolves common locations such as the repo root, `/app/demo-data`, and the working directory when running in Docker. When the engine boots it automatically attempts this import (can be disabled by setting `AUTO_IMPORT_DEMO=false`).
|
||||||
- Call the endpoint:
|
- Call the endpoint:
|
||||||
- POST <http://localhost:8000/api/files/import-demo>
|
- POST <http://localhost:8000/api/files/import-demo>
|
||||||
- Optional query `?force=1` to overwrite existing by filename
|
- Optional query `?force=1` to overwrite existing by filename. The JSON response also echoes where the engine looked (`source_dir`, `attempted_paths`) and how many PDFs were detected (`files_found`) so misconfigurations are easy to spot. Imported files are written to the shared `/app/storage` volume; the web-app container mounts this volume read-only and serves the contents at `/storage/<filename>`.
|
||||||
- Or run the PowerShell helper:
|
- Or run the PowerShell helper:
|
||||||
- `./scripts/import_demo.ps1` (adds all PDFs in demo-data)
|
- `./scripts/import_demo.ps1` (adds all PDFs in demo-data)
|
||||||
- `./scripts/import_demo.ps1 -Force` (overwrite existing)
|
- `./scripts/import_demo.ps1 -Force` (overwrite existing)
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,11 @@
|
||||||
use crate::vector_db::QdrantClient;
|
|
||||||
use crate::storage;
|
use crate::storage;
|
||||||
|
use crate::vector_db::QdrantClient;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use bytes::Buf;
|
use bytes::Buf;
|
||||||
use futures_util::TryStreamExt;
|
use futures_util::TryStreamExt;
|
||||||
use serde::Deserialize;
|
use serde::{Deserialize, Serialize};
|
||||||
use sqlx::{MySqlPool, Row};
|
use sqlx::{MySqlPool, Row};
|
||||||
|
use tracing::{debug, info, warn};
|
||||||
use warp::{multipart::FormData, Filter, Rejection, Reply};
|
use warp::{multipart::FormData, Filter, Rejection, Reply};
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
|
|
@ -12,6 +13,173 @@ struct DeleteQuery {
|
||||||
id: String,
|
id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct DemoImportSummary {
|
||||||
|
pub imported: usize,
|
||||||
|
pub skipped: usize,
|
||||||
|
pub files_found: usize,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub source_dir: Option<String>,
|
||||||
|
pub attempted_paths: Vec<String>,
|
||||||
|
pub force: bool,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn perform_demo_import(force: bool, pool: &MySqlPool) -> Result<DemoImportSummary> {
|
||||||
|
use anyhow::Context;
|
||||||
|
use std::fs;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
let demo_dir_setting =
|
||||||
|
std::env::var("DEMO_DATA_DIR").unwrap_or_else(|_| "demo-data".to_string());
|
||||||
|
info!(force, requested_dir = %demo_dir_setting, "demo import requested");
|
||||||
|
|
||||||
|
let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
|
||||||
|
|
||||||
|
// Build a list of plausible demo-data locations so local runs and containers both work.
|
||||||
|
let mut candidates: Vec<PathBuf> = Vec::new();
|
||||||
|
let configured = PathBuf::from(&demo_dir_setting);
|
||||||
|
let mut push_candidate = |path: PathBuf| {
|
||||||
|
if !candidates.iter().any(|existing| existing == &path) {
|
||||||
|
candidates.push(path);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
push_candidate(base.join(&configured));
|
||||||
|
push_candidate(PathBuf::from(&demo_dir_setting));
|
||||||
|
push_candidate(base.join("rust-engine").join(&configured));
|
||||||
|
push_candidate(base.join("rust-engine").join("demo-data"));
|
||||||
|
push_candidate(base.join("demo-data"));
|
||||||
|
if let Ok(exe_path) = std::env::current_exe() {
|
||||||
|
if let Some(exe_dir) = exe_path.parent() {
|
||||||
|
push_candidate(exe_dir.join(&configured));
|
||||||
|
push_candidate(exe_dir.join("demo-data"));
|
||||||
|
push_candidate(exe_dir.join("rust-engine").join(&configured));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut attempted: Vec<PathBuf> = Vec::new();
|
||||||
|
let mut resolved_dir: Option<PathBuf> = None;
|
||||||
|
for candidate in candidates {
|
||||||
|
debug!(candidate = %candidate.display(), "evaluating demo import path candidate");
|
||||||
|
if candidate.exists() && candidate.is_dir() {
|
||||||
|
resolved_dir = Some(candidate.clone());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
attempted.push(candidate);
|
||||||
|
}
|
||||||
|
|
||||||
|
let attempted_paths: Vec<String> = attempted.iter().map(|p| p.display().to_string()).collect();
|
||||||
|
let mut summary = DemoImportSummary {
|
||||||
|
imported: 0,
|
||||||
|
skipped: 0,
|
||||||
|
files_found: 0,
|
||||||
|
source_dir: resolved_dir.as_ref().map(|p| p.display().to_string()),
|
||||||
|
attempted_paths,
|
||||||
|
force,
|
||||||
|
error: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let src_dir = match resolved_dir {
|
||||||
|
Some(path) => path,
|
||||||
|
None => {
|
||||||
|
summary.error =
|
||||||
|
Some("demo dir not found; set DEMO_DATA_DIR or bind mount demo PDFs".to_string());
|
||||||
|
warn!(
|
||||||
|
attempted_paths = ?summary.attempted_paths,
|
||||||
|
"demo import skipped; source directory not found"
|
||||||
|
);
|
||||||
|
return Ok(summary);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
summary.source_dir = Some(src_dir.display().to_string());
|
||||||
|
info!(
|
||||||
|
source = %summary.source_dir.as_deref().unwrap_or_default(),
|
||||||
|
attempted_paths = ?summary.attempted_paths,
|
||||||
|
"demo import source resolved"
|
||||||
|
);
|
||||||
|
|
||||||
|
for entry in fs::read_dir(&src_dir).with_context(|| format!("reading {:?}", src_dir))? {
|
||||||
|
let entry = entry.with_context(|| format!("reading entry in {:?}", src_dir))?;
|
||||||
|
let path = entry.path();
|
||||||
|
if path
|
||||||
|
.extension()
|
||||||
|
.and_then(|e| e.to_str())
|
||||||
|
.map(|e| e.eq_ignore_ascii_case("pdf"))
|
||||||
|
.unwrap_or(false)
|
||||||
|
{
|
||||||
|
summary.files_found += 1;
|
||||||
|
let filename = path
|
||||||
|
.file_name()
|
||||||
|
.and_then(|n| n.to_str())
|
||||||
|
.unwrap_or("unknown.pdf")
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
if !force {
|
||||||
|
if let Some(_) = sqlx::query("SELECT id FROM files WHERE filename = ?")
|
||||||
|
.bind(&filename)
|
||||||
|
.fetch_optional(pool)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
summary.skipped += 1;
|
||||||
|
info!(%filename, "skipping demo import; already present");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let data = fs::read(&path).with_context(|| format!("reading {:?}", path))?;
|
||||||
|
let stored_path = storage::save_file(&filename, &data)?;
|
||||||
|
info!(%filename, dest = %stored_path.to_string_lossy(), "demo file copied to storage");
|
||||||
|
|
||||||
|
let id = uuid::Uuid::new_v4().to_string();
|
||||||
|
|
||||||
|
if force {
|
||||||
|
sqlx::query("DELETE FROM files WHERE filename = ?")
|
||||||
|
.bind(&filename)
|
||||||
|
.execute(pool)
|
||||||
|
.await?;
|
||||||
|
info!(%filename, "existing file records removed due to force import");
|
||||||
|
}
|
||||||
|
|
||||||
|
sqlx::query("INSERT INTO files (id, filename, path, description, pending_analysis, analysis_status) VALUES (?, ?, ?, ?, ?, 'Queued')")
|
||||||
|
.bind(&id)
|
||||||
|
.bind(&filename)
|
||||||
|
.bind(stored_path.to_string_lossy().to_string())
|
||||||
|
.bind(Option::<String>::None)
|
||||||
|
.bind(true)
|
||||||
|
.execute(pool)
|
||||||
|
.await?;
|
||||||
|
info!(%filename, file_id = %id, "demo file inserted into database");
|
||||||
|
|
||||||
|
info!(%filename, file_id = %id, "demo file queued for analysis by worker");
|
||||||
|
|
||||||
|
summary.imported += 1;
|
||||||
|
} else {
|
||||||
|
debug!(path = %path.display(), "skipping non-PDF entry during demo import");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let source_label = summary.source_dir.as_deref().unwrap_or("unknown");
|
||||||
|
|
||||||
|
if summary.files_found == 0 {
|
||||||
|
warn!(source = %source_label, "demo import located zero PDFs");
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
source = %source_label,
|
||||||
|
files_found = summary.files_found,
|
||||||
|
attempted_paths = ?summary.attempted_paths,
|
||||||
|
imported = summary.imported,
|
||||||
|
skipped = summary.skipped,
|
||||||
|
force,
|
||||||
|
"demo import completed"
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(summary)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn routes(pool: MySqlPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
|
pub fn routes(pool: MySqlPool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
|
||||||
let pool_filter = warp::any().map(move || pool.clone());
|
let pool_filter = warp::any().map(move || pool.clone());
|
||||||
|
|
||||||
|
|
@ -21,7 +189,7 @@ pub fn routes(pool: MySqlPool) -> impl Filter<Extract = impl Reply, Error = Reje
|
||||||
.and(
|
.and(
|
||||||
warp::query::<std::collections::HashMap<String, String>>()
|
warp::query::<std::collections::HashMap<String, String>>()
|
||||||
.or(warp::any().map(|| std::collections::HashMap::new()))
|
.or(warp::any().map(|| std::collections::HashMap::new()))
|
||||||
.unify()
|
.unify(),
|
||||||
)
|
)
|
||||||
.and(pool_filter.clone())
|
.and(pool_filter.clone())
|
||||||
.and_then(handle_import_demo);
|
.and_then(handle_import_demo);
|
||||||
|
|
@ -74,14 +242,21 @@ pub fn routes(pool: MySqlPool) -> impl Filter<Extract = impl Reply, Error = Reje
|
||||||
.and(pool_filter.clone())
|
.and(pool_filter.clone())
|
||||||
.and_then(handle_cancel_query);
|
.and_then(handle_cancel_query);
|
||||||
|
|
||||||
let api = upload.or(import_demo).or(delete).or(list).or(create_q).or(status).or(result).or(cancel);
|
let api = upload
|
||||||
|
.or(import_demo)
|
||||||
|
.or(delete)
|
||||||
|
.or(list)
|
||||||
|
.or(create_q)
|
||||||
|
.or(status)
|
||||||
|
.or(result)
|
||||||
|
.or(cancel);
|
||||||
warp::path("api").and(api)
|
warp::path("api").and(api)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
||||||
let mut created_files = Vec::new();
|
let mut created_files = Vec::new();
|
||||||
while let Some(field) = form.try_next().await.map_err(|_| warp::reject())? {
|
while let Some(field) = form.try_next().await.map_err(|_| warp::reject())? {
|
||||||
let _name = field.name().to_string();
|
let _name = field.name().to_string();
|
||||||
let filename = field
|
let filename = field
|
||||||
.filename()
|
.filename()
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
|
|
@ -138,107 +313,31 @@ async fn handle_upload(mut form: FormData, pool: MySqlPool) -> Result<impl Reply
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_import_demo(params: std::collections::HashMap<String, String>, pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
async fn handle_import_demo(
|
||||||
use std::fs;
|
params: std::collections::HashMap<String, String>,
|
||||||
use std::path::PathBuf;
|
pool: MySqlPool,
|
||||||
let force = params.get("force").map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(false);
|
) -> Result<impl Reply, Rejection> {
|
||||||
let demo_dir_setting = std::env::var("DEMO_DATA_DIR").unwrap_or_else(|_| "demo-data".to_string());
|
let force = params
|
||||||
let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
|
.get("force")
|
||||||
|
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
|
||||||
|
.unwrap_or(false);
|
||||||
|
|
||||||
// Build a list of plausible demo-data locations so local runs and containers both work.
|
match perform_demo_import(force, &pool).await {
|
||||||
let mut candidates: Vec<PathBuf> = Vec::new();
|
Ok(summary) => Ok(warp::reply::json(&summary)),
|
||||||
let configured = PathBuf::from(&demo_dir_setting);
|
Err(err) => {
|
||||||
let mut push_candidate = |path: PathBuf| {
|
tracing::error!(error = %err, "demo import failed");
|
||||||
if !candidates.iter().any(|existing| existing == &path) {
|
let fallback = DemoImportSummary {
|
||||||
candidates.push(path);
|
imported: 0,
|
||||||
}
|
skipped: 0,
|
||||||
};
|
files_found: 0,
|
||||||
|
source_dir: None,
|
||||||
push_candidate(base.join(&configured));
|
attempted_paths: Vec::new(),
|
||||||
push_candidate(PathBuf::from(&demo_dir_setting));
|
force,
|
||||||
push_candidate(base.join("rust-engine").join(&configured));
|
error: Some(err.to_string()),
|
||||||
push_candidate(base.join("rust-engine").join("demo-data"));
|
};
|
||||||
push_candidate(base.join("demo-data"));
|
Ok(warp::reply::json(&fallback))
|
||||||
if let Ok(exe_path) = std::env::current_exe() {
|
|
||||||
if let Some(exe_dir) = exe_path.parent() {
|
|
||||||
push_candidate(exe_dir.join(&configured));
|
|
||||||
push_candidate(exe_dir.join("demo-data"));
|
|
||||||
push_candidate(exe_dir.join("rust-engine").join(&configured));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut attempted: Vec<PathBuf> = Vec::new();
|
|
||||||
let mut resolved_dir: Option<PathBuf> = None;
|
|
||||||
for candidate in candidates {
|
|
||||||
if candidate.exists() && candidate.is_dir() {
|
|
||||||
resolved_dir = Some(candidate);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
attempted.push(candidate);
|
|
||||||
}
|
|
||||||
|
|
||||||
let src_dir = match resolved_dir {
|
|
||||||
Some(path) => path,
|
|
||||||
None => {
|
|
||||||
let attempted_paths: Vec<String> = attempted
|
|
||||||
.into_iter()
|
|
||||||
.map(|p| p.display().to_string())
|
|
||||||
.collect();
|
|
||||||
return Ok(warp::reply::json(&serde_json::json!({
|
|
||||||
"imported": 0,
|
|
||||||
"skipped": 0,
|
|
||||||
"error": format!("demo dir not found (checked: {})", attempted_paths.join(", "))
|
|
||||||
})));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let mut imported = 0;
|
|
||||||
let mut skipped = 0;
|
|
||||||
for entry in fs::read_dir(&src_dir).map_err(|_| warp::reject())? {
|
|
||||||
let entry = entry.map_err(|_| warp::reject())?;
|
|
||||||
let path = entry.path();
|
|
||||||
if path.extension().and_then(|e| e.to_str()).map(|e| e.eq_ignore_ascii_case("pdf")).unwrap_or(false) {
|
|
||||||
let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown.pdf").to_string();
|
|
||||||
|
|
||||||
// check if exists
|
|
||||||
if !force {
|
|
||||||
if let Some(_) = sqlx::query("SELECT id FROM files WHERE filename = ?")
|
|
||||||
.bind(&filename)
|
|
||||||
.fetch_optional(&pool)
|
|
||||||
.await
|
|
||||||
.map_err(|_| warp::reject())? {
|
|
||||||
skipped += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// read and save to storage
|
|
||||||
let data = fs::read(&path).map_err(|_| warp::reject())?;
|
|
||||||
let stored_path = storage::save_file(&filename, &data).map_err(|_| warp::reject())?;
|
|
||||||
|
|
||||||
// insert or upsert db record
|
|
||||||
let id = uuid::Uuid::new_v4().to_string();
|
|
||||||
if force {
|
|
||||||
let _ = sqlx::query("DELETE FROM files WHERE filename = ?")
|
|
||||||
.bind(&filename)
|
|
||||||
.execute(&pool)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
sqlx::query("INSERT INTO files (id, filename, path, description, pending_analysis, analysis_status) VALUES (?, ?, ?, ?, ?, 'Queued')")
|
|
||||||
.bind(&id)
|
|
||||||
.bind(&filename)
|
|
||||||
.bind(stored_path.to_str().unwrap())
|
|
||||||
.bind(Option::<String>::None)
|
|
||||||
.bind(true)
|
|
||||||
.execute(&pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
tracing::error!("DB insert error: {}", e);
|
|
||||||
warp::reject()
|
|
||||||
})?;
|
|
||||||
imported += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(warp::reply::json(&serde_json::json!({ "imported": imported, "skipped": skipped })))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
||||||
|
|
@ -251,10 +350,14 @@ async fn handle_delete(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Re
|
||||||
let path: String = row.get("path");
|
let path: String = row.get("path");
|
||||||
let _ = storage::delete_file(std::path::Path::new(&path));
|
let _ = storage::delete_file(std::path::Path::new(&path));
|
||||||
// Remove from Qdrant
|
// Remove from Qdrant
|
||||||
let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
|
let qdrant_url =
|
||||||
|
std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
|
||||||
let qdrant = QdrantClient::new(&qdrant_url);
|
let qdrant = QdrantClient::new(&qdrant_url);
|
||||||
let _ = qdrant.delete_point(&q.id).await;
|
let _ = qdrant.delete_point(&q.id).await;
|
||||||
let _ = sqlx::query("DELETE FROM files WHERE id = ?").bind(&q.id).execute(&pool).await;
|
let _ = sqlx::query("DELETE FROM files WHERE id = ?")
|
||||||
|
.bind(&q.id)
|
||||||
|
.execute(&pool)
|
||||||
|
.await;
|
||||||
return Ok(warp::reply::json(&serde_json::json!({"deleted": true})));
|
return Ok(warp::reply::json(&serde_json::json!({"deleted": true})));
|
||||||
}
|
}
|
||||||
Ok(warp::reply::json(&serde_json::json!({"deleted": false})))
|
Ok(warp::reply::json(&serde_json::json!({"deleted": false})))
|
||||||
|
|
@ -268,7 +371,6 @@ async fn handle_list(pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
||||||
tracing::error!("DB list error: {}", e);
|
tracing::error!("DB list error: {}", e);
|
||||||
warp::reject()
|
warp::reject()
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let files: Vec<serde_json::Value> = rows
|
let files: Vec<serde_json::Value> = rows
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|r| {
|
.map(|r| {
|
||||||
|
|
@ -278,10 +380,12 @@ async fn handle_list(pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
||||||
let description: Option<String> = r.get("description");
|
let description: Option<String> = r.get("description");
|
||||||
let pending: bool = r.get("pending_analysis");
|
let pending: bool = r.get("pending_analysis");
|
||||||
let status: Option<String> = r.try_get("analysis_status").ok();
|
let status: Option<String> = r.try_get("analysis_status").ok();
|
||||||
|
let storage_url = format!("/storage/{}", filename);
|
||||||
serde_json::json!({
|
serde_json::json!({
|
||||||
"id": id,
|
"id": id,
|
||||||
"filename": filename,
|
"filename": filename,
|
||||||
"path": path,
|
"path": path,
|
||||||
|
"storage_url": storage_url,
|
||||||
"description": description,
|
"description": description,
|
||||||
"pending_analysis": pending,
|
"pending_analysis": pending,
|
||||||
"analysis_status": status
|
"analysis_status": status
|
||||||
|
|
@ -292,7 +396,10 @@ async fn handle_list(pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
||||||
Ok(warp::reply::json(&serde_json::json!({"files": files})))
|
Ok(warp::reply::json(&serde_json::json!({"files": files})))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_create_query(body: serde_json::Value, pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
async fn handle_create_query(
|
||||||
|
body: serde_json::Value,
|
||||||
|
pool: MySqlPool,
|
||||||
|
) -> Result<impl Reply, Rejection> {
|
||||||
// Insert query as queued, worker will pick it up
|
// Insert query as queued, worker will pick it up
|
||||||
let id = uuid::Uuid::new_v4().to_string();
|
let id = uuid::Uuid::new_v4().to_string();
|
||||||
let payload = body;
|
let payload = body;
|
||||||
|
|
@ -317,9 +424,11 @@ async fn handle_query_status(q: DeleteQuery, pool: MySqlPool) -> Result<impl Rep
|
||||||
.map_err(|_| warp::reject())?
|
.map_err(|_| warp::reject())?
|
||||||
{
|
{
|
||||||
let status: String = row.get("status");
|
let status: String = row.get("status");
|
||||||
return Ok(warp::reply::json(&serde_json::json!({"status": status})));
|
return Ok(warp::reply::json(&serde_json::json!({"status": status})));
|
||||||
}
|
}
|
||||||
Ok(warp::reply::json(&serde_json::json!({"status": "not_found"})))
|
Ok(warp::reply::json(
|
||||||
|
&serde_json::json!({"status": "not_found"}),
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_query_result(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
async fn handle_query_result(q: DeleteQuery, pool: MySqlPool) -> Result<impl Reply, Rejection> {
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,9 @@
|
||||||
use crate::gemini_client::{demo_text_embedding, generate_text_with_model, DEMO_EMBED_DIM};
|
use crate::gemini_client::{demo_text_embedding, generate_text_with_model, DEMO_EMBED_DIM};
|
||||||
use crate::vector;
|
use crate::vector;
|
||||||
use crate::vector_db::QdrantClient;
|
use crate::vector_db::QdrantClient;
|
||||||
use sqlx::MySqlPool;
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use tracing::{info, error};
|
use sqlx::MySqlPool;
|
||||||
|
use tracing::{error, info};
|
||||||
|
|
||||||
pub struct FileWorker {
|
pub struct FileWorker {
|
||||||
pool: MySqlPool,
|
pool: MySqlPool,
|
||||||
|
|
@ -12,7 +12,8 @@ pub struct FileWorker {
|
||||||
|
|
||||||
impl FileWorker {
|
impl FileWorker {
|
||||||
pub fn new(pool: MySqlPool) -> Self {
|
pub fn new(pool: MySqlPool) -> Self {
|
||||||
let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
|
let qdrant_url =
|
||||||
|
std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
|
||||||
let qdrant = QdrantClient::new(&qdrant_url);
|
let qdrant = QdrantClient::new(&qdrant_url);
|
||||||
Self { pool, qdrant }
|
Self { pool, qdrant }
|
||||||
}
|
}
|
||||||
|
|
@ -70,8 +71,8 @@ impl FileWorker {
|
||||||
.bind(file_id)
|
.bind(file_id)
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
let filename: String = row.get("filename");
|
let filename: String = row.get("filename");
|
||||||
let _path: String = row.get("path");
|
let _path: String = row.get("path");
|
||||||
|
|
||||||
// Stage 1: Gemini 2.5 Flash for description
|
// Stage 1: Gemini 2.5 Flash for description
|
||||||
let desc = generate_text_with_model(
|
let desc = generate_text_with_model(
|
||||||
|
|
@ -82,11 +83,13 @@ impl FileWorker {
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap_or_else(|e| format!("[desc error: {}]", e));
|
.unwrap_or_else(|e| format!("[desc error: {}]", e));
|
||||||
sqlx::query("UPDATE files SET description = ?, analysis_status = 'InProgress' WHERE id = ?")
|
sqlx::query(
|
||||||
.bind(&desc)
|
"UPDATE files SET description = ?, analysis_status = 'InProgress' WHERE id = ?",
|
||||||
.bind(file_id)
|
)
|
||||||
.execute(&self.pool)
|
.bind(&desc)
|
||||||
.await?;
|
.bind(file_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
// Stage 2: Gemini 2.5 Pro for deep vector graph data
|
// Stage 2: Gemini 2.5 Pro for deep vector graph data
|
||||||
let vector_graph = generate_text_with_model(
|
let vector_graph = generate_text_with_model(
|
||||||
|
|
@ -111,18 +114,22 @@ impl FileWorker {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark file as ready
|
// Mark file as ready
|
||||||
sqlx::query("UPDATE files SET pending_analysis = FALSE, analysis_status = 'Completed' WHERE id = ?")
|
sqlx::query(
|
||||||
.bind(file_id)
|
"UPDATE files SET pending_analysis = FALSE, analysis_status = 'Completed' WHERE id = ?",
|
||||||
.execute(&self.pool)
|
)
|
||||||
.await?;
|
.bind(file_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn mark_failed(&self, file_id: &str, reason: &str) -> Result<()> {
|
async fn mark_failed(&self, file_id: &str, reason: &str) -> Result<()> {
|
||||||
sqlx::query("UPDATE files SET analysis_status = 'Failed', pending_analysis = TRUE WHERE id = ?")
|
sqlx::query(
|
||||||
.bind(file_id)
|
"UPDATE files SET analysis_status = 'Failed', pending_analysis = TRUE WHERE id = ?",
|
||||||
.execute(&self.pool)
|
)
|
||||||
.await?;
|
.bind(file_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
sqlx::query("UPDATE files SET description = COALESCE(description, ?) WHERE id = ?")
|
sqlx::query("UPDATE files SET description = COALESCE(description, ?) WHERE id = ?")
|
||||||
.bind(format!("[analysis failed: {}]", reason))
|
.bind(format!("[analysis failed: {}]", reason))
|
||||||
.bind(file_id)
|
.bind(file_id)
|
||||||
|
|
|
||||||
|
|
@ -63,13 +63,21 @@ pub async fn generate_text_with_model(model: &str, prompt: &str) -> Result<Strin
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
struct Part { text: Option<String> }
|
struct Part {
|
||||||
|
text: Option<String>,
|
||||||
|
}
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
struct Content { parts: Vec<Part> }
|
struct Content {
|
||||||
|
parts: Vec<Part>,
|
||||||
|
}
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
struct Candidate { content: Content }
|
struct Candidate {
|
||||||
|
content: Content,
|
||||||
|
}
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
struct Response { candidates: Option<Vec<Candidate>> }
|
struct Response {
|
||||||
|
candidates: Option<Vec<Candidate>>,
|
||||||
|
}
|
||||||
|
|
||||||
let data: Response = serde_json::from_str(&txt).unwrap_or(Response { candidates: None });
|
let data: Response = serde_json::from_str(&txt).unwrap_or(Response { candidates: None });
|
||||||
let out = data
|
let out = data
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,16 @@
|
||||||
mod file_worker;
|
|
||||||
mod api;
|
mod api;
|
||||||
mod db;
|
mod db;
|
||||||
|
mod file_worker;
|
||||||
mod gemini_client;
|
mod gemini_client;
|
||||||
mod models;
|
mod models;
|
||||||
mod storage;
|
mod storage;
|
||||||
mod vector;
|
mod vector;
|
||||||
mod worker;
|
|
||||||
mod vector_db;
|
mod vector_db;
|
||||||
|
mod worker;
|
||||||
|
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use tracing::info;
|
use tracing::{error, info, warn};
|
||||||
use warp::Filter;
|
use warp::Filter;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
|
@ -30,7 +30,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
storage::ensure_storage_dir().expect("storage dir");
|
storage::ensure_storage_dir().expect("storage dir");
|
||||||
|
|
||||||
// Initialize DB
|
// Initialize DB
|
||||||
let pool = db::init_db(&database_url).await.map_err(|e| -> Box<dyn Error> { Box::new(e) })?;
|
let pool = db::init_db(&database_url)
|
||||||
|
.await
|
||||||
|
.map_err(|e| -> Box<dyn Error> { Box::new(e) })?;
|
||||||
|
|
||||||
|
let auto_import_setting = env::var("AUTO_IMPORT_DEMO").unwrap_or_else(|_| "true".to_string());
|
||||||
|
let auto_import = !matches!(
|
||||||
|
auto_import_setting.trim().to_ascii_lowercase().as_str(),
|
||||||
|
"0" | "false" | "off" | "no"
|
||||||
|
);
|
||||||
|
if auto_import {
|
||||||
|
match api::perform_demo_import(false, &pool).await {
|
||||||
|
Ok(summary) => {
|
||||||
|
if let Some(err_msg) = summary.error.as_ref() {
|
||||||
|
warn!(error = %err_msg, "startup demo import completed with warnings");
|
||||||
|
}
|
||||||
|
info!(
|
||||||
|
imported = summary.imported,
|
||||||
|
skipped = summary.skipped,
|
||||||
|
files_found = summary.files_found,
|
||||||
|
source = summary.source_dir.as_deref().unwrap_or("unknown"),
|
||||||
|
"startup demo import completed"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!(error = %err, "startup demo import failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
info!("AUTO_IMPORT_DEMO disabled; skipping startup demo import");
|
||||||
|
}
|
||||||
|
|
||||||
// Spawn query worker
|
// Spawn query worker
|
||||||
let worker = worker::Worker::new(pool.clone());
|
let worker = worker::Worker::new(pool.clone());
|
||||||
|
|
@ -42,17 +71,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
// API routes
|
// API routes
|
||||||
let api_routes = api::routes(pool.clone())
|
let api_routes = api::routes(pool.clone())
|
||||||
.with(warp::cors()
|
.with(
|
||||||
.allow_any_origin()
|
warp::cors()
|
||||||
.allow_headers(vec!["content-type", "authorization"])
|
.allow_any_origin()
|
||||||
.allow_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"]))
|
.allow_headers(vec!["content-type", "authorization"])
|
||||||
|
.allow_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"]),
|
||||||
|
)
|
||||||
.with(warp::log("rust_engine"));
|
.with(warp::log("rust_engine"));
|
||||||
|
|
||||||
info!("Rust Engine started on http://0.0.0.0:8000");
|
info!("Rust Engine started on http://0.0.0.0:8000");
|
||||||
|
|
||||||
warp::serve(api_routes)
|
warp::serve(api_routes).run(([0, 0, 0, 0], 8000)).await;
|
||||||
.run(([0, 0, 0, 0], 8000))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,11 @@ pub struct FileRecord {
|
||||||
|
|
||||||
impl FileRecord {
|
impl FileRecord {
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn new(filename: impl Into<String>, path: impl Into<String>, description: Option<String>) -> Self {
|
pub fn new(
|
||||||
|
filename: impl Into<String>,
|
||||||
|
path: impl Into<String>,
|
||||||
|
description: Option<String>,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
id: Uuid::new_v4().to_string(),
|
id: Uuid::new_v4().to_string(),
|
||||||
filename: filename.into(),
|
filename: filename.into(),
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use serde_json::json;
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct QdrantClient {
|
pub struct QdrantClient {
|
||||||
|
|
@ -10,7 +10,6 @@ pub struct QdrantClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QdrantClient {
|
impl QdrantClient {
|
||||||
|
|
||||||
/// Delete a point from collection 'files' by id
|
/// Delete a point from collection 'files' by id
|
||||||
pub async fn delete_point(&self, id: &str) -> Result<()> {
|
pub async fn delete_point(&self, id: &str) -> Result<()> {
|
||||||
let url = format!("{}/collections/files/points/delete", self.base);
|
let url = format!("{}/collections/files/points/delete", self.base);
|
||||||
|
|
@ -67,7 +66,11 @@ impl QdrantClient {
|
||||||
} else {
|
} else {
|
||||||
let status = resp.status();
|
let status = resp.status();
|
||||||
let t = resp.text().await.unwrap_or_default();
|
let t = resp.text().await.unwrap_or_default();
|
||||||
Err(anyhow::anyhow!("qdrant ensure collection failed: {} - {}", status, t))
|
Err(anyhow::anyhow!(
|
||||||
|
"qdrant ensure collection failed: {} - {}",
|
||||||
|
status,
|
||||||
|
t
|
||||||
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -85,9 +88,14 @@ impl QdrantClient {
|
||||||
return Err(anyhow::anyhow!("qdrant search failed: {} - {}", status, t));
|
return Err(anyhow::anyhow!("qdrant search failed: {} - {}", status, t));
|
||||||
}
|
}
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
struct Hit { id: serde_json::Value, score: f32 }
|
struct Hit {
|
||||||
|
id: serde_json::Value,
|
||||||
|
score: f32,
|
||||||
|
}
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
struct Data { result: Vec<Hit> }
|
struct Data {
|
||||||
|
result: Vec<Hit>,
|
||||||
|
}
|
||||||
let data: Data = resp.json().await?;
|
let data: Data = resp.json().await?;
|
||||||
let mut out = Vec::new();
|
let mut out = Vec::new();
|
||||||
for h in data.result {
|
for h in data.result {
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,8 @@ pub struct Worker {
|
||||||
|
|
||||||
impl Worker {
|
impl Worker {
|
||||||
pub fn new(pool: MySqlPool) -> Self {
|
pub fn new(pool: MySqlPool) -> Self {
|
||||||
let qdrant_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
|
let qdrant_url =
|
||||||
|
std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://qdrant:6333".to_string());
|
||||||
let qdrant = QdrantClient::new(&qdrant_url);
|
let qdrant = QdrantClient::new(&qdrant_url);
|
||||||
Self { pool, qdrant }
|
Self { pool, qdrant }
|
||||||
}
|
}
|
||||||
|
|
@ -56,18 +57,22 @@ impl Worker {
|
||||||
async fn fetch_and_claim(&self) -> Result<Option<QueryRecord>> {
|
async fn fetch_and_claim(&self) -> Result<Option<QueryRecord>> {
|
||||||
// Note: MySQL transactional SELECT FOR UPDATE handling is more complex; for this hackathon scaffold
|
// Note: MySQL transactional SELECT FOR UPDATE handling is more complex; for this hackathon scaffold
|
||||||
// we do a simple two-step: select one queued id, then update it to InProgress if it is still queued.
|
// we do a simple two-step: select one queued id, then update it to InProgress if it is still queued.
|
||||||
if let Some(row) = sqlx::query("SELECT id, payload FROM queries WHERE status = 'Queued' ORDER BY created_at LIMIT 1")
|
if let Some(row) = sqlx::query(
|
||||||
.fetch_optional(&self.pool)
|
"SELECT id, payload FROM queries WHERE status = 'Queued' ORDER BY created_at LIMIT 1",
|
||||||
.await?
|
)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await?
|
||||||
{
|
{
|
||||||
use sqlx::Row;
|
use sqlx::Row;
|
||||||
let id: String = row.get("id");
|
let id: String = row.get("id");
|
||||||
let payload: serde_json::Value = row.get("payload");
|
let payload: serde_json::Value = row.get("payload");
|
||||||
|
|
||||||
let updated = sqlx::query("UPDATE queries SET status = 'InProgress' WHERE id = ? AND status = 'Queued'")
|
let updated = sqlx::query(
|
||||||
.bind(&id)
|
"UPDATE queries SET status = 'InProgress' WHERE id = ? AND status = 'Queued'",
|
||||||
.execute(&self.pool)
|
)
|
||||||
.await?;
|
.bind(&id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
if updated.rows_affected() == 1 {
|
if updated.rows_affected() == 1 {
|
||||||
let mut q = QueryRecord::new(payload);
|
let mut q = QueryRecord::new(payload);
|
||||||
|
|
@ -90,7 +95,9 @@ impl Worker {
|
||||||
let top_k = top_k.max(1).min(20);
|
let top_k = top_k.max(1).min(20);
|
||||||
|
|
||||||
// Check cancellation
|
// Check cancellation
|
||||||
if self.is_cancelled(&q.id).await? { return Ok(()); }
|
if self.is_cancelled(&q.id).await? {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// Stage 3: search top-K in Qdrant
|
// Stage 3: search top-K in Qdrant
|
||||||
let hits = match self.qdrant.search_top_k(emb.clone(), top_k).await {
|
let hits = match self.qdrant.search_top_k(emb.clone(), top_k).await {
|
||||||
|
|
@ -115,7 +122,9 @@ impl Worker {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check cancellation
|
// Check cancellation
|
||||||
if self.is_cancelled(&q.id).await? { return Ok(()); }
|
if self.is_cancelled(&q.id).await? {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// Stage 4: fetch file metadata for IDs
|
// Stage 4: fetch file metadata for IDs
|
||||||
let mut files_json = Vec::new();
|
let mut files_json = Vec::new();
|
||||||
|
|
@ -222,13 +231,18 @@ impl Worker {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_relationships_prompt(query: &str, files: &Vec<serde_json::Value>) -> String {
|
fn build_relationships_prompt(query: &str, files: &Vec<serde_json::Value>) -> String {
|
||||||
let files_snippets: Vec<String> = files.iter().map(|f| format!(
|
let files_snippets: Vec<String> = files
|
||||||
"- id: {id}, filename: {name}, path: {path}, desc: {desc}",
|
.iter()
|
||||||
id=f.get("id").and_then(|v| v.as_str()).unwrap_or(""),
|
.map(|f| {
|
||||||
name=f.get("filename").and_then(|v| v.as_str()).unwrap_or(""),
|
format!(
|
||||||
path=f.get("path").and_then(|v| v.as_str()).unwrap_or(""),
|
"- id: {id}, filename: {name}, path: {path}, desc: {desc}",
|
||||||
desc=f.get("description").and_then(|v| v.as_str()).unwrap_or("")
|
id = f.get("id").and_then(|v| v.as_str()).unwrap_or(""),
|
||||||
)).collect();
|
name = f.get("filename").and_then(|v| v.as_str()).unwrap_or(""),
|
||||||
|
path = f.get("path").and_then(|v| v.as_str()).unwrap_or(""),
|
||||||
|
desc = f.get("description").and_then(|v| v.as_str()).unwrap_or("")
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
format!(
|
format!(
|
||||||
"You are an assistant analyzing relationships STRICTLY within the provided files.\n\
|
"You are an assistant analyzing relationships STRICTLY within the provided files.\n\
|
||||||
Query: {query}\n\
|
Query: {query}\n\
|
||||||
|
|
@ -243,12 +257,21 @@ fn build_relationships_prompt(query: &str, files: &Vec<serde_json::Value>) -> St
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_final_answer_prompt(query: &str, files: &Vec<serde_json::Value>, relationships: &str) -> String {
|
fn build_final_answer_prompt(
|
||||||
let files_short: Vec<String> = files.iter().map(|f| format!(
|
query: &str,
|
||||||
"- {name} ({id})",
|
files: &Vec<serde_json::Value>,
|
||||||
id=f.get("id").and_then(|v| v.as_str()).unwrap_or(""),
|
relationships: &str,
|
||||||
name=f.get("filename").and_then(|v| v.as_str()).unwrap_or("")
|
) -> String {
|
||||||
)).collect();
|
let files_short: Vec<String> = files
|
||||||
|
.iter()
|
||||||
|
.map(|f| {
|
||||||
|
format!(
|
||||||
|
"- {name} ({id})",
|
||||||
|
id = f.get("id").and_then(|v| v.as_str()).unwrap_or(""),
|
||||||
|
name = f.get("filename").and_then(|v| v.as_str()).unwrap_or("")
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
format!(
|
format!(
|
||||||
"You are to compose a final answer to the user query using only the information from the files.\n\
|
"You are to compose a final answer to the user query using only the information from the files.\n\
|
||||||
Query: {query}\n\
|
Query: {query}\n\
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ FROM node:23-alpine
|
||||||
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
COPY package*.json ./
|
COPY package*.json ./
|
||||||
RUN npm ci
|
RUN npm i
|
||||||
COPY . .
|
COPY . .
|
||||||
RUN npm run format && npm run build
|
RUN npm run format && npm run build
|
||||||
EXPOSE 3000
|
EXPOSE 3000
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ const RUST_ENGINE_BASE =
|
||||||
process.env.RUST_ENGINE_BASE ||
|
process.env.RUST_ENGINE_BASE ||
|
||||||
process.env.RUST_ENGINE_URL ||
|
process.env.RUST_ENGINE_URL ||
|
||||||
'http://rust-engine:8000';
|
'http://rust-engine:8000';
|
||||||
|
const STORAGE_DIR = path.resolve(process.env.ASTRA_STORAGE || '/app/storage');
|
||||||
|
|
||||||
app.set('trust proxy', true);
|
app.set('trust proxy', true);
|
||||||
app.use(helmet({ contentSecurityPolicy: false }));
|
app.use(helmet({ contentSecurityPolicy: false }));
|
||||||
|
|
@ -43,6 +44,9 @@ app.post('/api/files/import-demo', async (req, res) => {
|
||||||
const distDir = path.resolve(__dirname, 'dist');
|
const distDir = path.resolve(__dirname, 'dist');
|
||||||
app.use(express.static(distDir));
|
app.use(express.static(distDir));
|
||||||
|
|
||||||
|
// Expose imported files for the UI (read-only)
|
||||||
|
app.use('/storage', express.static(STORAGE_DIR));
|
||||||
|
|
||||||
// SPA fallback (Express 5 requires middleware instead of bare '*')
|
// SPA fallback (Express 5 requires middleware instead of bare '*')
|
||||||
app.use((req, res) => {
|
app.use((req, res) => {
|
||||||
res.sendFile(path.join(distDir, 'index.html'));
|
res.sendFile(path.join(distDir, 'index.html'));
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue