Token validation

This commit is contained in:
Simon Gruber
2026-03-29 16:23:55 +02:00
parent 1ced3eb574
commit df86f98065
2 changed files with 39 additions and 22 deletions
+30 -22
View File
@@ -3,7 +3,7 @@ import sqlite3
from typing import Any
from uuid import uuid4
from fastapi import FastAPI, Header, HTTPException, UploadFile, File
from fastapi import Depends, FastAPI, Header, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
@@ -13,6 +13,7 @@ from .services.account_service import (
choose_new_user_id,
consume_confirmation_token,
create_confirmation_token,
ensure_user_profile_exists,
ensure_owner_access,
get_order_creator_info,
get_user_order_tokens,
@@ -145,13 +146,20 @@ def startup() -> None:
init_db()
def get_existing_user_id(user_id: str = Header(alias="X-User-Id")) -> str:
clean_id = clean_user_id(user_id)
with get_connection() as conn:
ensure_user_profile_exists(conn, clean_id)
return clean_id
@app.get("/api/config")
def get_config() -> dict[str, Any]:
return load_config()
@app.get("/api/me/profile")
def get_my_profile(user_id: str = Header(alias="X-User-Id")) -> dict[str, Any]:
def get_my_profile(user_id: str = Depends(get_existing_user_id)) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
with get_connection() as conn:
@@ -190,7 +198,7 @@ def get_my_profile(user_id: str = Header(alias="X-User-Id")) -> dict[str, Any]:
@app.put("/api/me/profile")
def update_my_profile(
payload: UserProfileUpdateRequest,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
email = clean_user_email(payload.email)
@@ -268,7 +276,7 @@ def lookup_account_by_email(email: str) -> AccountLookupResponse:
@app.post("/api/me/user-id/change/request")
def request_user_id_change(
payload: RequestUserIdChangeRequest,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
@@ -304,7 +312,7 @@ def request_user_id_change(
@app.post("/api/me/email/change/request")
def request_email_change(
payload: RequestEmailChangeRequest,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
new_email = clean_user_email(payload.new_email)
@@ -466,7 +474,7 @@ def confirm_account_action(token: str) -> dict[str, Any]:
@app.post("/api/orders")
def create_order(payload: OrderCreateRequest, user_id: str = Header(alias="X-User-Id")) -> dict[str, Any]:
def create_order(payload: OrderCreateRequest, user_id: str = Depends(get_existing_user_id)) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
order_id = str(uuid4())
admin_token = str(uuid4())
@@ -501,7 +509,7 @@ def create_order(payload: OrderCreateRequest, user_id: str = Header(alias="X-Use
def upload_order_image(
order_id: str,
file: UploadFile = File(...),
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
) -> dict[str, str]:
"""Upload an image for an order. Requires admin access."""
clean_id = clean_user_id(user_id)
@@ -528,7 +536,7 @@ def upload_order_image(
@app.delete("/api/orders/{order_id}/admin/image")
def delete_order_image(
order_id: str,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
) -> dict[str, str]:
"""Delete the image for an order. Requires admin access."""
clean_id = clean_user_id(user_id)
@@ -563,7 +571,7 @@ def delete_order_image(
@app.get("/api/orders/me")
def get_my_orders(
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
skip: int = 0,
limit: int = 10,
state: str | None = None,
@@ -663,7 +671,7 @@ def get_my_orders(
@app.get("/api/orders/{order_id}/me")
def get_my_order_access(order_id: str, user_id: str = Header(alias="X-User-Id")) -> dict[str, Any]:
def get_my_order_access(order_id: str, user_id: str = Depends(get_existing_user_id)) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
ensure_order_exists(order_id)
@@ -695,7 +703,7 @@ def get_order(order_id: str) -> dict[str, Any]:
@app.delete("/api/orders/{order_id}")
def delete_order(order_id: str, user_id: str = Header(alias="X-User-Id")) -> dict[str, bool]:
def delete_order(order_id: str, user_id: str = Depends(get_existing_user_id)) -> dict[str, bool]:
clean_id = clean_user_id(user_id)
with get_connection() as conn:
@@ -747,7 +755,7 @@ def get_order_config(order_id: str) -> dict[str, Any]:
def create_submission(
order_id: str,
payload: SubmissionPayload,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
user_email: str | None = Header(default=None, alias="X-User-Email"),
) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
@@ -806,7 +814,7 @@ def create_submission(
@app.get("/api/orders/{order_id}/submissions/me")
def get_my_submission(order_id: str, user_id: str = Header(alias="X-User-Id")) -> dict[str, Any]:
def get_my_submission(order_id: str, user_id: str = Depends(get_existing_user_id)) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
order = ensure_order_exists(order_id)
order_config = load_order_config(order["admin_token"])
@@ -831,7 +839,7 @@ def get_my_submission(order_id: str, user_id: str = Header(alias="X-User-Id")) -
def update_submission(
order_id: str,
payload: SubmissionPayload,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
user_email: str | None = Header(default=None, alias="X-User-Email"),
) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
@@ -888,7 +896,7 @@ def update_submission(
@app.delete("/api/orders/{order_id}/submissions/me")
def delete_submission(order_id: str, user_id: str = Header(alias="X-User-Id")) -> dict[str, bool]:
def delete_submission(order_id: str, user_id: str = Depends(get_existing_user_id)) -> dict[str, bool]:
clean_id = clean_user_id(user_id)
order = ensure_order_exists(order_id)
ensure_order_open(order)
@@ -910,7 +918,7 @@ def delete_submission(order_id: str, user_id: str = Header(alias="X-User-Id")) -
@app.delete("/api/orders/{order_id}/admin/submissions/{submission_id}")
def admin_delete_submission(order_id: str, submission_id: str, user_id: str = Header(alias="X-User-Id")) -> dict[str, bool]:
def admin_delete_submission(order_id: str, submission_id: str, user_id: str = Depends(get_existing_user_id)) -> dict[str, bool]:
clean_id = clean_user_id(user_id)
with get_connection() as conn:
@@ -928,7 +936,7 @@ def admin_delete_submission(order_id: str, submission_id: str, user_id: str = He
@app.get("/api/orders/{order_id}/admin")
def get_admin_view(order_id: str, user_id: str = Header(alias="X-User-Id")) -> dict[str, Any]:
def get_admin_view(order_id: str, user_id: str = Depends(get_existing_user_id)) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
with get_connection() as conn:
@@ -971,7 +979,7 @@ def admin_update_submission_status(
order_id: str,
submission_id: str,
payload: SubmissionStatusUpdateRequest,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
@@ -1019,7 +1027,7 @@ def admin_update_submission_status(
def admin_update_order_status(
order_id: str,
payload: OrderStatusUpdateRequest,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
@@ -1053,7 +1061,7 @@ def admin_update_order_status(
def admin_update_order_description(
order_id: str,
payload: OrderDescriptionUpdateRequest,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
) -> dict[str, Any]:
clean_id = clean_user_id(user_id)
@@ -1109,7 +1117,7 @@ def admin_update_order_description(
@app.get("/api/orders/{order_id}/admin/config")
def admin_get_order_config(order_id: str, user_id: str = Header(alias="X-User-Id")) -> dict[str, Any]:
def admin_get_order_config(order_id: str, user_id: str = Depends(get_existing_user_id)) -> dict[str, Any]:
"""Get order-specific form configuration."""
clean_id = clean_user_id(user_id)
@@ -1125,7 +1133,7 @@ def admin_get_order_config(order_id: str, user_id: str = Header(alias="X-User-Id
def admin_update_order_config(
order_id: str,
payload: OrderConfig,
user_id: str = Header(alias="X-User-Id"),
user_id: str = Depends(get_existing_user_id),
) -> dict[str, Any]:
"""Update order-specific form configuration."""
clean_id = clean_user_id(user_id)
@@ -82,6 +82,15 @@ def get_user_profile(conn: sqlite3.Connection, user_id: str) -> sqlite3.Row | No
).fetchone()
def ensure_user_profile_exists(conn: sqlite3.Connection, user_id: str) -> None:
row = conn.execute(
"SELECT user_id FROM user_profiles WHERE user_id = ?",
(user_id,),
).fetchone()
if not row:
raise HTTPException(status_code=401, detail="Invalid user ID")
def create_confirmation_token(
conn: sqlite3.Connection,
*,