Přeskočit obsah

S3 Architektura — Bezpečnost, uživatelé, adresáře

Rozhodnutá architektura

Otázka Rozhodnutí
Izolace firem Vlastní Ceph RADOS uživatel + vlastní bucket per firma
Šifrování klíčů DB s šifrováním přes env klíč (AES-256-GCM)
Custom shared dirs Ano — company_admin může vytvářet sdílené adresáře

Model: bucket per firma

Ceph / S3 API (https://s3.avaxis.cz)
├── bucket: avaxis-{company_id_1}/    ← RADOS user: avaxis-co-{company_id_1}
│   ├── apps/
│   │   └── {app-slug}/
│   │       ├── data/
│   │       └── backups/
│   ├── users/
│   │   └── {user_id}/                ← soukromý adresář uživatele
│   │       ├── documents/
│   │       └── private/
│   └── shared/
│       ├── all/                      ← všichni uživatelé firmy
│       ├── roles/
│       │   └── {role_id}/            ← jen uživatelé s danou rolí
│       └── custom/
│           └── {dir_id}/             ← company_admin definované adresáře
├── bucket: avaxis-{company_id_2}/    ← RADOS user: avaxis-co-{company_id_2}
│   └── ...
└── bucket: avaxis-system/            ← systémový bucket (launcher, SDK)
    ├── apps/catalog/                 ← binárky aplikací pro distribuci
    └── system/main-app/              ← bootstrapper payload

Výhody oproti prefix-izolaci v jednom bucketu: - Únik klíčů jedné firmy neohrožuje ostatní firmy - Nezávislé billing a kvóty na úrovni Ceph - Jednodušší GDPR výmaz — smaž celý bucket - Přirozenější IAM politika — RADOS user má Full Control na svůj bucket


Ceph RADOS — životní cyklus firmy

Registrace firmy → vytvoření RADOS uživatele + bucketu

# services/storage_admin.py

async def provision_company_storage(company_id: UUID, company_slug: str) -> CompanyStorage:
    bucket_name = f"avaxis-{company_id}"
    rados_user  = f"avaxis-co-{company_id}"

    # 1. Vytvoř RADOS uživatele přes Ceph Admin API (nebo radosgw-admin)
    creds = await ceph_admin.create_user(
        uid=rados_user,
        display_name=f"AVAX Company {company_slug}",
        max_buckets=1,
    )
    # creds: { access_key, secret_key }

    # 2. Vytvoř bucket
    s3 = boto3_client(creds.access_key, creds.secret_key)
    s3.create_bucket(Bucket=bucket_name)

    # 3. Nastav bucket politiku (DELETE zakázán pro tohoto uživatele)
    s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(
        deny_delete_policy(bucket_name, rados_user)
    ))

    # 4. Nastav kvótu (default 1 GB)
    await ceph_admin.set_user_quota(uid=rados_user, max_size_kb=1_048_576)

    # 5. Ulož šifrované credentials do DB
    encrypted = encrypt_credentials(creds.access_key, creds.secret_key)
    await db.add(CompanyS3Credentials(
        company_id=company_id,
        rados_user=rados_user,
        bucket_name=bucket_name,
        access_key_enc=encrypted.access_key,
        secret_key_enc=encrypted.secret_key,
    ))

    return CompanyStorage(bucket=bucket_name, rados_user=rados_user)

Výmaz firmy (GDPR) → smazání bucketu + RADOS uživatele

async def deprovision_company_storage(company_id: UUID):
    creds = await get_company_credentials(company_id)
    # Smaž celý bucket (a vše v něm)
    await ceph_admin.remove_bucket(bucket=creds.bucket_name, purge_objects=True)
    # Smaž RADOS uživatele
    await ceph_admin.remove_user(uid=creds.rados_user)
    # Smaž z DB
    await db.delete(CompanyS3Credentials, company_id=company_id)

Dočasné přihlašovací údaje (STS) — klientský přístup

RADOS/Ceph podporuje STS AssumeRole. Launcher dostane krátkodobý token s oprávněními omezenými na konkrétní prefix v bucketu firmy.

Tok vydání tokenu

Launcher                    Backend API                 Ceph STS
   │                             │                          │
   │── GET /storage/token ──────►│                          │
   │   scope=private             │                          │
   │                             │── AssumeRole ───────────►│
   │                             │   policy: prefix-scoped  │
   │◄── StorageToken ────────────│◄── temp credentials ──── │
   │    (AK, SK, session, TTL)   │    TTL: 1h               │
   │                             │                          │
   │── S3 operace přímo ────────────────────────────────────►Ceph
   │   (temp credentials)        │

API endpoint

GET /storage/token?scope=private
  → users/{user_id}/

GET /storage/token?scope=shared&dir_id={uuid}
  → shared/custom/{dir_id}/
  → Backend ověří: má user_id záznam v directory_access pro tento dir?
  → Oprávnění (read/write) z directory_access.can_read / can_write

GET /storage/token?scope=app&app_slug={slug}
  → apps/{slug}/data/

GET /storage/token?scope=app_backup&app_slug={slug}
  → apps/{slug}/backups/ (GET only)

Výpis adresářů dostupných uživateli:

GET /storage/directories
  → seznam shared_directories kde user má záznam v directory_access
  Response: [{ id, name, description, can_read, can_write, expires_at }]

Admin API — správa přístupů:

GET  /storage/directories                    → výpis (všechny, jen admin)
POST /storage/directories                    → vytvoření nového adresáře
DELETE /storage/directories/{id}             → smazání adresáře

GET  /storage/directories/{id}/access        → výpis ACL záznamy
POST /storage/directories/{id}/access        → udělení přístupu
DELETE /storage/directories/{id}/access/{access_id}  → odebrání přístupu

Udělení přístupu (POST /storage/directories/{id}/access):

// varianta: konkrétní uživatel
{ "user_id": "uuid", "can_read": true, "can_write": true, "expires_at": null }

// varianta: celá role
{ "role_id": "uuid", "can_read": true, "can_write": false }

// varianta: všichni ve firmě
{ "all_company": true, "can_read": true, "can_write": false }

Response:

{
  "endpoint":      "https://s3.avaxis.cz",
  "bucket":        "avaxis-{company_id}",
  "prefix":        "users/{user_id}/",
  "access_key":    "STS_AK_...",
  "secret_key":    "STS_SK_...",
  "session_token": "STS_TOKEN_...",
  "expires_at":    "2026-04-24T11:00:00Z",
  "permissions":   ["s3:GetObject", "s3:PutObject", "s3:ListBucket"]
}

TTL tokenů: | Scope | TTL | Oprávnění | |-------|-----|-----------| | private | 1 h | GET, PUT, LIST | | shared (dle can_read/can_write) | 1 h | dle ACL záznamu | | app data | 15 min | GET, PUT | | app_backup (restore) | 15 min | GET only |

Ověření přístupu ke sdílenému adresáři:

async def check_directory_access(
    user_id: UUID, company_id: UUID, directory_id: UUID
) -> DirectoryAccess | None:
    """Vrátí ACL záznam pokud má uživatel přístup, jinak None."""
    user_roles = await get_user_role_ids(user_id, company_id)

    return await db.scalar(
        select(DirectoryAccess).where(
            DirectoryAccess.directory_id == directory_id,
            # Přístup udělen přímo, přes roli nebo všem
            or_(
                DirectoryAccess.user_id == user_id,
                DirectoryAccess.role_id.in_(user_roles),
                DirectoryAccess.all_company == True,
            ),
            # Expirace
            or_(
                DirectoryAccess.expires_at == None,
                DirectoryAccess.expires_at > now(),
            ),
        ).limit(1)
    )


Šifrování klíčů v DB

# core/encryption.py

import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import base64, secrets

ENCRYPTION_KEY = bytes.fromhex(os.environ["S3_CREDS_ENCRYPTION_KEY"])  # 32 bytes hex

def encrypt_credentials(access_key: str, secret_key: str) -> EncryptedCreds:
    gcm = AESGCM(ENCRYPTION_KEY)
    nonce_ak = secrets.token_bytes(12)
    nonce_sk = secrets.token_bytes(12)
    return EncryptedCreds(
        access_key_enc  = base64.b64encode(nonce_ak + gcm.encrypt(nonce_ak, access_key.encode(),  None)).decode(),
        secret_key_enc  = base64.b64encode(nonce_sk + gcm.encrypt(nonce_sk, secret_key.encode(), None)).decode(),
    )

def decrypt_credential(encrypted: str) -> str:
    gcm = AESGCM(ENCRYPTION_KEY)
    raw   = base64.b64decode(encrypted)
    nonce = raw[:12]
    return gcm.decrypt(nonce, raw[12:], None).decode()

Env proměnná na vm-api:

# Generování (jednou při nasazení):
python -c "import secrets; print(secrets.token_hex(32))"

# .env
S3_CREDS_ENCRYPTION_KEY=<64 hex chars>


DB schéma

-- Credentials per firma (šifrované, jen na backendu)
CREATE TABLE company_s3_credentials (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id      UUID REFERENCES companies(id) UNIQUE NOT NULL,
    rados_user      VARCHAR(200) NOT NULL,       -- avaxis-co-{company_id}
    bucket_name     VARCHAR(200) NOT NULL,       -- avaxis-{company_id}
    access_key_enc  TEXT NOT NULL,               -- AES-256-GCM šifrováno
    secret_key_enc  TEXT NOT NULL,               -- AES-256-GCM šifrováno
    quota_bytes     BIGINT DEFAULT 1073741824,   -- 1 GB default
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    last_rotated    TIMESTAMPTZ
);

-- Audit vydaných STS tokenů
CREATE TABLE storage_token_audit (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id     UUID REFERENCES users(id),
    company_id  UUID REFERENCES companies(id),
    scope       VARCHAR(50) NOT NULL,
    prefix      TEXT NOT NULL,
    permissions TEXT[] NOT NULL,
    issued_at   TIMESTAMPTZ DEFAULT NOW(),
    expires_at  TIMESTAMPTZ NOT NULL,
    client_ip   INET
);

-- Sdílené adresáře (company_admin definuje)
CREATE TABLE shared_directories (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id  UUID REFERENCES companies(id),
    name        VARCHAR(200) NOT NULL,
    description TEXT,
    s3_prefix   TEXT NOT NULL GENERATED ALWAYS AS
                  ('shared/custom/' || id::text || '/') STORED,
    created_by  UUID REFERENCES users(id),
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    is_active   BOOLEAN DEFAULT TRUE
);

-- ACL: kdo má přístup ke sdílenému adresáři (many-to-many)
-- Admin uděluje přístup konkrétním uživatelům nebo rolím
CREATE TABLE directory_access (
    id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    directory_id UUID REFERENCES shared_directories(id) ON DELETE CASCADE,
    -- Přesně jedno z těchto tří musí být non-null (CHECK níže):
    user_id      UUID REFERENCES users(id),          -- konkrétní uživatel
    role_id      UUID REFERENCES company_roles(id),  -- celá role
    all_company  BOOLEAN,                            -- TRUE = všichni ve firmě
    -- Oprávnění
    can_read     BOOLEAN NOT NULL DEFAULT TRUE,
    can_write    BOOLEAN NOT NULL DEFAULT TRUE,
    -- Audit
    granted_by   UUID REFERENCES users(id) NOT NULL,
    granted_at   TIMESTAMPTZ DEFAULT NOW(),
    expires_at   TIMESTAMPTZ,                        -- NULL = bez expirace

    CONSTRAINT dir_access_target CHECK (
        (user_id IS NOT NULL)::int +
        (role_id IS NOT NULL)::int +
        (all_company IS TRUE)::int = 1
    ),
    CONSTRAINT dir_access_unique_user
        UNIQUE (directory_id, user_id),
    CONSTRAINT dir_access_unique_role
        UNIQUE (directory_id, role_id)
);

Rotace klíčů (každých 90 dní)

# workers/tasks.py  —  Celery Beat: každou neděli ve 3:00

@celery.task
async def rotate_company_s3_keys():
    stale = await db.execute(
        select(CompanyS3Credentials).where(
            CompanyS3Credentials.last_rotated < now() - timedelta(days=90)
        )
    )
    for creds in stale.scalars():
        new_keys = await ceph_admin.create_user_key(uid=creds.rados_user)
        encrypted = encrypt_credentials(new_keys.access_key, new_keys.secret_key)
        creds.access_key_enc = encrypted.access_key_enc
        creds.secret_key_enc = encrypted.secret_key_enc
        creds.last_rotated   = now()
        # Smaž staré klíče po 5 minutách (grace period pro aktivní STS tokeny)
        await asyncio.sleep(300)
        await ceph_admin.remove_user_key(uid=creds.rados_user, access_key=old_ak)
    await db.commit()

Bezpečnostní přehled

Hrozba Ochrana
Únik klíčů firmy A → přístup k firmě B Separátní bucket per firma, klíče platí jen pro svůj bucket
Únik klíčů z backendu Šifrování AES-256-GCM v DB, dešifrování jen za runtime
Přístup uživatele do dat jiného uživatele STS token omezen na users/{user_id}/ prefix
Cross-role přístup Backend ověří roli před vydáním STS tokenu
Smazání dat (ransomware, chyba) DELETE zakázán bucket policy; jen GC job má DELETE
Replay STS tokenu Ceph STS tokeny jsou stavové — session_token jednorázový
Permanentní klíče na klientovi Nikdy — launcher dostane jen STS (max 1h)
GDPR výmaz firmy Smazání celého bucketu + RADOS uživatele jedním příkazem

Verze: 2026-04-24 | Stav: schváleno