Přeskočit obsah

ai-rag — specifikace (V2 retrieval pro AI dokumenty)

Status: draft Verze spec: 0.1 Aktualizováno: 2026-05-14 Závisí na: ai-chat.md v1.0 — ai_documents storage hotový, retrieval chybí

1. Cíl

Po V1.5 (ai-chat shipped) AI dnes: - ✓ Číst text z PDF/DOCX/TXT inline v user message (klient-side extract, max 20k znaků v _ai_compose_message) - ✓ Storage pro libovolnou velikost dokumentu (ai_documents + S3, hash dedup, max 100 MB) - ✗ AI nevidí dokumenty co nejsou inline — uploadnutý 50MB PDF leží v S3, ale chat ho neumí query-ovat

Cíl V2: přidat retrieval augmentation — když uživatel napíše dotaz, AI dostane relevantní úryvky z jeho dokumentů, ne celé soubory.

Klíčové vlastnosti V2: - Multi-model embedding — admin per-server volí bge-m3 (1024 dim, multilingual) nebo nomic-embed-text-v1.5 (768 dim). Dokument je pin-nutý na embedding server (cross-server search nemožný bez re-index). - Hybrid retrieval — per-conversation attach (uživatel přidá doc) má precedenci, fallback na global per-user search pokud žádný attach. - Late-chunking preferovaný (jediný embed call per doc + token-level chunky retrieval-time), fallback fixed-size pro krátké dokumenty / single-pass models. - Async indexing přes Celery — ai_documents.status přechody: uploaded → processing → indexed | failed. - Citations v assistant odpovědi — ai_messages.sources JSONB s [{document_id, chunk_pos, score}, ...] pro UI tooltip „čerpáno z…".

Co V2 NEDĚLÁ (záměrně): - Reranking přes cross-encoder — Phase 3, prozatím pouhý cosine similarity z embedding. - Hybrid BM25 + vector — Phase 3, jen vector teď. - Cross-language retrieval (CZ query → EN doc) — bge-m3 to umí v 5 z 6 případů; není explicit feature. - Chat history embedding — RAG jen nad dokumenty, ne nad předchozími konverzacemi. - Web search injection — žádná external search (Google/Bing/Brave).

2. Architektura

┌────────────────────────────────────────────────────────────────────┐
│  User uploaduje PDF → /ai/documents/upload-url → /finalize         │
│  ai_documents.status = 'uploaded'                                  │
└────────────────────────────────────┬───────────────────────────────┘
                                     │ Celery dispatch
┌────────────────────────────────────────────────────────────────────┐
│  Celery task `ai.index_document(doc_id)`                           │
│  1. Download S3 object                                             │
│  2. Extract text (PDF: pypdf, DOCX: python-docx, plain: as-is)     │
│  3. Volá embedding server (`/ai/servers/{embedding_server_id}/...`)│
│     - Late-chunking: jeden /v1/embeddings call s celým doc         │
│     - Fixed-size: split na 1000 char chunky, batch embed           │
│  4. Persist do `ai_chunks` (embedding vector + pos + content)      │
│  5. ai_documents.status = 'indexed'                                │
└────────────────────────────────────┬───────────────────────────────┘
┌────────────────────────────────────▼───────────────────────────────┐
│  Uživatel chat — POST /ai/conversations/{id}/messages              │
│  send_message() flow:                                              │
│  1. Persist user message                                           │
│  2. RAG: pokud konverzace má attach docs → search v nich           │
│         jinak global per-user search                               │
│         - Embed query (stejným modelem co byl použit při index)    │
│         - SELECT ... ORDER BY embedding <-> query_vec LIMIT 5      │
│         - top K chunků pre-pend jako system context                │
│  3. LLM call s rozšířeným kontextem                                │
│  4. Persist assistant message s `sources` JSONB                    │
│  5. Stream SSE klientovi (token + sources event)                   │
└────────────────────────────────────────────────────────────────────┘

3. Datový model

3.1 Rozšíření ai_servers (capabilities)

Sloupec capabilities JSONB rozšířen o embedding metadata:

{
  "chat": true,
  "embeddings": true,
  "embedding_model": "text-embedding-bge-m3",
  "embedding_dim": 1024,
  "embedding_max_input_tokens": 8192
}

Admin v UI při create/edit serveru vyplní pokud má embedding capability.

3.2 Rozšíření ai_documents (status + embedding ref)

Sloupec Typ Účel
embedding_server_id UUID FK ai_servers(id) NULL Server jehož embedding model byl použit. NULL = ne-indexed.
embedding_model TEXT NULL Kopie z ai_servers.capabilities.embedding_model v čase index (immutable).
embedding_dim INT NULL Kopie z capabilities (1024, 768, …).
chunking_strategy TEXT NULL late / fixed-1000-200 / markdown
chunk_count INT NULL Počet chunků v ai_chunks (pro UI status)
indexed_at TIMESTAMP NULL Kdy bylo indexování dokončeno
index_error TEXT NULL Pokud status='failed', popis

status CHECK rozšíří na: uploaded | processing | indexed | failed | reindexing.

3.3 ai_chunks (vector storage)

Sloupec Typ Účel
id UUID PK
document_id UUID FK ai_documents(id) ON DELETE CASCADE
chunk_index INT NOT NULL 0-based pořadí v dokumentu (pro citation UI)
content TEXT NOT NULL Text chunku (search-time fallback display)
start_offset INT NOT NULL Pozice v původním textu (char) — pro citation hover „ukázat v dokumentu"
end_offset INT NOT NULL
embedding_1024 vector(1024) NULL bge-m3 vector
embedding_768 vector(768) NULL nomic-embed vector
metadata JSONB DEFAULT '{}' Page number (PDF), heading hierarchy (markdown), ...
created_at TIMESTAMP DEFAULT now()

Indexy: - (document_id, chunk_index) — pro fetch celého dokumentu v pořadí - HNSW vector index na embedding_1024 WHERE embedding_1024 IS NOT NULL (pgvector default vector_cosine_ops) - HNSW vector index na embedding_768 WHERE embedding_768 IS NOT NULL

Multi-dim approach: Místo single embedding column máme 2 columns (jeden pro každou podporovanou dim). Per row jen jedna naplněná (závisí na ai_documents.embedding_dim). Důvod: pgvector vyžaduje fixed dim per index, a per-server pinning by jinak vyžadoval N tabulek. Pokud v budoucnu přidáme třetí model s jinou dim, doplníme embedding_N column + migrace.

3.4 ai_conversation_documents (M:N attach)

Sloupec Typ Účel
conversation_id UUID FK ai_conversations(id) ON DELETE CASCADE
document_id UUID FK ai_documents(id) ON DELETE CASCADE
attached_at TIMESTAMP DEFAULT now()
PK (conversation_id, document_id)

Index na (document_id) pro reverse lookup (kde všude je doc attached).

3.5 Rozšíření ai_messages (citations)

Sloupec Typ Účel
sources JSONB NULL Pro assistant role: [{"document_id": "uuid", "chunk_index": 3, "score": 0.84, "preview": "..."}]

UI klient renderuje sources jako tooltip pod assistant zprávou. Kliknutí na source → otevře PDF v download URL s anchor na chunk.start_offset (pokud PDF má text mapping).

4. Migrace 023 (023_ai_rag.py)

def upgrade():
    # 1. pgvector extension (idempotent)
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")

    # 2. ai_documents — extend
    op.add_column("ai_documents", sa.Column("embedding_server_id", sa.UUID,
                  sa.ForeignKey("ai_servers.id", ondelete="SET NULL"), nullable=True))
    op.add_column("ai_documents", sa.Column("embedding_model", sa.String(255), nullable=True))
    op.add_column("ai_documents", sa.Column("embedding_dim", sa.Integer, nullable=True))
    op.add_column("ai_documents", sa.Column("chunking_strategy", sa.String(50), nullable=True))
    op.add_column("ai_documents", sa.Column("chunk_count", sa.Integer, nullable=True))
    op.add_column("ai_documents", sa.Column("indexed_at", sa.DateTime(timezone=True), nullable=True))
    op.add_column("ai_documents", sa.Column("index_error", sa.Text, nullable=True))
    # Rozšířit status enum
    op.drop_constraint("ck_ai_documents_status", "ai_documents")
    op.create_check_constraint("ck_ai_documents_status", "ai_documents",
        "status IN ('uploaded','processing','indexed','failed','reindexing')")

    # 3. ai_chunks
    op.create_table(
        "ai_chunks",
        sa.Column("id", sa.UUID, primary_key=True),
        sa.Column("document_id", sa.UUID,
                  sa.ForeignKey("ai_documents.id", ondelete="CASCADE"), nullable=False),
        sa.Column("chunk_index", sa.Integer, nullable=False),
        sa.Column("content", sa.Text, nullable=False),
        sa.Column("start_offset", sa.Integer, nullable=False),
        sa.Column("end_offset", sa.Integer, nullable=False),
        # pgvector typy přes execute (Alembic ne-rozumí Vector type direct)
        sa.Column("metadata", postgresql.JSONB, server_default="{}"),
        sa.Column("created_at", sa.DateTime(timezone=True),
                  server_default=sa.text("NOW()"), nullable=False),
    )
    op.execute("ALTER TABLE ai_chunks ADD COLUMN embedding_1024 vector(1024)")
    op.execute("ALTER TABLE ai_chunks ADD COLUMN embedding_768 vector(768)")
    op.create_index("ix_ai_chunks_doc_idx", "ai_chunks", ["document_id", "chunk_index"])
    op.execute("""
        CREATE INDEX ix_ai_chunks_emb_1024 ON ai_chunks
        USING hnsw (embedding_1024 vector_cosine_ops)
        WHERE embedding_1024 IS NOT NULL
    """)
    op.execute("""
        CREATE INDEX ix_ai_chunks_emb_768 ON ai_chunks
        USING hnsw (embedding_768 vector_cosine_ops)
        WHERE embedding_768 IS NOT NULL
    """)

    # 4. ai_conversation_documents (M:N)
    op.create_table(
        "ai_conversation_documents",
        sa.Column("conversation_id", sa.UUID,
                  sa.ForeignKey("ai_conversations.id", ondelete="CASCADE"), nullable=False),
        sa.Column("document_id", sa.UUID,
                  sa.ForeignKey("ai_documents.id", ondelete="CASCADE"), nullable=False),
        sa.Column("attached_at", sa.DateTime(timezone=True),
                  server_default=sa.text("NOW()"), nullable=False),
        sa.PrimaryKeyConstraint("conversation_id", "document_id"),
    )
    op.create_index("ix_ai_conv_docs_document_id", "ai_conversation_documents", ["document_id"])

    # 5. ai_messages — sources
    op.add_column("ai_messages", sa.Column("sources", postgresql.JSONB, nullable=True))

5. Embedding workflow (Celery)

5.1 Task ai.index_document(document_id)

@shared_task(name="ai.index_document", max_retries=3, default_retry_delay=60)
def index_document_task(document_id: str) -> dict:
    """Pre-condition: ai_documents.status='uploaded'.
    Post-condition: status='indexed' (success) nebo 'failed' (s index_error).
    """
    return asyncio.run(_index_document_async(document_id))


async def _index_document_async(document_id: str):
    # 1. Lock — UPDATE ai_documents SET status='processing' WHERE id=... AND status='uploaded'
    # 2. Pick embedding server (assistant.default_server pokud má capabilities.embeddings,
    #    jinak první aktivní s capabilities.embeddings=true)
    # 3. Download S3 object + extract text
    # 4. Chunk podle strategie (late-chunking pokud server.max_input_tokens >= doc_tokens)
    # 5. Volá embedding API (LM Studio /v1/embeddings, OpenAI compat)
    # 6. INSERT do ai_chunks (batch)
    # 7. UPDATE ai_documents SET status='indexed', indexed_at=NOW(), chunk_count=N

5.2 Late-chunking algoritmus

  1. Embed celý dokument jediným voláním /v1/embeddings s input: full_text. Vrací token-level embeddings (vyžaduje server podporu — bge-m3 long context model).
  2. Identifikuj sentence/paragraph boundaries v textu (pomocí nltk.sent_tokenize nebo regex \n\n pro markdown).
  3. Pro každý chunk: mean-pool tokenu embeddings co spadají do chunk char range.
  4. Persist do ai_chunks s chunk_index, start_offset, end_offset, mean-pooled embedding_*.

Fallback fixed-size: Pokud server embedding_max_input_tokens < doc_tokens, split text na chunky 1000 char (overlap 200), embed každý zvlášť.

5.3 Trigger

Po úspěšném POST /ai/documents/finalize:

# v finalize endpoint, po db.commit() pro ai_documents row
from app.workers.celery_app import celery_app
celery_app.send_task("ai.index_document", args=[str(doc.id)])

Status klient může polling-ovat přes GET /ai/documents/{id} (existing endpoint vrátí status, chunk_count, index_error).

6. Retrieval workflow

6.1 Při user message v send_message

async def _retrieve_context(conv: AiConversation, user_query: str,
                              db: AsyncSession, top_k: int = 5) -> list[dict]:
    """Vrátí top K chunků s metadaty. Empty list pokud žádné docs nebo retrieval skip."""
    # 1. Resolve scope
    attached = (await db.execute(
        select(AiConversationDocument.document_id)
        .where(AiConversationDocument.conversation_id == conv.id)
    )).scalars().all()

    if attached:
        doc_filter = AiDocument.id.in_(attached)
    else:
        # Fallback global: vsechny owner_user_id == user docs ktere jsou indexed
        doc_filter = and_(
            AiDocument.owner_user_id == conv.user_id,
            AiDocument.status == "indexed",
        )

    # 2. Najdi prevladajici embedding server v docs scope
    server_id = (await db.execute(
        select(AiDocument.embedding_server_id)
        .where(doc_filter, AiDocument.embedding_server_id.isnot(None))
        .group_by(AiDocument.embedding_server_id)
        .order_by(func.count().desc()).limit(1)
    )).scalar_one_or_none()
    if not server_id:
        return []  # zadne indexed docs

    # 3. Embed query (stejnym modelem)
    server = await db.get(AiServer, server_id)
    query_vec = await ai_svc.embed(server, [user_query])
    query_vec = query_vec[0]  # single embedding

    # 4. Vector search (vybirat column dle dim)
    dim = server.capabilities.get("embedding_dim", 1024)
    emb_col = "embedding_1024" if dim == 1024 else "embedding_768"

    rows = (await db.execute(text(f"""
        SELECT c.id, c.document_id, c.chunk_index, c.content, c.start_offset,
               c.end_offset, c.{emb_col} <=> :qvec AS distance
        FROM ai_chunks c
        JOIN ai_documents d ON d.id = c.document_id
        WHERE c.{emb_col} IS NOT NULL
          AND d.id IN (SELECT * FROM unnest(:doc_ids))
          AND d.embedding_server_id = :server_id
        ORDER BY c.{emb_col} <=> :qvec
        LIMIT :k
    """), {"qvec": query_vec, "doc_ids": resolved_doc_ids,
           "server_id": str(server_id), "k": top_k})).mappings().all()

    return [{"document_id": str(r["document_id"]), "chunk_index": r["chunk_index"],
             "content": r["content"], "score": 1.0 - r["distance"],
             "start_offset": r["start_offset"], "end_offset": r["end_offset"]}
            for r in rows]

6.2 Context injection

# v send_message po retrieve_context
chunks = await _retrieve_context(c, body.content, db, top_k=5)

if chunks:
    # Pre-pend chunků jako system kontext (NE jako user message)
    context_text = "\n\n".join(
        f"[Document {c['document_id']}, chunk {c['chunk_index']}]\n{c['content']}"
        for c in chunks
    )
    # Inject jako další system message před user query
    history.insert(0, {"role": "system",
                       "content": f"Relevant context from user's documents:\n{context_text}"})

sources pro ai_messages.sources:

[
  {"document_id": "uuid1", "chunk_index": 3, "score": 0.87, "preview": "Podle § 24 ZDP..."},
  {"document_id": "uuid1", "chunk_index": 12, "score": 0.81, "preview": "..."},
  {"document_id": "uuid7", "chunk_index": 0, "score": 0.74, "preview": "..."}
]

7. Backend API rozšíření

Endpoint Účel
POST /ai/conversations/{id}/documents Body {document_id}. Attach doc do konverzace.
DELETE /ai/conversations/{id}/documents/{doc_id} Detach.
GET /ai/conversations/{id}/documents List attached docs (vrátí jejich ai_documents rows + status).
POST /ai/documents/{id}/reindex Force re-trigger indexace (admin / user pokud failed). Status reindexing.
GET /ai/documents Extend — vrací embedding_server_id, chunk_count, indexed_at, index_error
POST /ai/embed (admin debug) Body {server_id, texts} → vector list. Pro testování embedding model.

POST /ai/conversations/{id}/messages (existing) — backend interně volá _retrieve_context, klient se nezmění.

8. UI changes (launcher2)

8.1 Document status badge

V SupportScreen AI panelu (kde už je 📎 Historie attach pill), přidat status indikátor:

Status Badge
uploaded 🟡 Nahráno (čeká na index)
processing Indexuji…
indexed Indexováno (N chunků)
failed Index selhal — kliknout pro retry
reindexing Re-index…

8.2 Attach do konverzace (nový workflow)

Tlačítko 📎 v AI input area otevře nový modal „Vybrat z mé knihovny": - List uživatelových ai_documents (sort by created_at DESC) - Filtr „jen indexované" toggle - Multi-select (víc docs do jedné konverzace) - Aktivní attach indicator — pills 📄 file.pdf · indexed (12 chunků) ✕ nad input area

8.3 Citations v assistant odpovědi

Pod každou assistant bublinou tlačítko 🔗 N zdrojů pokud sources non-empty. Klik otevře pop-out s: - File name + preview snippet - Score (% match) - Tlačítko „Otevřít dokument" → presigned GET URL + anchor (pokud PDF má text mapping)

9. Bezpečnost a izolace

  • Per-user retrieval scope — vždy filter ai_documents.owner_user_id = current_user.id v vector search. Žádný global search napříč usery.
  • Company sharing (z ai_documents.company_id) — V2 jen owner. Cross-user retrieval v rámci firmy je V3.
  • Embedding obsahuje text — pokud útočník dostane access k ai_chunks.content, vidí text dokumentu. RBAC: jen owner user může číst své chunks (přes endpoint, ne přímý DB access).
  • PII v embedding — embedding sám neuvolňuje text, ale prefix tokenu může korelovat. Žádný cross-company embedding sharing.

10. Telemetrie

Backend app/services/ai_rag.py strukturovaný JSON log:

{"msg": "ai_rag_retrieve",
 "user_id": "uuid", "conversation_id": "uuid",
 "scope": "attached|global",
 "candidate_docs": 7, "candidate_chunks": 234,
 "top_k": 5, "max_score": 0.87, "min_score": 0.41,
 "embed_latency_ms": 120, "search_latency_ms": 8}

Prometheus: - ai_rag_retrieve_total{scope} counter - ai_rag_chunks_retrieved histogram - ai_rag_embed_latency_seconds histogram - ai_index_jobs_total{status} counter (success/failed)

11. Acceptance criteria

