Fix worker
This commit is contained in:
parent
995cfbd9b0
commit
18081a28ba
1 changed files with 67 additions and 24 deletions
|
|
@ -127,30 +127,11 @@ impl Worker {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stage 4: fetch file metadata for IDs
|
// Stage 4: fetch file metadata for IDs and fall back to recent ready files when empty
|
||||||
let mut files_json = Vec::new();
|
let mut files_json = self.load_files_by_hits(&hits).await?;
|
||||||
for (fid, score) in hits {
|
|
||||||
if let Some(row) = sqlx::query("SELECT id, filename, path, description, analysis_status FROM files WHERE id = ? AND pending_analysis = FALSE")
|
if files_json.is_empty() {
|
||||||
.bind(&fid)
|
files_json = self.load_recent_ready_files(top_k).await?;
|
||||||
.fetch_optional(&self.pool)
|
|
||||||
.await? {
|
|
||||||
use sqlx::Row;
|
|
||||||
let id: String = row.get("id");
|
|
||||||
let filename: String = row.get("filename");
|
|
||||||
let path: String = row.get("path");
|
|
||||||
let description: Option<String> = row.get("description");
|
|
||||||
let status: Option<String> = row.try_get("analysis_status").ok();
|
|
||||||
let storage_url = storage::public_url_for(&filename);
|
|
||||||
files_json.push(serde_json::json!({
|
|
||||||
"id": id,
|
|
||||||
"filename": filename,
|
|
||||||
"path": path,
|
|
||||||
"storage_url": storage_url,
|
|
||||||
"description": description,
|
|
||||||
"analysis_status": status,
|
|
||||||
"score": score
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stage 5: call Gemini to analyze relationships and propose follow-up details strictly from provided files
|
// Stage 5: call Gemini to analyze relationships and propose follow-up details strictly from provided files
|
||||||
|
|
@ -188,6 +169,68 @@ impl Worker {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn load_files_by_hits(&self, hits: &[(String, f32)]) -> Result<Vec<serde_json::Value>> {
|
||||||
|
let mut files_json = Vec::new();
|
||||||
|
for (fid, score) in hits {
|
||||||
|
if let Some(row) = sqlx::query(
|
||||||
|
"SELECT id, filename, path, description, analysis_status FROM files WHERE id = ? AND pending_analysis = FALSE",
|
||||||
|
)
|
||||||
|
.bind(fid)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
use sqlx::Row;
|
||||||
|
let id: String = row.get("id");
|
||||||
|
let filename: String = row.get("filename");
|
||||||
|
let path: String = row.get("path");
|
||||||
|
let description: Option<String> = row.get("description");
|
||||||
|
let analysis_status: Option<String> = row.try_get("analysis_status").ok();
|
||||||
|
let storage_url = storage::public_url_for(&filename);
|
||||||
|
files_json.push(serde_json::json!({
|
||||||
|
"id": id,
|
||||||
|
"filename": filename,
|
||||||
|
"path": path,
|
||||||
|
"storage_url": storage_url,
|
||||||
|
"description": description,
|
||||||
|
"analysis_status": analysis_status,
|
||||||
|
"score": score
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(files_json)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_recent_ready_files(&self, top_k: usize) -> Result<Vec<serde_json::Value>> {
|
||||||
|
let limit = top_k.max(1) as i64;
|
||||||
|
let rows = sqlx::query(
|
||||||
|
"SELECT id, filename, path, description, analysis_status FROM files WHERE pending_analysis = FALSE ORDER BY created_at DESC LIMIT ?",
|
||||||
|
)
|
||||||
|
.bind(limit)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut files_json = Vec::new();
|
||||||
|
for row in rows {
|
||||||
|
use sqlx::Row;
|
||||||
|
let id: String = row.get("id");
|
||||||
|
let filename: String = row.get("filename");
|
||||||
|
let path: String = row.get("path");
|
||||||
|
let description: Option<String> = row.get("description");
|
||||||
|
let analysis_status: Option<String> = row.try_get("analysis_status").ok();
|
||||||
|
let storage_url = storage::public_url_for(&filename);
|
||||||
|
files_json.push(serde_json::json!({
|
||||||
|
"id": id,
|
||||||
|
"filename": filename,
|
||||||
|
"path": path,
|
||||||
|
"storage_url": storage_url,
|
||||||
|
"description": description,
|
||||||
|
"analysis_status": analysis_status,
|
||||||
|
"score": serde_json::Value::Null
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
Ok(files_json)
|
||||||
|
}
|
||||||
|
|
||||||
async fn update_status(&self, id: &str, status: QueryStatus) -> Result<()> {
|
async fn update_status(&self, id: &str, status: QueryStatus) -> Result<()> {
|
||||||
let s = match status {
|
let s = match status {
|
||||||
QueryStatus::Queued => "Queued",
|
QueryStatus::Queued => "Queued",
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue