ai-rag — specifikace (V2 retrieval pro AI dokumenty)¶
Status: draft Verze spec: 0.1 Aktualizováno: 2026-05-14 Závisí na:
ai-chat.mdv1.0 —ai_documentsstorage hotový, retrieval chybí
1. Cíl¶
Po V1.5 (ai-chat shipped) AI dnes:
- ✓ Číst text z PDF/DOCX/TXT inline v user message (klient-side extract, max 20k znaků v _ai_compose_message)
- ✓ Storage pro libovolnou velikost dokumentu (ai_documents + S3, hash dedup, max 100 MB)
- ✗ AI nevidí dokumenty co nejsou inline — uploadnutý 50MB PDF leží v S3, ale chat ho neumí query-ovat
Cíl V2: přidat retrieval augmentation — když uživatel napíše dotaz, AI dostane relevantní úryvky z jeho dokumentů, ne celé soubory.
Klíčové vlastnosti V2:
- Multi-model embedding — admin per-server volí bge-m3 (1024 dim, multilingual) nebo nomic-embed-text-v1.5 (768 dim). Dokument je pin-nutý na embedding server (cross-server search nemožný bez re-index).
- Hybrid retrieval — per-conversation attach (uživatel přidá doc) má precedenci, fallback na global per-user search pokud žádný attach.
- Late-chunking preferovaný (jediný embed call per doc + token-level chunky retrieval-time), fallback fixed-size pro krátké dokumenty / single-pass models.
- Async indexing přes Celery — ai_documents.status přechody: uploaded → processing → indexed | failed.
- Citations v assistant odpovědi — ai_messages.sources JSONB s [{document_id, chunk_pos, score}, ...] pro UI tooltip „čerpáno z…".
Co V2 NEDĚLÁ (záměrně): - Reranking přes cross-encoder — Phase 3, prozatím pouhý cosine similarity z embedding. - Hybrid BM25 + vector — Phase 3, jen vector teď. - Cross-language retrieval (CZ query → EN doc) — bge-m3 to umí v 5 z 6 případů; není explicit feature. - Chat history embedding — RAG jen nad dokumenty, ne nad předchozími konverzacemi. - Web search injection — žádná external search (Google/Bing/Brave).
2. Architektura¶
┌────────────────────────────────────────────────────────────────────┐
│ User uploaduje PDF → /ai/documents/upload-url → /finalize │
│ ai_documents.status = 'uploaded' │
└────────────────────────────────────┬───────────────────────────────┘
│ Celery dispatch
▼
┌────────────────────────────────────────────────────────────────────┐
│ Celery task `ai.index_document(doc_id)` │
│ 1. Download S3 object │
│ 2. Extract text (PDF: pypdf, DOCX: python-docx, plain: as-is) │
│ 3. Volá embedding server (`/ai/servers/{embedding_server_id}/...`)│
│ - Late-chunking: jeden /v1/embeddings call s celým doc │
│ - Fixed-size: split na 1000 char chunky, batch embed │
│ 4. Persist do `ai_chunks` (embedding vector + pos + content) │
│ 5. ai_documents.status = 'indexed' │
└────────────────────────────────────┬───────────────────────────────┘
│
┌────────────────────────────────────▼───────────────────────────────┐
│ Uživatel chat — POST /ai/conversations/{id}/messages │
│ send_message() flow: │
│ 1. Persist user message │
│ 2. RAG: pokud konverzace má attach docs → search v nich │
│ jinak global per-user search │
│ - Embed query (stejným modelem co byl použit při index) │
│ - SELECT ... ORDER BY embedding <-> query_vec LIMIT 5 │
│ - top K chunků pre-pend jako system context │
│ 3. LLM call s rozšířeným kontextem │
│ 4. Persist assistant message s `sources` JSONB │
│ 5. Stream SSE klientovi (token + sources event) │
└────────────────────────────────────────────────────────────────────┘
3. Datový model¶
3.1 Rozšíření ai_servers (capabilities)¶
Sloupec capabilities JSONB rozšířen o embedding metadata:
{
"chat": true,
"embeddings": true,
"embedding_model": "text-embedding-bge-m3",
"embedding_dim": 1024,
"embedding_max_input_tokens": 8192
}
Admin v UI při create/edit serveru vyplní pokud má embedding capability.
3.2 Rozšíření ai_documents (status + embedding ref)¶
| Sloupec | Typ | Účel |
|---|---|---|
embedding_server_id |
UUID FK ai_servers(id) NULL | Server jehož embedding model byl použit. NULL = ne-indexed. |
embedding_model |
TEXT NULL | Kopie z ai_servers.capabilities.embedding_model v čase index (immutable). |
embedding_dim |
INT NULL | Kopie z capabilities (1024, 768, …). |
chunking_strategy |
TEXT NULL | late / fixed-1000-200 / markdown |
chunk_count |
INT NULL | Počet chunků v ai_chunks (pro UI status) |
indexed_at |
TIMESTAMP NULL | Kdy bylo indexování dokončeno |
index_error |
TEXT NULL | Pokud status='failed', popis |
status CHECK rozšíří na: uploaded | processing | indexed | failed | reindexing.
3.3 ai_chunks (vector storage)¶
| Sloupec | Typ | Účel |
|---|---|---|
id |
UUID PK | |
document_id |
UUID FK ai_documents(id) ON DELETE CASCADE | |
chunk_index |
INT NOT NULL | 0-based pořadí v dokumentu (pro citation UI) |
content |
TEXT NOT NULL | Text chunku (search-time fallback display) |
start_offset |
INT NOT NULL | Pozice v původním textu (char) — pro citation hover „ukázat v dokumentu" |
end_offset |
INT NOT NULL | |
embedding_1024 |
vector(1024) NULL | bge-m3 vector |
embedding_768 |
vector(768) NULL | nomic-embed vector |
metadata |
JSONB DEFAULT '{}' | Page number (PDF), heading hierarchy (markdown), ... |
created_at |
TIMESTAMP DEFAULT now() |
Indexy:
- (document_id, chunk_index) — pro fetch celého dokumentu v pořadí
- HNSW vector index na embedding_1024 WHERE embedding_1024 IS NOT NULL (pgvector default vector_cosine_ops)
- HNSW vector index na embedding_768 WHERE embedding_768 IS NOT NULL
Multi-dim approach: Místo single embedding column máme 2 columns (jeden pro každou podporovanou dim). Per row jen jedna naplněná (závisí na ai_documents.embedding_dim). Důvod: pgvector vyžaduje fixed dim per index, a per-server pinning by jinak vyžadoval N tabulek. Pokud v budoucnu přidáme třetí model s jinou dim, doplníme embedding_N column + migrace.
3.4 ai_conversation_documents (M:N attach)¶
| Sloupec | Typ | Účel |
|---|---|---|
conversation_id |
UUID FK ai_conversations(id) ON DELETE CASCADE | |
document_id |
UUID FK ai_documents(id) ON DELETE CASCADE | |
attached_at |
TIMESTAMP DEFAULT now() | |
| PK | (conversation_id, document_id) |
Index na (document_id) pro reverse lookup (kde všude je doc attached).
3.5 Rozšíření ai_messages (citations)¶
| Sloupec | Typ | Účel |
|---|---|---|
sources |
JSONB NULL | Pro assistant role: [{"document_id": "uuid", "chunk_index": 3, "score": 0.84, "preview": "..."}] |
UI klient renderuje sources jako tooltip pod assistant zprávou. Kliknutí na source → otevře PDF v download URL s anchor na chunk.start_offset (pokud PDF má text mapping).
4. Migrace 023 (023_ai_rag.py)¶
def upgrade():
# 1. pgvector extension (idempotent)
op.execute("CREATE EXTENSION IF NOT EXISTS vector")
# 2. ai_documents — extend
op.add_column("ai_documents", sa.Column("embedding_server_id", sa.UUID,
sa.ForeignKey("ai_servers.id", ondelete="SET NULL"), nullable=True))
op.add_column("ai_documents", sa.Column("embedding_model", sa.String(255), nullable=True))
op.add_column("ai_documents", sa.Column("embedding_dim", sa.Integer, nullable=True))
op.add_column("ai_documents", sa.Column("chunking_strategy", sa.String(50), nullable=True))
op.add_column("ai_documents", sa.Column("chunk_count", sa.Integer, nullable=True))
op.add_column("ai_documents", sa.Column("indexed_at", sa.DateTime(timezone=True), nullable=True))
op.add_column("ai_documents", sa.Column("index_error", sa.Text, nullable=True))
# Rozšířit status enum
op.drop_constraint("ck_ai_documents_status", "ai_documents")
op.create_check_constraint("ck_ai_documents_status", "ai_documents",
"status IN ('uploaded','processing','indexed','failed','reindexing')")
# 3. ai_chunks
op.create_table(
"ai_chunks",
sa.Column("id", sa.UUID, primary_key=True),
sa.Column("document_id", sa.UUID,
sa.ForeignKey("ai_documents.id", ondelete="CASCADE"), nullable=False),
sa.Column("chunk_index", sa.Integer, nullable=False),
sa.Column("content", sa.Text, nullable=False),
sa.Column("start_offset", sa.Integer, nullable=False),
sa.Column("end_offset", sa.Integer, nullable=False),
# pgvector typy přes execute (Alembic ne-rozumí Vector type direct)
sa.Column("metadata", postgresql.JSONB, server_default="{}"),
sa.Column("created_at", sa.DateTime(timezone=True),
server_default=sa.text("NOW()"), nullable=False),
)
op.execute("ALTER TABLE ai_chunks ADD COLUMN embedding_1024 vector(1024)")
op.execute("ALTER TABLE ai_chunks ADD COLUMN embedding_768 vector(768)")
op.create_index("ix_ai_chunks_doc_idx", "ai_chunks", ["document_id", "chunk_index"])
op.execute("""
CREATE INDEX ix_ai_chunks_emb_1024 ON ai_chunks
USING hnsw (embedding_1024 vector_cosine_ops)
WHERE embedding_1024 IS NOT NULL
""")
op.execute("""
CREATE INDEX ix_ai_chunks_emb_768 ON ai_chunks
USING hnsw (embedding_768 vector_cosine_ops)
WHERE embedding_768 IS NOT NULL
""")
# 4. ai_conversation_documents (M:N)
op.create_table(
"ai_conversation_documents",
sa.Column("conversation_id", sa.UUID,
sa.ForeignKey("ai_conversations.id", ondelete="CASCADE"), nullable=False),
sa.Column("document_id", sa.UUID,
sa.ForeignKey("ai_documents.id", ondelete="CASCADE"), nullable=False),
sa.Column("attached_at", sa.DateTime(timezone=True),
server_default=sa.text("NOW()"), nullable=False),
sa.PrimaryKeyConstraint("conversation_id", "document_id"),
)
op.create_index("ix_ai_conv_docs_document_id", "ai_conversation_documents", ["document_id"])
# 5. ai_messages — sources
op.add_column("ai_messages", sa.Column("sources", postgresql.JSONB, nullable=True))
5. Embedding workflow (Celery)¶
5.1 Task ai.index_document(document_id)¶
@shared_task(name="ai.index_document", max_retries=3, default_retry_delay=60)
def index_document_task(document_id: str) -> dict:
"""Pre-condition: ai_documents.status='uploaded'.
Post-condition: status='indexed' (success) nebo 'failed' (s index_error).
"""
return asyncio.run(_index_document_async(document_id))
async def _index_document_async(document_id: str):
# 1. Lock — UPDATE ai_documents SET status='processing' WHERE id=... AND status='uploaded'
# 2. Pick embedding server (assistant.default_server pokud má capabilities.embeddings,
# jinak první aktivní s capabilities.embeddings=true)
# 3. Download S3 object + extract text
# 4. Chunk podle strategie (late-chunking pokud server.max_input_tokens >= doc_tokens)
# 5. Volá embedding API (LM Studio /v1/embeddings, OpenAI compat)
# 6. INSERT do ai_chunks (batch)
# 7. UPDATE ai_documents SET status='indexed', indexed_at=NOW(), chunk_count=N
5.2 Late-chunking algoritmus¶
- Embed celý dokument jediným voláním
/v1/embeddingssinput: full_text. Vrací token-level embeddings (vyžaduje server podporu — bge-m3 long context model). - Identifikuj sentence/paragraph boundaries v textu (pomocí
nltk.sent_tokenizenebo regex\n\npro markdown). - Pro každý chunk: mean-pool tokenu embeddings co spadají do chunk char range.
- Persist do
ai_chunksschunk_index,start_offset,end_offset, mean-pooledembedding_*.
Fallback fixed-size: Pokud server embedding_max_input_tokens < doc_tokens, split text na chunky 1000 char (overlap 200), embed každý zvlášť.
5.3 Trigger¶
Po úspěšném POST /ai/documents/finalize:
# v finalize endpoint, po db.commit() pro ai_documents row
from app.workers.celery_app import celery_app
celery_app.send_task("ai.index_document", args=[str(doc.id)])
Status klient může polling-ovat přes GET /ai/documents/{id} (existing endpoint vrátí status, chunk_count, index_error).
6. Retrieval workflow¶
6.1 Při user message v send_message¶
async def _retrieve_context(conv: AiConversation, user_query: str,
db: AsyncSession, top_k: int = 5) -> list[dict]:
"""Vrátí top K chunků s metadaty. Empty list pokud žádné docs nebo retrieval skip."""
# 1. Resolve scope
attached = (await db.execute(
select(AiConversationDocument.document_id)
.where(AiConversationDocument.conversation_id == conv.id)
)).scalars().all()
if attached:
doc_filter = AiDocument.id.in_(attached)
else:
# Fallback global: vsechny owner_user_id == user docs ktere jsou indexed
doc_filter = and_(
AiDocument.owner_user_id == conv.user_id,
AiDocument.status == "indexed",
)
# 2. Najdi prevladajici embedding server v docs scope
server_id = (await db.execute(
select(AiDocument.embedding_server_id)
.where(doc_filter, AiDocument.embedding_server_id.isnot(None))
.group_by(AiDocument.embedding_server_id)
.order_by(func.count().desc()).limit(1)
)).scalar_one_or_none()
if not server_id:
return [] # zadne indexed docs
# 3. Embed query (stejnym modelem)
server = await db.get(AiServer, server_id)
query_vec = await ai_svc.embed(server, [user_query])
query_vec = query_vec[0] # single embedding
# 4. Vector search (vybirat column dle dim)
dim = server.capabilities.get("embedding_dim", 1024)
emb_col = "embedding_1024" if dim == 1024 else "embedding_768"
rows = (await db.execute(text(f"""
SELECT c.id, c.document_id, c.chunk_index, c.content, c.start_offset,
c.end_offset, c.{emb_col} <=> :qvec AS distance
FROM ai_chunks c
JOIN ai_documents d ON d.id = c.document_id
WHERE c.{emb_col} IS NOT NULL
AND d.id IN (SELECT * FROM unnest(:doc_ids))
AND d.embedding_server_id = :server_id
ORDER BY c.{emb_col} <=> :qvec
LIMIT :k
"""), {"qvec": query_vec, "doc_ids": resolved_doc_ids,
"server_id": str(server_id), "k": top_k})).mappings().all()
return [{"document_id": str(r["document_id"]), "chunk_index": r["chunk_index"],
"content": r["content"], "score": 1.0 - r["distance"],
"start_offset": r["start_offset"], "end_offset": r["end_offset"]}
for r in rows]
6.2 Context injection¶
# v send_message po retrieve_context
chunks = await _retrieve_context(c, body.content, db, top_k=5)
if chunks:
# Pre-pend chunků jako system kontext (NE jako user message)
context_text = "\n\n".join(
f"[Document {c['document_id']}, chunk {c['chunk_index']}]\n{c['content']}"
for c in chunks
)
# Inject jako další system message před user query
history.insert(0, {"role": "system",
"content": f"Relevant context from user's documents:\n{context_text}"})
sources pro ai_messages.sources:
[
{"document_id": "uuid1", "chunk_index": 3, "score": 0.87, "preview": "Podle § 24 ZDP..."},
{"document_id": "uuid1", "chunk_index": 12, "score": 0.81, "preview": "..."},
{"document_id": "uuid7", "chunk_index": 0, "score": 0.74, "preview": "..."}
]
7. Backend API rozšíření¶
| Endpoint | Účel |
|---|---|
POST /ai/conversations/{id}/documents |
Body {document_id}. Attach doc do konverzace. |
DELETE /ai/conversations/{id}/documents/{doc_id} |
Detach. |
GET /ai/conversations/{id}/documents |
List attached docs (vrátí jejich ai_documents rows + status). |
POST /ai/documents/{id}/reindex |
Force re-trigger indexace (admin / user pokud failed). Status reindexing. |
GET /ai/documents |
Extend — vrací embedding_server_id, chunk_count, indexed_at, index_error |
POST /ai/embed (admin debug) |
Body {server_id, texts} → vector list. Pro testování embedding model. |
POST /ai/conversations/{id}/messages (existing) — backend interně volá _retrieve_context, klient se nezmění.
8. UI changes (launcher2)¶
8.1 Document status badge¶
V SupportScreen AI panelu (kde už je 📎 Historie attach pill), přidat status indikátor:
| Status | Badge |
|---|---|
uploaded |
🟡 Nahráno (čeká na index) |
processing |
⏳ Indexuji… |
indexed |
✓ Indexováno (N chunků) |
failed |
⚠ Index selhal — kliknout pro retry |
reindexing |
⏳ Re-index… |
8.2 Attach do konverzace (nový workflow)¶
Tlačítko 📎 v AI input area otevře nový modal „Vybrat z mé knihovny":
- List uživatelových ai_documents (sort by created_at DESC)
- Filtr „jen indexované" toggle
- Multi-select (víc docs do jedné konverzace)
- Aktivní attach indicator — pills 📄 file.pdf · indexed (12 chunků) ✕ nad input area
8.3 Citations v assistant odpovědi¶
Pod každou assistant bublinou tlačítko 🔗 N zdrojů pokud sources non-empty. Klik otevře pop-out s:
- File name + preview snippet
- Score (% match)
- Tlačítko „Otevřít dokument" → presigned GET URL + anchor (pokud PDF má text mapping)
9. Bezpečnost a izolace¶
- Per-user retrieval scope — vždy filter
ai_documents.owner_user_id = current_user.idv vector search. Žádný global search napříč usery. - Company sharing (z
ai_documents.company_id) — V2 jen owner. Cross-user retrieval v rámci firmy je V3. - Embedding obsahuje text — pokud útočník dostane access k
ai_chunks.content, vidí text dokumentu. RBAC: jen owner user může číst své chunks (přes endpoint, ne přímý DB access). - PII v embedding — embedding sám neuvolňuje text, ale prefix tokenu může korelovat. Žádný cross-company embedding sharing.
10. Telemetrie¶
Backend app/services/ai_rag.py strukturovaný JSON log:
{"msg": "ai_rag_retrieve",
"user_id": "uuid", "conversation_id": "uuid",
"scope": "attached|global",
"candidate_docs": 7, "candidate_chunks": 234,
"top_k": 5, "max_score": 0.87, "min_score": 0.41,
"embed_latency_ms": 120, "search_latency_ms": 8}
Prometheus:
- ai_rag_retrieve_total{scope} counter
- ai_rag_chunks_retrieved histogram
- ai_rag_embed_latency_seconds histogram
- ai_index_jobs_total{status} counter (success/failed)
11. Acceptance criteria¶
Phase 1 — schema + sync embedding (target 2026-05-20)¶
- Migrace
023_ai_rag.pyaplikovaná na avaxdev —vectorextension +ai_chunks+ai_conversation_documents+ai_documentscolumns +ai_messages.sources - LM Studio embedding server zaregistrovaný v
ai_serversscapabilities.embeddings=true, embedding_model=bge-m3, embedding_dim=1024, embedding_max_input_tokens=8192 -
ai_svc.embed(server, texts)funkce — volá/v1/embeddingsLM Studio - Celery task
ai.index_document— fixed-size chunking pro V2.0 první ship (late-chunking je Phase 2) -
POST /ai/documents/finalizetriggernutý Celery task po commit -
POST /ai/documents/{id}/reindexendpoint pro retry - Smoke test: upload PDF → status='processing' → 'indexed' → chunks v DB
Phase 2 — retrieval + UI¶
-
_retrieve_contextv send_message (per-conversation + global fallback) -
POST /ai/conversations/{id}/documentsattach + DELETE detach + GET list - launcher2 attach modal („Vybrat z knihovny")
- Status badge v document list
- Citations 🔗 pod assistant zprávou
Phase 3 — late-chunking + reranking (TBD)¶
- Late-chunking implementace (jediný embed call, token-level mean-pool)
- Cross-encoder reranking nad top K (volitelné)
- Hybrid BM25 + vector
12. Otevřené otázky¶
- Multi-dim search napříč modely — pokud user má docs indexované různými servery (1024 + 768), aktuální implementace volí dominantní server. Lepší: re-index strategy nebo unified retrieval s score normalizace?
- Chunk size pro fixed-size — 1000 znaků (~~250 tokenů cz) je pro paragraph-grained search. Pro shorter QA možná 500.
- Top K hodnota — 5 chunků v contextu znamená ~~1250 tokenů additional. Pro 8k context model OK, pro 4k tight. Adjustovat per assistant
default_max_tokens? - Status polling vs SSE — UI dnes polluje
GET /ai/documents/{id}pro status. SSE notifikace přesevents.documents.indexedchannel by bylo lepší. - Reindex strategy — pokud admin změní
ai_servers.capabilities.embedding_modelna nový, všechny docs indexované tím serverem jsou nekompatibilní. Auto re-index queue? Nebo manuální (admin tlačítko „Reindex all on this server")? - Storage growth —
ai_chunksrychle roste. 100 MB PDF ~~ 50k znaků ~~ 50 chunků × 1024 floats × 4 B = 200 KB embedding storage per doc. Pro 10k docs = 2 GB. Při HNSW indexu ~10x = 20 GB. Capacity plan: pgvector data + index v separate tablespace? Cold storage? - Embedding model upgrade — když LM Studio uveřejní bge-m4, jak migrovat? Background re-index task při capabilities change?
13. Timeline¶
| Fáze | Status | Cíl |
|---|---|---|
| Spec draft | ✓ done | 2026-05-14 |
| Migrace 023 + ai_svc.embed | ⏳ open | 2026-05-18 |
Celery ai.index_document (fixed-size) |
⏳ open | 2026-05-20 |
Retrieval _retrieve_context + send_message integration |
⏳ open | 2026-05-22 |
| launcher2 attach UI + citations | ⏳ open | 2026-05-25 |
| Phase 1 shipped | ⏳ | 2026-05-25 |
| Late-chunking (Phase 2) | ⏳ open | 2026-06-01 |
| Reranking (Phase 3) | ⏳ open | TBD |
14. Související¶
ai-chat.md— V1.5 storage + chat, RAG nestaví bez něhoapps-gateway.md— apps mohou taky používat RAG přes M2M/ai/chataž bude shippeds3-architecture.md— kde leží source dokumentygarbage-collector.md— pokud doc smazán, GC mažme i jeho chunks (CASCADE už hotovo, ale velký delete vector index rebuild může být drahý — TBD)
pgvector reference: github.com/pgvector/pgvector Late-chunking paper: arxiv.org/abs/2409.04701 (Jina AI, 2024) bge-m3: huggingface.co/BAAI/bge-m3 — multilingual, 8192 context