Phase 1 — schema + sync embedding (target 2026-05-20)

  • Migrace 023_ai_rag.py aplikovaná na avaxdev — vector extension + ai_chunks + ai_conversation_documents + ai_documents columns + ai_messages.sources
  • LM Studio embedding server zaregistrovaný v ai_servers s capabilities.embeddings=true, embedding_model=bge-m3, embedding_dim=1024, embedding_max_input_tokens=8192
  • ai_svc.embed(server, texts) funkce — volá /v1/embeddings LM Studio
  • Celery task ai.index_document — fixed-size chunking pro V2.0 první ship (late-chunking je Phase 2)
  • POST /ai/documents/finalize triggernutý Celery task po commit
  • POST /ai/documents/{id}/reindex endpoint pro retry
  • Smoke test: upload PDF → status='processing' → 'indexed' → chunks v DB

Phase 2 — retrieval + UI

  • _retrieve_context v send_message (per-conversation + global fallback)
  • POST /ai/conversations/{id}/documents attach + DELETE detach + GET list
  • launcher2 attach modal („Vybrat z knihovny")
  • Status badge v document list
  • Citations 🔗 pod assistant zprávou

Phase 3 — late-chunking + reranking (TBD)

  • Late-chunking implementace (jediný embed call, token-level mean-pool)
  • Cross-encoder reranking nad top K (volitelné)
  • Hybrid BM25 + vector

12. Otevřené otázky

  • Multi-dim search napříč modely — pokud user má docs indexované různými servery (1024 + 768), aktuální implementace volí dominantní server. Lepší: re-index strategy nebo unified retrieval s score normalizace?
  • Chunk size pro fixed-size — 1000 znaků (~~250 tokenů cz) je pro paragraph-grained search. Pro shorter QA možná 500.
  • Top K hodnota — 5 chunků v contextu znamená ~~1250 tokenů additional. Pro 8k context model OK, pro 4k tight. Adjustovat per assistant default_max_tokens?
  • Status polling vs SSE — UI dnes polluje GET /ai/documents/{id} pro status. SSE notifikace přes events.documents.indexed channel by bylo lepší.
  • Reindex strategy — pokud admin změní ai_servers.capabilities.embedding_model na nový, všechny docs indexované tím serverem jsou nekompatibilní. Auto re-index queue? Nebo manuální (admin tlačítko „Reindex all on this server")?
  • Storage growthai_chunks rychle roste. 100 MB PDF ~~ 50k znaků ~~ 50 chunků × 1024 floats × 4 B = 200 KB embedding storage per doc. Pro 10k docs = 2 GB. Při HNSW indexu ~10x = 20 GB. Capacity plan: pgvector data + index v separate tablespace? Cold storage?
  • Embedding model upgrade — když LM Studio uveřejní bge-m4, jak migrovat? Background re-index task při capabilities change?

13. Timeline

Fáze Status Cíl
Spec draft ✓ done 2026-05-14
Migrace 023 + ai_svc.embed ⏳ open 2026-05-18
Celery ai.index_document (fixed-size) ⏳ open 2026-05-20
Retrieval _retrieve_context + send_message integration ⏳ open 2026-05-22
launcher2 attach UI + citations ⏳ open 2026-05-25
Phase 1 shipped 2026-05-25
Late-chunking (Phase 2) ⏳ open 2026-06-01
Reranking (Phase 3) ⏳ open TBD

14. Související

  • ai-chat.md — V1.5 storage + chat, RAG nestaví bez něho
  • apps-gateway.md — apps mohou taky používat RAG přes M2M /ai/chat až bude shipped
  • s3-architecture.md — kde leží source dokumenty
  • garbage-collector.md — pokud doc smazán, GC mažme i jeho chunks (CASCADE už hotovo, ale velký delete vector index rebuild může být drahý — TBD)

pgvector reference: github.com/pgvector/pgvector Late-chunking paper: arxiv.org/abs/2409.04701 (Jina AI, 2024) bge-m3: huggingface.co/BAAI/bge-m3 — multilingual, 8192 context