feat(project): init

Commit ed3219deb4 · 2026-02-02 14:27:56 +08:00
46 changed files with 7235 additions and 0 deletions

.cargo/config.toml (new file)

@@ -0,0 +1,5 @@
[registries.kellnr]
index = "sparse+https://kellnr.shay7sev.site/api/v1/crates/"
[net]
git-fetch-with-cli = true

.env.example (new file)

@@ -0,0 +1,19 @@
SERVICE_NAME=cms-service
LOG_LEVEL=info
LOG_TO_FILE=false
LOG_DIR=./log
LOG_FILE_NAME=cms.log
PORT=3100
DATABASE_URL=postgres://cms_service_user:cms_service_password@127.0.0.1:5432/cms_service_db
DB_MAX_CONNECTIONS=20
DB_MIN_CONNECTIONS=5
IAM_BASE_URL=http://127.0.0.1:3000
IAM_JWKS_URL=
JWT_PUBLIC_KEY_PEM=
IAM_TIMEOUT_MS=2000
IAM_CACHE_TTL_SECONDS=10
IAM_STALE_IF_ERROR_SECONDS=60
IAM_CACHE_MAX_ENTRIES=50000

.gitignore (vendored, new file)

@@ -0,0 +1,2 @@
/target
.env

CHANGELOG.md (new file)

@@ -0,0 +1,7 @@
# Changelog
## [0.1.0] - 2026-01-31
- Initialized the cms-service DDD directory structure, basic Telemetry integration, documentation, and the health-check entry point
- Implemented the core CMS capabilities: columns/articles/media/tags & categories, draft publishing, version rollback, paginated search
- Reused the IAM JWT authentication and tenant-isolation middleware and the RBAC authorization interface (iam-client + local cache/fallback)

Cargo.lock (generated, new file; diff suppressed: 3456 lines)

Cargo.toml (new file)

@@ -0,0 +1,52 @@
[package]
name = "cms-service"
version = "0.1.0"
edition = "2024"
[dependencies]
common-telemetry = { version = "0.1.5", registry = "kellnr", default-features = false, features = [
"response",
"telemetry",
"with-anyhow",
"with-sqlx",
] }
auth-kit = { path = "../auth-kit" }
axum = "0.8.8"
http = "1.4.0"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sqlx = { version = "0.8", features = [
"chrono",
"json",
"postgres",
"runtime-tokio-native-tls",
"uuid",
"migrate",
] }
uuid = { version = "1", features = ["v4", "serde"] }
chrono = { version = "0.4.43", features = ["serde"] }
config = "0.15.19"
dotenvy = "0.15"
anyhow = "1"
thiserror = "2.0.18"
tracing = "0.1"
tracing-subscriber = "0.3"
utoipa = { version = "5", features = ["axum_extras", "uuid", "chrono"] }
utoipa-scalar = { version = "0.3.0", features = ["axum"] }
futures-util = "0.3"
tower = "0.5"
dashmap = "6.1.0"
reqwest = { version = "0.12", default-features = false, features = [
"json",
"rustls-tls",
] }

README.md (new file)

@@ -0,0 +1,109 @@
# CMS Service
A content management system (CMS) microservice. It follows DDD layering (api/application/domain/infrastructure) and integrates with the IAM service for "authentication + tenant isolation + permission decisions":
- Authentication/tenant isolation: reuses the [auth-kit](file:///home/shay/project/backend/auth-kit/README.md) middleware (JWT signature verification + `X-Tenant-ID` consistency check)
- Permission decisions: calls IAM `POST /authorize/check` via iam-client (CMS embeds no RBAC aggregation logic of its own)
## Tech Stack
- Rust (edition 2024 / Tokio)
- Web: Axum
- DB: PostgreSQL / SQLx (migrations run automatically at startup)
- Docs: utoipa + Scalar (`GET /scalar`)
- Observability: tracing + `common-telemetry`
- Auth integration:
  - JWT (RS256): JWKS fetching preferred, or a static public-key PEM
  - RBAC: decided centrally by IAM
## Project Layout
DDD-layered directories:
- [src/api](file:///home/shay/project/backend/cms-service/src/api): HTTP layer (routes/handlers/middleware/OpenAPI)
- [src/application](file:///home/shay/project/backend/cms-service/src/application): application services (use-case orchestration)
- [src/domain](file:///home/shay/project/backend/cms-service/src/domain): domain models (entities/DTOs)
- [src/infrastructure](file:///home/shay/project/backend/cms-service/src/infrastructure): infrastructure (DB, repositories, iam-client)
Key entry files:
- [main.rs](file:///home/shay/project/backend/cms-service/src/main.rs): config loading, Telemetry, DB connection, migrations, JWT/tenant middleware mounting
- [api/mod.rs](file:///home/shay/project/backend/cms-service/src/api/mod.rs): route assembly (`/v1`, `/scalar`, `/healthz`)
- [docs/API.md](file:///home/shay/project/backend/cms-service/docs/API.md): API overview and permission points
## Quick Start (local development)
1. Copy and edit the environment variables:
   - `cp .env.example .env`
2. Prepare PostgreSQL and set `DATABASE_URL`
3. Start the service (migrations run automatically):
   - `cargo run`
## Documentation
- Scalar: `GET /scalar`
- Health check: `GET /healthz`
## API (v1)
Resource entry points:
- Columns: `/v1/columns`
- Tags/categories: `/v1/tags`
- Media library: `/v1/media`
- Articles: `/v1/articles`
For the full endpoint list and permission points, see [docs/API.md](file:///home/shay/project/backend/cms-service/docs/API.md)
## Authentication and Tenant Isolation
- Required headers:
  - `Authorization: Bearer <access_token>`
  - `X-Tenant-ID: <tenant_uuid>` (requests are rejected if it does not match the `tenant_id` in the token)
- CMS-side JWT verification uses IAM's public key:
  - `JWT_PUBLIC_KEY_PEM` is read first (static public key; no IAM access needed)
  - otherwise `IAM_JWKS_URL` is used (defaults to `IAM_BASE_URL + /.well-known/jwks.json` when unset)
- Permission checks are decided centrally by IAM (CMS only calls IAM `/authorize/check` via iam-client).
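The required headers can be exercised with a quick `curl` sketch. The token and tenant UUID below are placeholders, not real values; a usable token must be issued by the IAM service:

```shell
# Placeholder credentials -- substitute values issued by your IAM instance.
BASE_URL="http://127.0.0.1:3100"
ACCESS_TOKEN="<access_token>"
TENANT_ID="00000000-0000-0000-0000-000000000001"
# Print the request for inspection; drop the leading `echo` to actually send it.
echo curl -s "${BASE_URL}/v1/articles?page=1&page_size=20" \
  -H "Authorization: Bearer ${ACCESS_TOKEN}" \
  -H "X-Tenant-ID: ${TENANT_ID}"
```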
## Configuration (environment variables)
Basics:
- `DATABASE_URL`: PostgreSQL connection string (required)
- `PORT`: listen port (default 3100)
- `SERVICE_NAME` / `LOG_LEVEL` / `LOG_TO_FILE` / `LOG_DIR` / `LOG_FILE_NAME`: logging and Telemetry
IAM integration:
- `IAM_BASE_URL`: IAM service address (default `http://localhost:3000`)
- `IAM_JWKS_URL`: JWKS address (optional; defaults to `IAM_BASE_URL + /.well-known/jwks.json`)
- `JWT_PUBLIC_KEY_PEM`: static public-key PEM (optional; when set, JWKS fetching is skipped)
- `IAM_TIMEOUT_MS`: timeout for IAM calls (default 2000 ms)
- `IAM_CACHE_TTL_SECONDS`: TTL for cached authorization decisions (default 10 s)
- `IAM_STALE_IF_ERROR_SECONDS`: window for serving stale cache entries while IAM is unavailable (default 60 s)
- `IAM_CACHE_MAX_ENTRIES`: maximum number of cache entries (default 50000; the cache is cleared when exceeded)
See [.env.example](file:///home/shay/project/backend/cms-service/.env.example) for a sample configuration.
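The TTL and stale-if-error settings combine into a simple cache decision. The sketch below is one plausible policy, not the authoritative implementation: the exact semantics (for instance whether the stale window is counted from expiry or from insertion) live in the iam-client code:

```shell
# Assumed policy: serve fresh entries within the TTL; if IAM is down, also
# serve entries that expired less than IAM_STALE_IF_ERROR_SECONDS ago.
cache_decision() {
  local age="$1" ttl="$2" stale_window="$3" iam_up="$4"
  if (( age <= ttl )); then
    echo "fresh-hit"
  elif (( iam_up == 0 && age <= ttl + stale_window )); then
    echo "stale-hit"
  else
    echo "miss"
  fi
}
cache_decision 5 10 60 1    # fresh-hit
cache_decision 15 10 60 0   # stale-hit (IAM unavailable)
cache_decision 999 10 60 0  # miss
```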
## IAM Integration Contract
At runtime CMS depends on IAM for:
- Public-key publication: `GET /.well-known/jwks.json` (for RS256 signature verification; not needed when `JWT_PUBLIC_KEY_PEM` is set)
- Permission decisions: `POST /authorize/check` (called by iam-client; used for `cms:*` permission checks)
## Database Migrations
- Migration files: [migrations](file:///home/shay/project/backend/cms-service/migrations)
- Run automatically at startup: see [db::run_migrations](file:///home/shay/project/backend/cms-service/src/infrastructure/db/mod.rs#L14-L16)
- Ops scripts (migrate/verify/rollback): see [scripts/db/README.md](file:///home/shay/project/backend/cms-service/scripts/db/README.md)
## Tests
```bash
cargo test
```
Currently included:
- iam-client cache/fallback tests: [iam_client_cache.rs](file:///home/shay/project/backend/cms-service/tests/iam_client_cache.rs)

docs/API.md (new file)

@@ -0,0 +1,49 @@
# CMS Service API Overview
CMS exposes a RESTful API and serves Scalar docs:
- `GET /scalar`
## Common Conventions
- Headers:
  - `Authorization: Bearer <access_token>`
  - `X-Tenant-ID: <tenant_uuid>`
- JWT verification: the public key is fetched from IAM's `/.well-known/jwks.json` by default (a static `JWT_PUBLIC_KEY_PEM` can also be configured)
- All resources are multi-tenant: every table carries a `tenant_id` column, and reads/writes are always filtered by `tenant_id`.
- Permission checks: CMS implements no RBAC aggregation of its own; it calls IAM `POST /authorize/check` via iam-client and lets IAM decide.
## Endpoint List (v1)
### Columns (Column)
- `POST /v1/columns` (`cms:column:write`)
- `GET /v1/columns` (`cms:column:read`, pagination/search)
- `GET /v1/columns/{id}` (`cms:column:read`)
- `PATCH /v1/columns/{id}` (`cms:column:write`)
- `DELETE /v1/columns/{id}` (`cms:column:write`)
### Tags/Categories (Tag)
- `POST /v1/tags` (`cms:tag:write`; `kind` supports `tag|category`)
- `GET /v1/tags` (`cms:tag:read`, pagination/search/filter by kind)
- `GET /v1/tags/{id}` (`cms:tag:read`)
- `PATCH /v1/tags/{id}` (`cms:tag:write`)
- `DELETE /v1/tags/{id}` (`cms:tag:write`)
### Media (Media)
- `POST /v1/media` (`cms:media:manage`, registers URL/metadata)
- `GET /v1/media` (`cms:media:read`, pagination/search)
- `GET /v1/media/{id}` (`cms:media:read`)
- `DELETE /v1/media/{id}` (`cms:media:manage`)
### Articles (Article)
- `POST /v1/articles` (`cms:article:write`, creates a draft)
- `GET /v1/articles` (`cms:article:read`, pagination/search/filter by status/column/tag)
- `GET /v1/articles/{id}` (`cms:article:read`)
- `PATCH /v1/articles/{id}` (`cms:article:write`)
- `POST /v1/articles/{id}/publish` (`cms:article:publish`, publishes and creates a version)
- `POST /v1/articles/{id}/rollback` (`cms:article:rollback`, rolls back to a given version and creates a new version)
- `GET /v1/articles/{id}/versions` (`cms:article:read`, paginated version list)

docs/TEST_REPORT.md (new file)

@@ -0,0 +1,18 @@
# Test Report (CMS Service)
## Scope
- iam-client adapter layer:
  - cache hits for permission-check results
  - stale-cache fallback when IAM is unavailable
## Test Cases
- `tests/iam_client_cache.rs`
  - `iam_client_caches_decisions`
  - `iam_client_uses_stale_cache_on_error`
## Results
- `cargo test`: all passing

migrations/0001_core.sql (new file)

@@ -0,0 +1 @@
CREATE EXTENSION IF NOT EXISTS pgcrypto;

migrations/0002_cms.sql (new file)

@@ -0,0 +1,90 @@
CREATE TYPE cms_tag_kind AS ENUM ('tag', 'category');
CREATE TYPE cms_article_status AS ENUM ('draft', 'published');
CREATE TABLE cms_columns (
tenant_id uuid NOT NULL,
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
name text NOT NULL,
slug text NOT NULL,
description text,
parent_id uuid,
sort_order int NOT NULL DEFAULT 0,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE (tenant_id, slug)
);
CREATE INDEX cms_columns_tenant_parent_idx ON cms_columns (tenant_id, parent_id);
CREATE TABLE cms_tags (
tenant_id uuid NOT NULL,
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
kind cms_tag_kind NOT NULL,
name text NOT NULL,
slug text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE (tenant_id, kind, slug)
);
CREATE INDEX cms_tags_tenant_kind_idx ON cms_tags (tenant_id, kind);
CREATE TABLE cms_media (
tenant_id uuid NOT NULL,
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
url text NOT NULL,
mime_type text,
size_bytes bigint,
width int,
height int,
created_at timestamptz NOT NULL DEFAULT now(),
created_by uuid
);
CREATE INDEX cms_media_tenant_created_at_idx ON cms_media (tenant_id, created_at DESC);
CREATE TABLE cms_articles (
tenant_id uuid NOT NULL,
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
column_id uuid,
title text NOT NULL,
slug text NOT NULL,
summary text,
content text NOT NULL DEFAULT '',
status cms_article_status NOT NULL DEFAULT 'draft',
current_version int NOT NULL DEFAULT 0,
published_at timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
created_by uuid,
updated_by uuid,
UNIQUE (tenant_id, slug)
);
CREATE INDEX cms_articles_tenant_status_updated_idx ON cms_articles (tenant_id, status, updated_at DESC);
CREATE INDEX cms_articles_tenant_column_idx ON cms_articles (tenant_id, column_id);
CREATE TABLE cms_article_tags (
tenant_id uuid NOT NULL,
article_id uuid NOT NULL REFERENCES cms_articles(id) ON DELETE CASCADE,
tag_id uuid NOT NULL REFERENCES cms_tags(id) ON DELETE CASCADE,
PRIMARY KEY (tenant_id, article_id, tag_id)
);
CREATE INDEX cms_article_tags_tenant_tag_idx ON cms_article_tags (tenant_id, tag_id);
CREATE TABLE cms_article_versions (
tenant_id uuid NOT NULL,
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
article_id uuid NOT NULL REFERENCES cms_articles(id) ON DELETE CASCADE,
version int NOT NULL,
title text NOT NULL,
summary text,
content text NOT NULL,
status cms_article_status NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
created_by uuid,
UNIQUE (tenant_id, article_id, version)
);
CREATE INDEX cms_article_versions_tenant_article_idx ON cms_article_versions (tenant_id, article_id, version DESC);

scripts/db/README.md (new file)

@@ -0,0 +1,69 @@
# cms-service Database Scripts (migrate / verify / rollback)
This directory provides database scripts similar to `iam-service/scripts/db`, suitable for:
- Production/staging: run migrations and verification explicitly before starting cms-service
- Development/testing: quickly initialize or roll back the schema
Notes:
- cms-service also runs SQLx migrations automatically at startup (`src/infrastructure/db/mod.rs`). If you manage migrations with these scripts, make your deployment "migrate first, then start the service", and keep the same migration source (`cms-service/migrations/*.sql`) for a given database.
- These scripts write to the `_sqlx_migrations` table used by SQLx, so the service will not re-apply migrations that are already recorded.
## Prerequisites
- `psql` is installed
- `DATABASE_URL` is available (export it, or set it in `.env` at the project root)
- Computing/verifying migration checksums requires `sha384sum`, or `openssl` + `xxd`
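The checksum these scripts store is the hex-encoded SHA-384 digest of the raw migration file, matching the `checksum_hex_of_file` helper in `migrate.sh` and `verify.sh`. A quick sanity check of the digest shape:

```shell
# Compute an sqlx-style checksum for a sample file and confirm it is a
# 96-hex-character (48-byte) SHA-384 digest.
tmp="$(mktemp)"
printf 'CREATE EXTENSION IF NOT EXISTS pgcrypto;\n' > "${tmp}"
checksum="$(sha384sum "${tmp}" | awk '{print $1}')"
echo "${#checksum}"  # 96
rm -f "${tmp}"
```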
## Common Commands
```bash
export DATABASE_URL='postgres://...'
# 1) Apply migrations (records them in _sqlx_migrations)
./scripts/db/migrate.sh
# 2) Verify the schema (migration checksums + structure checks)
./scripts/db/verify.sh
# 3) Roll back to a specific version (e.g. back to 0001)
ROLLBACK_TO_VERSION=1 ./scripts/db/rollback.sh
# 4) Roll back all migrations (only cms-service's own objects; the pgcrypto extension is not dropped)
ROLLBACK_TO_VERSION=0 ./scripts/db/rollback.sh
```
## Version Numbering
- Migration files: `cms-service/migrations/*.sql`
- Migration version: the numeric filename prefix (e.g. `0002_cms.sql` -> version=2)
- Rollback scripts: `scripts/db/rollback/<version>.down.sql` (e.g. `0002.down.sql`)
- Verify scripts: `scripts/db/verify/<version>_*.sql`
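The naming rules above can be sketched in a few lines, mirroring the parsing in `migrate.sh` (the `10#` prefix forces base 10 so a prefix like `0008` is not misread as an invalid octal literal):

```shell
# Derive the version number and description from a migration filename.
base="0002_cms.sql"
version_str="${base%%_*}"              # "0002"
version_num="$((10#${version_str}))"   # 2
description="${base#*_}"
description="${description%.sql}"      # "cms"
echo "${version_num} ${description}"   # 2 cms
# The matching rollback file name is zero-padded back to four digits:
printf '%04d.down.sql\n' "${version_num}"   # 0002.down.sql
```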
## Troubleshooting
### Startup fails with `failed to run migrations: VersionMismatch(<n>)`
Meaning:
- The checksum recorded for migration `<n>` in the `_sqlx_migrations` table does not match the checksum of the corresponding `migrations/<n>_*.sql` file in the current repository.
- Common causes: the migration file was modified after being applied, or `_sqlx_migrations.checksum` was written with a non-SQLx algorithm.
Fix (recommended for development environments):
1) Stop cms-service
2) Roll back to 0 (drops the cms tables/types and clears the records in `_sqlx_migrations`):
```bash
ROLLBACK_TO_VERSION=0 ./scripts/db/rollback.sh
```
3) Re-run migrations and verification:
```bash
./scripts/db/migrate.sh
./scripts/db/verify.sh
```
Then start cms-service again.

scripts/db/migrate.sh (new executable file)

@@ -0,0 +1,95 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROOT_DIR="$(cd "${SCRIPT_DIR}/../.." && pwd)"
MIGRATIONS_DIR="${ROOT_DIR}/migrations"
load_database_url_from_env_file() {
local env_file="$1"
local line value
while IFS= read -r line || [[ -n "${line}" ]]; do
line="${line#"${line%%[![:space:]]*}"}"
[[ -z "${line}" || "${line}" == \#* ]] && continue
line="${line#export }"
if [[ "${line}" == DATABASE_URL=* ]]; then
value="${line#DATABASE_URL=}"
value="${value%$'\r'}"
value="${value%\"}"
value="${value#\"}"
value="${value%\'}"
value="${value#\'}"
printf '%s' "${value}"
return 0
fi
done < "${env_file}"
return 1
}
DATABASE_URL="${DATABASE_URL:-}"
if [[ -z "${DATABASE_URL}" && -f "${ROOT_DIR}/.env" ]]; then
DATABASE_URL="$(load_database_url_from_env_file "${ROOT_DIR}/.env" || true)"
fi
if [[ -z "${DATABASE_URL}" ]]; then
echo "DATABASE_URL is required (export it, or set it in ${ROOT_DIR}/.env)"
exit 1
fi
if ! command -v psql >/dev/null 2>&1; then
echo "psql not found in PATH"
exit 127
fi
checksum_hex_of_file() {
local file="$1"
if command -v sha384sum >/dev/null 2>&1; then
sha384sum "${file}" | awk '{print $1}'
return 0
fi
if command -v openssl >/dev/null 2>&1; then
openssl dgst -sha384 -binary "${file}" | xxd -p -c 256
return 0
fi
echo "sha384sum or openssl is required to compute sqlx checksum" >&2
return 127
}
TARGET_VERSION="${TARGET_VERSION:-}"
DRY_RUN="${DRY_RUN:-0}"
psql "${DATABASE_URL}" -v ON_ERROR_STOP=1 -c "CREATE TABLE IF NOT EXISTS _sqlx_migrations (version BIGINT PRIMARY KEY, description TEXT NOT NULL, installed_on TIMESTAMPTZ NOT NULL DEFAULT now(), success BOOLEAN NOT NULL, checksum BYTEA NOT NULL, execution_time BIGINT NOT NULL)"
shopt -s nullglob
migrations=( "${MIGRATIONS_DIR}/"*.sql )
if [[ ${#migrations[@]} -eq 0 ]]; then
echo "No migration files found: ${MIGRATIONS_DIR}/*.sql"
exit 1
fi
for file in "${migrations[@]}"; do
base="$(basename "${file}")"
version_str="${base%%_*}"
version_num="$((10#${version_str}))"
description="${base#*_}"
description="${description%.sql}"
if [[ -n "${TARGET_VERSION}" && "${version_num}" -gt "${TARGET_VERSION}" ]]; then
continue
fi
applied="$(psql "${DATABASE_URL}" -At -c "SELECT 1 FROM _sqlx_migrations WHERE version=${version_num} AND success=true LIMIT 1" || true)"
if [[ "${applied}" == "1" ]]; then
continue
fi
echo "Applying ${version_str} (${base})"
if [[ "${DRY_RUN}" == "1" ]]; then
continue
fi
checksum="$(checksum_hex_of_file "${file}")"
psql "${DATABASE_URL}" -v ON_ERROR_STOP=1 -f "${file}"
psql "${DATABASE_URL}" -v ON_ERROR_STOP=1 -c "INSERT INTO _sqlx_migrations(version, description, success, checksum, execution_time) VALUES (${version_num}, '${description}', true, decode('${checksum}','hex'), 0) ON CONFLICT (version) DO NOTHING"
done
echo "Migrations completed"

scripts/db/rebuild_cms_db.sh (new executable file)

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
"${SCRIPT_DIR}/reset.sh"
"${SCRIPT_DIR}/migrate.sh"
"${SCRIPT_DIR}/verify.sh"

scripts/db/reset.sh (new executable file)

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROLLBACK_TO_VERSION=0 "${SCRIPT_DIR}/rollback.sh"

scripts/db/rollback.sh (new executable file)

@@ -0,0 +1,79 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROOT_DIR="$(cd "${SCRIPT_DIR}/../.." && pwd)"
ROLLBACK_DIR="${SCRIPT_DIR}/rollback"
load_database_url_from_env_file() {
local env_file="$1"
local line value
while IFS= read -r line || [[ -n "${line}" ]]; do
line="${line#"${line%%[![:space:]]*}"}"
[[ -z "${line}" || "${line}" == \#* ]] && continue
line="${line#export }"
if [[ "${line}" == DATABASE_URL=* ]]; then
value="${line#DATABASE_URL=}"
value="${value%$'\r'}"
value="${value%\"}"
value="${value#\"}"
value="${value%\'}"
value="${value#\'}"
printf '%s' "${value}"
return 0
fi
done < "${env_file}"
return 1
}
DATABASE_URL="${DATABASE_URL:-}"
if [[ -z "${DATABASE_URL}" && -f "${ROOT_DIR}/.env" ]]; then
DATABASE_URL="$(load_database_url_from_env_file "${ROOT_DIR}/.env" || true)"
fi
if [[ -z "${DATABASE_URL}" ]]; then
echo "DATABASE_URL is required (export it, or set it in ${ROOT_DIR}/.env)"
exit 1
fi
if ! command -v psql >/dev/null 2>&1; then
echo "psql not found in PATH"
exit 127
fi
ROLLBACK_TO_VERSION="${ROLLBACK_TO_VERSION:-}"
DRY_RUN="${DRY_RUN:-0}"
psql "${DATABASE_URL}" -v ON_ERROR_STOP=1 -c "CREATE TABLE IF NOT EXISTS _sqlx_migrations (version BIGINT PRIMARY KEY, description TEXT NOT NULL, installed_on TIMESTAMPTZ NOT NULL DEFAULT now(), success BOOLEAN NOT NULL, checksum BYTEA NOT NULL, execution_time BIGINT NOT NULL)" >/dev/null
if [[ -z "${ROLLBACK_TO_VERSION}" ]]; then
echo "ROLLBACK_TO_VERSION is required (e.g. 1 or 0)"
exit 1
fi
sim_max=""
while true; do
if [[ -n "${sim_max}" ]]; then
current="$(psql "${DATABASE_URL}" -At -c "SELECT version FROM _sqlx_migrations WHERE success=true AND version < ${sim_max} ORDER BY version DESC LIMIT 1" || true)"
else
current="$(psql "${DATABASE_URL}" -At -c "SELECT version FROM _sqlx_migrations WHERE success=true ORDER BY version DESC LIMIT 1" || true)"
fi
if [[ -z "${current}" ]]; then
echo "No applied migrations found"
exit 0
fi
if [[ "${current}" -le "${ROLLBACK_TO_VERSION}" ]]; then
echo "Rollback completed (current=${current}, target=${ROLLBACK_TO_VERSION})"
exit 0
fi
file="${ROLLBACK_DIR}/$(printf "%04d" "${current}").down.sql"
if [[ ! -f "${file}" ]]; then
echo "Rollback file not found for version ${current}: ${file}"
exit 1
fi
echo "Rolling back version ${current} using $(basename "${file}")"
if [[ "${DRY_RUN}" == "1" ]]; then
# Dry run: report only. Track the version locally so the next iteration
# selects the one below it, instead of re-selecting the same row forever.
sim_max="${current}"
else
psql "${DATABASE_URL}" -v ON_ERROR_STOP=1 -f "${file}"
psql "${DATABASE_URL}" -v ON_ERROR_STOP=1 -c "DELETE FROM _sqlx_migrations WHERE version=${current}"
fi
done

scripts/db/rollback/0001.down.sql (new file)

@@ -0,0 +1,7 @@
BEGIN;
-- cms-service migration 0001 only creates the pgcrypto extension. To avoid
-- affecting other services sharing this database, the extension is not dropped here.
SELECT 1;
COMMIT;

scripts/db/rollback/0002.down.sql (new file)

@@ -0,0 +1,14 @@
BEGIN;
DROP TABLE IF EXISTS cms_article_versions;
DROP TABLE IF EXISTS cms_article_tags;
DROP TABLE IF EXISTS cms_articles;
DROP TABLE IF EXISTS cms_media;
DROP TABLE IF EXISTS cms_tags;
DROP TABLE IF EXISTS cms_columns;
DROP TYPE IF EXISTS cms_article_status;
DROP TYPE IF EXISTS cms_tag_kind;
COMMIT;

scripts/db/verify.sh (new executable file)

@@ -0,0 +1,91 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROOT_DIR="$(cd "${SCRIPT_DIR}/../.." && pwd)"
MIGRATIONS_DIR="${ROOT_DIR}/migrations"
VERIFY_DIR="${SCRIPT_DIR}/verify"
load_database_url_from_env_file() {
local env_file="$1"
local line value
while IFS= read -r line || [[ -n "${line}" ]]; do
line="${line#"${line%%[![:space:]]*}"}"
[[ -z "${line}" || "${line}" == \#* ]] && continue
line="${line#export }"
if [[ "${line}" == DATABASE_URL=* ]]; then
value="${line#DATABASE_URL=}"
value="${value%$'\r'}"
value="${value%\"}"
value="${value#\"}"
value="${value%\'}"
value="${value#\'}"
printf '%s' "${value}"
return 0
fi
done < "${env_file}"
return 1
}
DATABASE_URL="${DATABASE_URL:-}"
if [[ -z "${DATABASE_URL}" && -f "${ROOT_DIR}/.env" ]]; then
DATABASE_URL="$(load_database_url_from_env_file "${ROOT_DIR}/.env" || true)"
fi
if [[ -z "${DATABASE_URL}" ]]; then
echo "DATABASE_URL is required (export it, or set it in ${ROOT_DIR}/.env)"
exit 1
fi
if ! command -v psql >/dev/null 2>&1; then
echo "psql not found in PATH"
exit 127
fi
checksum_hex_of_file() {
local file="$1"
if command -v sha384sum >/dev/null 2>&1; then
sha384sum "${file}" | awk '{print $1}'
return 0
fi
if command -v openssl >/dev/null 2>&1; then
openssl dgst -sha384 -binary "${file}" | xxd -p -c 256
return 0
fi
echo "sha384sum or openssl is required to compute sqlx checksum" >&2
return 127
}
psql "${DATABASE_URL}" -v ON_ERROR_STOP=1 -c "CREATE TABLE IF NOT EXISTS _sqlx_migrations (version BIGINT PRIMARY KEY, description TEXT NOT NULL, installed_on TIMESTAMPTZ NOT NULL DEFAULT now(), success BOOLEAN NOT NULL, checksum BYTEA NOT NULL, execution_time BIGINT NOT NULL)"
echo "Checking migration checksums in _sqlx_migrations..."
shopt -s nullglob
migrations=( "${MIGRATIONS_DIR}/"*.sql )
for file in "${migrations[@]}"; do
base="$(basename "${file}")"
version_str="${base%%_*}"
version_num="$((10#${version_str}))"
expected_checksum="$(checksum_hex_of_file "${file}")"
actual_checksum="$(psql "${DATABASE_URL}" -At -c "SELECT encode(checksum,'hex') FROM _sqlx_migrations WHERE version=${version_num} AND success=true LIMIT 1" || true)"
if [[ -n "${actual_checksum}" && "${actual_checksum}" != "${expected_checksum}" ]]; then
echo "Checksum mismatch: version=${version_str} expected=${expected_checksum} actual=${actual_checksum}"
exit 2
fi
done
echo "Running schema verify scripts..."
verify_files=( "${VERIFY_DIR}/"*.sql )
if [[ ${#verify_files[@]} -eq 0 ]]; then
echo "No verify files found: ${VERIFY_DIR}/*.sql"
exit 1
fi
for file in "${verify_files[@]}"; do
base="$(basename "${file}")"
echo "Verifying ${base}"
psql "${DATABASE_URL}" -v ON_ERROR_STOP=1 -f "${file}" >/dev/null
done
echo "Verify completed"

scripts/db/verify/0001_*.sql (new file)

@@ -0,0 +1,7 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgcrypto') THEN
RAISE EXCEPTION 'missing extension: pgcrypto';
END IF;
END $$;

scripts/db/verify/0002_*.sql (new file)

@@ -0,0 +1,29 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'cms_tag_kind') THEN
RAISE EXCEPTION 'missing type: cms_tag_kind';
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'cms_article_status') THEN
RAISE EXCEPTION 'missing type: cms_article_status';
END IF;
IF to_regclass('public.cms_columns') IS NULL THEN
RAISE EXCEPTION 'missing table: cms_columns';
END IF;
IF to_regclass('public.cms_tags') IS NULL THEN
RAISE EXCEPTION 'missing table: cms_tags';
END IF;
IF to_regclass('public.cms_media') IS NULL THEN
RAISE EXCEPTION 'missing table: cms_media';
END IF;
IF to_regclass('public.cms_articles') IS NULL THEN
RAISE EXCEPTION 'missing table: cms_articles';
END IF;
IF to_regclass('public.cms_article_tags') IS NULL THEN
RAISE EXCEPTION 'missing table: cms_article_tags';
END IF;
IF to_regclass('public.cms_article_versions') IS NULL THEN
RAISE EXCEPTION 'missing table: cms_article_versions';
END IF;
END $$;

src/api/docs.rs (new file)

@@ -0,0 +1,81 @@
use utoipa::openapi::security::{HttpAuthScheme, HttpBuilder, SecurityScheme};
use utoipa::{Modify, OpenApi};
struct SecurityAddon;
impl Modify for SecurityAddon {
fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
let components = openapi
.components
.get_or_insert_with(utoipa::openapi::Components::new);
components.add_security_scheme(
"bearer_auth",
SecurityScheme::Http(
HttpBuilder::new()
.scheme(HttpAuthScheme::Bearer)
.bearer_format("JWT")
.build(),
),
);
}
}
#[derive(OpenApi)]
#[openapi(
modifiers(&SecurityAddon),
info(
title = "CMS Service API",
version = "0.1.0",
description = include_str!("../../docs/API.md")
),
paths(
crate::api::handlers::column::create_column_handler,
crate::api::handlers::column::list_columns_handler,
crate::api::handlers::column::get_column_handler,
crate::api::handlers::column::update_column_handler,
crate::api::handlers::column::delete_column_handler,
crate::api::handlers::tag::create_tag_handler,
crate::api::handlers::tag::list_tags_handler,
crate::api::handlers::tag::get_tag_handler,
crate::api::handlers::tag::update_tag_handler,
crate::api::handlers::tag::delete_tag_handler,
crate::api::handlers::media::create_media_handler,
crate::api::handlers::media::list_media_handler,
crate::api::handlers::media::get_media_handler,
crate::api::handlers::media::delete_media_handler,
crate::api::handlers::article::create_article_handler,
crate::api::handlers::article::list_articles_handler,
crate::api::handlers::article::get_article_handler,
crate::api::handlers::article::update_article_handler,
crate::api::handlers::article::publish_article_handler,
crate::api::handlers::article::rollback_article_handler,
crate::api::handlers::article::list_versions_handler
),
components(
schemas(
crate::api::handlers::column::CreateColumnRequest,
crate::api::handlers::column::UpdateColumnRequest,
crate::api::handlers::tag::CreateTagRequest,
crate::api::handlers::tag::UpdateTagRequest,
crate::api::handlers::media::CreateMediaRequest,
crate::api::handlers::article::CreateArticleRequest,
crate::api::handlers::article::UpdateArticleRequest,
crate::api::handlers::article::RollbackRequest,
crate::domain::models::Column,
crate::domain::models::Tag,
crate::domain::models::Media,
crate::domain::models::Article,
crate::domain::models::ArticleVersion,
crate::infrastructure::repositories::article::ArticleWithTags
)
),
tags(
(name = "System", description = "System: health check / docs"),
(name = "Column", description = "Column management"),
(name = "Article", description = "Article management"),
(name = "Media", description = "Media library"),
(name = "Tag", description = "Tags and categories"),
(name = "Version", description = "Versions and rollback")
)
)]
pub struct ApiDoc;

src/api/handlers/article.rs (new file)

@@ -0,0 +1,334 @@
use axum::{
Json, Router,
extract::{Path, Query, State},
routing::{get, post},
};
use common_telemetry::{AppError, AppResponse};
use utoipa::IntoParams;
use uuid::Uuid;
use crate::api::{AppState, handlers::common::extract_bearer_token};
use auth_kit::middleware::{tenant::TenantId, auth::AuthContext};
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
pub struct CreateArticleRequest {
pub column_id: Option<Uuid>,
pub title: String,
pub slug: String,
pub summary: Option<String>,
pub content: String,
pub tag_ids: Option<Vec<Uuid>>,
}
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
pub struct UpdateArticleRequest {
pub column_id: Option<Option<Uuid>>,
pub title: Option<String>,
pub slug: Option<String>,
pub summary: Option<Option<String>>,
pub content: Option<String>,
pub tag_ids: Option<Vec<Uuid>>,
}
#[derive(Debug, serde::Deserialize, IntoParams)]
pub struct ListArticlesQuery {
pub page: Option<u32>,
pub page_size: Option<u32>,
pub q: Option<String>,
pub status: Option<String>,
pub column_id: Option<Uuid>,
pub tag_id: Option<Uuid>,
}
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
pub struct RollbackRequest {
pub to_version: i32,
}
#[derive(Debug, serde::Deserialize, IntoParams)]
pub struct ListVersionsQuery {
pub page: Option<u32>,
pub page_size: Option<u32>,
}
pub fn router() -> Router<AppState> {
Router::new()
.route("/", post(create_article_handler).get(list_articles_handler))
.route(
"/{id}",
get(get_article_handler).patch(update_article_handler),
)
.route("/{id}/publish", post(publish_article_handler))
.route("/{id}/rollback", post(rollback_article_handler))
.route("/{id}/versions", get(list_versions_handler))
}
#[utoipa::path(
post,
path = "/v1/articles",
tag = "Article",
request_body = CreateArticleRequest,
security(
("bearer_auth" = [])
),
responses(
(status = 200, description = "Create article (draft)", body = crate::infrastructure::repositories::article::ArticleWithTags)
)
)]
pub async fn create_article_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(body): Json<CreateArticleRequest>,
) -> Result<AppResponse<crate::infrastructure::repositories::article::ArticleWithTags>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:article:write", &token)
.await?;
let article = state
.services
.create_article(
tenant_id,
body.column_id,
body.title,
body.slug,
body.summary,
body.content,
body.tag_ids.unwrap_or_default(),
Some(user_id),
)
.await?;
Ok(AppResponse::ok(article))
}
#[utoipa::path(
get,
path = "/v1/articles",
tag = "Article",
params(ListArticlesQuery),
security(
("bearer_auth" = [])
),
responses(
(status = 200, description = "List/search articles", body = crate::infrastructure::repositories::column::Paged<crate::domain::models::Article>)
)
)]
pub async fn list_articles_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(query): Query<ListArticlesQuery>,
) -> Result<AppResponse<crate::infrastructure::repositories::column::Paged<crate::domain::models::Article>>, AppError>
{
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:article:read", &token)
.await?;
let result = state
.services
.list_articles(
tenant_id,
crate::infrastructure::repositories::article::ListArticlesQuery {
page: query.page.unwrap_or(1),
page_size: query.page_size.unwrap_or(20),
q: query.q,
status: query.status,
column_id: query.column_id,
tag_id: query.tag_id,
},
)
.await?;
Ok(AppResponse::ok(result))
}
#[utoipa::path(
get,
path = "/v1/articles/{id}",
tag = "Article",
params(
("id" = String, Path, description = "Article ID")
),
security(
("bearer_auth" = [])
),
responses(
(status = 200, description = "Article detail", body = crate::infrastructure::repositories::article::ArticleWithTags)
)
)]
pub async fn get_article_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
) -> Result<AppResponse<crate::infrastructure::repositories::article::ArticleWithTags>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:article:read", &token)
.await?;
let article = state.services.get_article(tenant_id, id).await?;
Ok(AppResponse::ok(article))
}
#[utoipa::path(
patch,
path = "/v1/articles/{id}",
tag = "Article",
request_body = UpdateArticleRequest,
params(
("id" = String, Path, description = "Article ID")
),
security(
("bearer_auth" = [])
),
responses(
(status = 200, description = "Update article", body = crate::infrastructure::repositories::article::ArticleWithTags)
)
)]
pub async fn update_article_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
Json(body): Json<UpdateArticleRequest>,
) -> Result<AppResponse<crate::infrastructure::repositories::article::ArticleWithTags>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:article:write", &token)
.await?;
let article = state
.services
.update_article(
tenant_id,
id,
body.column_id,
body.title,
body.slug,
body.summary,
body.content,
body.tag_ids,
Some(user_id),
)
.await?;
Ok(AppResponse::ok(article))
}
#[utoipa::path(
post,
path = "/v1/articles/{id}/publish",
tag = "Article",
params(
("id" = String, Path, description = "Article ID")
),
security(
("bearer_auth" = [])
),
responses(
(status = 200, description = "Publish article", body = crate::domain::models::Article)
)
)]
pub async fn publish_article_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
) -> Result<AppResponse<crate::domain::models::Article>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:article:publish", &token)
.await?;
let article = state.services.publish_article(tenant_id, id, Some(user_id)).await?;
Ok(AppResponse::ok(article))
}
#[utoipa::path(
post,
path = "/v1/articles/{id}/rollback",
tag = "Version",
request_body = RollbackRequest,
params(
("id" = String, Path, description = "Article ID")
),
security(
("bearer_auth" = [])
),
responses(
(status = 200, description = "Roll back to the given version and create a new version", body = crate::domain::models::Article)
)
)]
pub async fn rollback_article_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
Json(body): Json<RollbackRequest>,
) -> Result<AppResponse<crate::domain::models::Article>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:article:rollback", &token)
.await?;
let article = state
.services
.rollback_article(tenant_id, id, body.to_version, Some(user_id))
.await?;
Ok(AppResponse::ok(article))
}
#[utoipa::path(
get,
path = "/v1/articles/{id}/versions",
tag = "Version",
params(
("id" = String, Path, description = "Article ID"),
ListVersionsQuery
),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Version list", body = crate::infrastructure::repositories::column::Paged<crate::domain::models::ArticleVersion>)
)
)]
pub async fn list_versions_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
Query(query): Query<ListVersionsQuery>,
) -> Result<AppResponse<crate::infrastructure::repositories::column::Paged<crate::domain::models::ArticleVersion>>, AppError>
{
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:article:read", &token)
.await?;
let versions = state
.services
.list_versions(
tenant_id,
id,
query.page.unwrap_or(1),
query.page_size.unwrap_or(20),
)
.await?;
Ok(AppResponse::ok(versions))
}
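The list handlers above default `page` to 1 and `page_size` to 20 before handing off to the repository layer. A minimal sketch of how such defaults translate into a SQL-style offset, assuming a hypothetical helper `page_to_offset` (the real pagination lives in the repository code, which additionally applies tenant filtering):

```rust
// Hedged sketch (not part of the service): map Option-al page/page_size query
// params to (offset, limit). `page_to_offset` is a hypothetical helper.
fn page_to_offset(page: Option<u32>, page_size: Option<u32>) -> (u32, u32) {
    let page = page.unwrap_or(1).max(1); // guard against page = 0 in this sketch
    let page_size = page_size.unwrap_or(20);
    ((page - 1) * page_size, page_size)
}

fn main() {
    assert_eq!(page_to_offset(None, None), (0, 20)); // defaults: first page of 20
    assert_eq!(page_to_offset(Some(3), Some(10)), (20, 10));
    assert_eq!(page_to_offset(Some(0), Some(10)), (0, 10)); // clamped, no underflow
}
```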

src/api/handlers/column.rs Normal file
@@ -0,0 +1,247 @@
use axum::{
Json, Router,
extract::{Path, Query, State},
routing::{get, post},
};
use common_telemetry::{AppError, AppResponse};
use utoipa::IntoParams;
use uuid::Uuid;
use crate::api::{AppState, handlers::common::extract_bearer_token};
use auth_kit::middleware::{tenant::TenantId, auth::AuthContext};
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
pub struct CreateColumnRequest {
pub name: String,
pub slug: String,
pub description: Option<String>,
pub parent_id: Option<Uuid>,
pub sort_order: Option<i32>,
}
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
pub struct UpdateColumnRequest {
pub name: Option<String>,
pub slug: Option<String>,
pub description: Option<Option<String>>,
pub parent_id: Option<Option<Uuid>>,
pub sort_order: Option<i32>,
}
#[derive(Debug, serde::Deserialize, IntoParams)]
pub struct ListColumnsQuery {
pub page: Option<u32>,
pub page_size: Option<u32>,
pub search: Option<String>,
pub parent_id: Option<Uuid>,
}
pub fn router() -> Router<AppState> {
Router::new()
.route("/", post(create_column_handler).get(list_columns_handler))
.route(
"/{id}",
get(get_column_handler)
.patch(update_column_handler)
.delete(delete_column_handler),
)
}
#[utoipa::path(
post,
path = "/v1/columns",
tag = "Column",
request_body = CreateColumnRequest,
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Create column", body = crate::domain::models::Column),
        (status = 401, description = "Unauthenticated"),
        (status = 403, description = "Permission denied")
)
)]
pub async fn create_column_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(body): Json<CreateColumnRequest>,
) -> Result<AppResponse<crate::domain::models::Column>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:column:write", &token)
.await?;
let column = state
.services
.create_column(
tenant_id,
body.name,
body.slug,
body.description,
body.parent_id,
body.sort_order.unwrap_or(0),
)
.await?;
Ok(AppResponse::ok(column))
}
#[utoipa::path(
get,
path = "/v1/columns",
tag = "Column",
params(ListColumnsQuery),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Column list", body = crate::infrastructure::repositories::column::Paged<crate::domain::models::Column>),
        (status = 401, description = "Unauthenticated"),
        (status = 403, description = "Permission denied")
)
)]
pub async fn list_columns_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(query): Query<ListColumnsQuery>,
) -> Result<AppResponse<crate::infrastructure::repositories::column::Paged<crate::domain::models::Column>>, AppError>
{
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:column:read", &token)
.await?;
let result = state
.services
.list_columns(
tenant_id,
crate::infrastructure::repositories::column::ListColumnsQuery {
page: query.page.unwrap_or(1),
page_size: query.page_size.unwrap_or(20),
search: query.search,
parent_id: query.parent_id,
},
)
.await?;
Ok(AppResponse::ok(result))
}
#[utoipa::path(
get,
path = "/v1/columns/{id}",
tag = "Column",
params(
        ("id" = String, Path, description = "Column ID")
),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Column detail", body = crate::domain::models::Column),
        (status = 401, description = "Unauthenticated"),
        (status = 403, description = "Permission denied"),
        (status = 404, description = "Not found")
)
)]
pub async fn get_column_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
) -> Result<AppResponse<crate::domain::models::Column>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:column:read", &token)
.await?;
let column = state.services.get_column(tenant_id, id).await?;
Ok(AppResponse::ok(column))
}
#[utoipa::path(
patch,
path = "/v1/columns/{id}",
tag = "Column",
request_body = UpdateColumnRequest,
params(
        ("id" = String, Path, description = "Column ID")
),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Update column", body = crate::domain::models::Column),
        (status = 401, description = "Unauthenticated"),
        (status = 403, description = "Permission denied"),
        (status = 404, description = "Not found")
)
)]
pub async fn update_column_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
Json(body): Json<UpdateColumnRequest>,
) -> Result<AppResponse<crate::domain::models::Column>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:column:write", &token)
.await?;
let column = state
.services
.update_column(
tenant_id,
id,
body.name,
body.slug,
body.description,
body.parent_id,
body.sort_order,
)
.await?;
Ok(AppResponse::ok(column))
}
#[utoipa::path(
delete,
path = "/v1/columns/{id}",
tag = "Column",
params(
        ("id" = String, Path, description = "Column ID")
),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Deleted"),
        (status = 401, description = "Unauthenticated"),
        (status = 403, description = "Permission denied"),
        (status = 404, description = "Not found")
)
)]
pub async fn delete_column_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
) -> Result<AppResponse<serde_json::Value>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:column:write", &token)
.await?;
state.services.delete_column(tenant_id, id).await?;
Ok(AppResponse::ok(serde_json::json!({"deleted": true})))
}

@@ -0,0 +1,11 @@
use axum::http::HeaderMap;
use common_telemetry::AppError;
pub fn extract_bearer_token(headers: &HeaderMap) -> Result<String, AppError> {
let token = headers
.get(axum::http::header::AUTHORIZATION)
.and_then(|h| h.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "))
.ok_or(AppError::MissingAuthHeader)?;
Ok(token.to_string())
}
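The token extraction above is a thin wrapper around string parsing of the `Authorization` header. A standalone sketch of just the Bearer-scheme parsing, using `Option<&str>` in place of axum's `HeaderMap` (`parse_bearer` is a hypothetical helper for illustration only):

```rust
// Hedged sketch of the parsing rule inside extract_bearer_token: the value
// must start with the literal "Bearer " prefix; anything else is rejected.
fn parse_bearer(header: Option<&str>) -> Option<String> {
    header
        .and_then(|v| v.strip_prefix("Bearer "))
        .map(str::to_string)
}

fn main() {
    assert_eq!(parse_bearer(Some("Bearer abc.def.ghi")), Some("abc.def.ghi".to_string()));
    assert_eq!(parse_bearer(Some("Basic abc")), None); // wrong scheme is rejected
    assert_eq!(parse_bearer(None), None); // maps to AppError::MissingAuthHeader upstream
}
```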

src/api/handlers/media.rs Normal file
@@ -0,0 +1,175 @@
use axum::{
Json, Router,
extract::{Path, Query, State},
routing::{get, post},
};
use common_telemetry::{AppError, AppResponse};
use utoipa::IntoParams;
use uuid::Uuid;
use crate::api::{AppState, handlers::common::extract_bearer_token};
use auth_kit::middleware::{tenant::TenantId, auth::AuthContext};
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
pub struct CreateMediaRequest {
pub url: String,
pub mime_type: Option<String>,
pub size_bytes: Option<i64>,
pub width: Option<i32>,
pub height: Option<i32>,
}
#[derive(Debug, serde::Deserialize, IntoParams)]
pub struct ListMediaQuery {
pub page: Option<u32>,
pub page_size: Option<u32>,
pub search: Option<String>,
}
pub fn router() -> Router<AppState> {
Router::new()
.route("/", post(create_media_handler).get(list_media_handler))
.route("/{id}", get(get_media_handler).delete(delete_media_handler))
}
#[utoipa::path(
post,
path = "/v1/media",
tag = "Media",
request_body = CreateMediaRequest,
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Create media record", body = crate::domain::models::Media)
)
)]
pub async fn create_media_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(body): Json<CreateMediaRequest>,
) -> Result<AppResponse<crate::domain::models::Media>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:media:manage", &token)
.await?;
let media = state
.services
.create_media(
tenant_id,
body.url,
body.mime_type,
body.size_bytes,
body.width,
body.height,
Some(user_id),
)
.await?;
Ok(AppResponse::ok(media))
}
#[utoipa::path(
get,
path = "/v1/media",
tag = "Media",
params(ListMediaQuery),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Media list", body = crate::infrastructure::repositories::column::Paged<crate::domain::models::Media>)
)
)]
pub async fn list_media_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(query): Query<ListMediaQuery>,
) -> Result<AppResponse<crate::infrastructure::repositories::column::Paged<crate::domain::models::Media>>, AppError>
{
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:media:read", &token)
.await?;
let result = state
.services
.list_media(
tenant_id,
crate::infrastructure::repositories::media::ListMediaQuery {
page: query.page.unwrap_or(1),
page_size: query.page_size.unwrap_or(20),
search: query.search,
},
)
.await?;
Ok(AppResponse::ok(result))
}
#[utoipa::path(
get,
path = "/v1/media/{id}",
tag = "Media",
params(
        ("id" = String, Path, description = "Media ID")
),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Media detail", body = crate::domain::models::Media)
)
)]
pub async fn get_media_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
) -> Result<AppResponse<crate::domain::models::Media>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:media:read", &token)
.await?;
let media = state.services.get_media(tenant_id, id).await?;
Ok(AppResponse::ok(media))
}
#[utoipa::path(
delete,
path = "/v1/media/{id}",
tag = "Media",
params(
        ("id" = String, Path, description = "Media ID")
),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Deleted")
)
)]
pub async fn delete_media_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
) -> Result<AppResponse<serde_json::Value>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:media:manage", &token)
.await?;
state.services.delete_media(tenant_id, id).await?;
Ok(AppResponse::ok(serde_json::json!({"deleted": true})))
}

src/api/handlers/mod.rs Normal file
@@ -0,0 +1,5 @@
pub mod article;
pub mod column;
pub mod common;
pub mod media;
pub mod tag;

src/api/handlers/tag.rs Normal file
@@ -0,0 +1,214 @@
use axum::{
Json, Router,
extract::{Path, Query, State},
routing::{get, post},
};
use common_telemetry::{AppError, AppResponse};
use utoipa::IntoParams;
use uuid::Uuid;
use crate::api::{AppState, handlers::common::extract_bearer_token};
use auth_kit::middleware::{tenant::TenantId, auth::AuthContext};
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
pub struct CreateTagRequest {
pub kind: String,
pub name: String,
pub slug: String,
}
#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
pub struct UpdateTagRequest {
pub name: Option<String>,
pub slug: Option<String>,
}
#[derive(Debug, serde::Deserialize, IntoParams)]
pub struct ListTagsQuery {
pub page: Option<u32>,
pub page_size: Option<u32>,
pub search: Option<String>,
pub kind: Option<String>,
}
pub fn router() -> Router<AppState> {
Router::new()
.route("/", post(create_tag_handler).get(list_tags_handler))
.route(
"/{id}",
get(get_tag_handler)
.patch(update_tag_handler)
.delete(delete_tag_handler),
)
}
#[utoipa::path(
post,
path = "/v1/tags",
tag = "Tag",
request_body = CreateTagRequest,
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Create tag/category", body = crate::domain::models::Tag)
)
)]
pub async fn create_tag_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(body): Json<CreateTagRequest>,
) -> Result<AppResponse<crate::domain::models::Tag>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:tag:write", &token)
.await?;
let tag = state
.services
.create_tag(tenant_id, body.kind, body.name, body.slug)
.await?;
Ok(AppResponse::ok(tag))
}
#[utoipa::path(
get,
path = "/v1/tags",
tag = "Tag",
params(ListTagsQuery),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Tag/category list", body = crate::infrastructure::repositories::column::Paged<crate::domain::models::Tag>)
)
)]
pub async fn list_tags_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(query): Query<ListTagsQuery>,
) -> Result<AppResponse<crate::infrastructure::repositories::column::Paged<crate::domain::models::Tag>>, AppError>
{
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:tag:read", &token)
.await?;
let result = state
.services
.list_tags(
tenant_id,
crate::infrastructure::repositories::tag::ListTagsQuery {
page: query.page.unwrap_or(1),
page_size: query.page_size.unwrap_or(20),
search: query.search,
kind: query.kind,
},
)
.await?;
Ok(AppResponse::ok(result))
}
#[utoipa::path(
get,
path = "/v1/tags/{id}",
tag = "Tag",
params(
        ("id" = String, Path, description = "Tag/category ID")
),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Tag/category detail", body = crate::domain::models::Tag)
)
)]
pub async fn get_tag_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
) -> Result<AppResponse<crate::domain::models::Tag>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:tag:read", &token)
.await?;
let tag = state.services.get_tag(tenant_id, id).await?;
Ok(AppResponse::ok(tag))
}
#[utoipa::path(
patch,
path = "/v1/tags/{id}",
tag = "Tag",
request_body = UpdateTagRequest,
params(
        ("id" = String, Path, description = "Tag/category ID")
),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Update tag/category", body = crate::domain::models::Tag)
)
)]
pub async fn update_tag_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
Json(body): Json<UpdateTagRequest>,
) -> Result<AppResponse<crate::domain::models::Tag>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:tag:write", &token)
.await?;
let tag = state
.services
.update_tag(tenant_id, id, body.name, body.slug)
.await?;
Ok(AppResponse::ok(tag))
}
#[utoipa::path(
delete,
path = "/v1/tags/{id}",
tag = "Tag",
params(
        ("id" = String, Path, description = "Tag/category ID")
),
security(
("bearer_auth" = [])
),
responses(
        (status = 200, description = "Deleted")
)
)]
pub async fn delete_tag_handler(
TenantId(tenant_id): TenantId,
AuthContext { user_id, .. }: AuthContext,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<Uuid>,
) -> Result<AppResponse<serde_json::Value>, AppError> {
let token = extract_bearer_token(&headers)?;
state
.iam_client
.require_permission(tenant_id, user_id, "cms:tag:write", &token)
.await?;
state.services.delete_tag(tenant_id, id).await?;
Ok(AppResponse::ok(serde_json::json!({"deleted": true})))
}

src/api/middleware/mod.rs Normal file
@@ -0,0 +1,128 @@
use axum::{
extract::{MatchedPath, Request},
middleware::Next,
response::{IntoResponse, Response},
};
use common_telemetry::AppError;
use futures_util::FutureExt;
use http::HeaderValue;
use std::{panic::AssertUnwindSafe, time::Instant};
pub async fn ensure_request_id(mut req: Request, next: Next) -> Response {
let request_id = req
.headers()
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
if let Ok(v) = HeaderValue::from_str(&request_id) {
req.headers_mut().insert("x-request-id", v);
}
let mut resp = next.run(req).await;
if let Ok(v) = HeaderValue::from_str(&request_id) {
resp.headers_mut().insert("x-request-id", v);
}
resp
}
pub async fn request_logger(req: Request, next: Next) -> Response {
let started = Instant::now();
let method = req.method().to_string();
let path = req.uri().path().to_string();
let request_id = req
.headers()
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown")
.to_string();
let action = req
.extensions()
.get::<MatchedPath>()
.map(|m| format!("{} {}", method, m.as_str()))
.unwrap_or_else(|| format!("{} {}", method, path));
let tenant_id = req
.extensions()
.get::<auth_kit::middleware::tenant::TenantId>()
.map(|t| t.0.to_string())
.unwrap_or_else(|| "unknown".to_string());
let user_id = req
.extensions()
.get::<auth_kit::middleware::auth::AuthContext>()
.map(|c| c.user_id.to_string())
.unwrap_or_else(|| "unknown".to_string());
let resp = next.run(req).await;
let latency_ms = started.elapsed().as_millis() as u64;
let status = resp.status().as_u16();
let error_code = match status {
200..=399 => "ok",
400 => "bad_request",
401 => "unauthorized",
403 => "permission_denied",
404 => "not_found",
409 => "conflict",
429 => "rate_limited",
500..=599 => "server_error",
_ => "unknown",
};
tracing::info!(
trace_id = %request_id,
tenant_id = %tenant_id,
user_id = %user_id,
action = %action,
latency_ms = latency_ms,
error_code = %error_code,
status = status
);
resp
}
pub async fn catch_panic(req: Request, next: Next) -> Response {
let request_id = req
.headers()
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown")
.to_string();
let method = req.method().to_string();
let path = req.uri().path().to_string();
let action = req
.extensions()
.get::<MatchedPath>()
.map(|m| format!("{} {}", method, m.as_str()))
.unwrap_or_else(|| format!("{} {}", method, path));
let tenant_id = req
.extensions()
.get::<auth_kit::middleware::tenant::TenantId>()
.map(|t| t.0.to_string())
.unwrap_or_else(|| "unknown".to_string());
let user_id = req
.extensions()
.get::<auth_kit::middleware::auth::AuthContext>()
.map(|c| c.user_id.to_string())
.unwrap_or_else(|| "unknown".to_string());
let result = AssertUnwindSafe(next.run(req)).catch_unwind().await;
match result {
Ok(resp) => resp,
Err(_) => {
tracing::error!(
trace_id = %request_id,
tenant_id = %tenant_id,
user_id = %user_id,
action = %action,
latency_ms = 0_u64,
error_code = "panic"
);
AppError::AnyhowError(anyhow::anyhow!("panic")).into_response()
}
}
}
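The status-to-`error_code` table in `request_logger` can be exercised in isolation. A standalone copy of that mapping, extracted into a free function (`error_code_for` is a hypothetical name, not part of the middleware module):

```rust
// Hedged sketch: the same match used by request_logger, as a testable function.
fn error_code_for(status: u16) -> &'static str {
    match status {
        200..=399 => "ok",
        400 => "bad_request",
        401 => "unauthorized",
        403 => "permission_denied",
        404 => "not_found",
        409 => "conflict",
        429 => "rate_limited",
        500..=599 => "server_error",
        _ => "unknown", // includes 1xx and unmapped 4xx such as 418
    }
}

fn main() {
    assert_eq!(error_code_for(204), "ok");
    assert_eq!(error_code_for(403), "permission_denied");
    assert_eq!(error_code_for(418), "unknown");
    assert_eq!(error_code_for(503), "server_error");
}
```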

src/api/mod.rs Normal file
@@ -0,0 +1,40 @@
pub mod docs;
pub mod handlers;
pub mod middleware;
use axum::routing::get;
use axum::Router;
use utoipa::OpenApi;
use utoipa_scalar::{Scalar, Servable};
use crate::api::docs::ApiDoc;
use crate::api::middleware::{catch_panic, request_logger};
use crate::application::services::CmsServices;
use crate::infrastructure::iam_client::IamClient;
#[derive(Clone)]
pub struct AppState {
pub services: CmsServices,
pub iam_client: IamClient,
}
pub fn build_router(state: AppState) -> Router {
let health = Router::new().route("/healthz", get(|| async { axum::http::StatusCode::OK }));
let v1 = Router::new()
.nest("/columns", handlers::column::router())
.nest("/tags", handlers::tag::router())
.nest("/media", handlers::media::router())
.nest("/articles", handlers::article::router());
let app = Router::new()
.route("/favicon.ico", get(|| async { axum::http::StatusCode::NO_CONTENT }))
.merge(Scalar::with_url("/scalar", ApiDoc::openapi()))
.merge(health)
.nest("/v1", v1)
.layer(axum::middleware::from_fn(catch_panic))
.layer(axum::middleware::from_fn(request_logger))
.with_state(state);
app
}

src/application/mod.rs Normal file
@@ -0,0 +1 @@
pub mod services;

@@ -0,0 +1,250 @@
use crate::domain::models::{Article, ArticleVersion, Column, Media, Tag};
use crate::infrastructure::repositories;
use common_telemetry::AppError;
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Clone)]
pub struct CmsServices {
pool: PgPool,
}
impl CmsServices {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
pub async fn create_column(
&self,
tenant_id: Uuid,
name: String,
slug: String,
description: Option<String>,
parent_id: Option<Uuid>,
sort_order: i32,
) -> Result<Column, AppError> {
repositories::column::create_column(
&self.pool,
tenant_id,
name,
slug,
description,
parent_id,
sort_order,
)
.await
}
pub async fn list_columns(
&self,
tenant_id: Uuid,
q: repositories::column::ListColumnsQuery,
) -> Result<repositories::column::Paged<Column>, AppError> {
repositories::column::list_columns(&self.pool, tenant_id, q).await
}
pub async fn get_column(&self, tenant_id: Uuid, id: Uuid) -> Result<Column, AppError> {
repositories::column::get_column(&self.pool, tenant_id, id).await
}
pub async fn update_column(
&self,
tenant_id: Uuid,
id: Uuid,
name: Option<String>,
slug: Option<String>,
description: Option<Option<String>>,
parent_id: Option<Option<Uuid>>,
sort_order: Option<i32>,
) -> Result<Column, AppError> {
repositories::column::update_column(
&self.pool,
tenant_id,
id,
name,
slug,
description,
parent_id,
sort_order,
)
.await
}
pub async fn delete_column(&self, tenant_id: Uuid, id: Uuid) -> Result<(), AppError> {
repositories::column::delete_column(&self.pool, tenant_id, id).await
}
pub async fn create_tag(
&self,
tenant_id: Uuid,
kind: String,
name: String,
slug: String,
) -> Result<Tag, AppError> {
repositories::tag::create_tag(&self.pool, tenant_id, kind, name, slug).await
}
pub async fn list_tags(
&self,
tenant_id: Uuid,
q: repositories::tag::ListTagsQuery,
) -> Result<repositories::column::Paged<Tag>, AppError> {
repositories::tag::list_tags(&self.pool, tenant_id, q).await
}
pub async fn get_tag(&self, tenant_id: Uuid, id: Uuid) -> Result<Tag, AppError> {
repositories::tag::get_tag(&self.pool, tenant_id, id).await
}
pub async fn update_tag(
&self,
tenant_id: Uuid,
id: Uuid,
name: Option<String>,
slug: Option<String>,
) -> Result<Tag, AppError> {
repositories::tag::update_tag(&self.pool, tenant_id, id, name, slug).await
}
pub async fn delete_tag(&self, tenant_id: Uuid, id: Uuid) -> Result<(), AppError> {
repositories::tag::delete_tag(&self.pool, tenant_id, id).await
}
pub async fn create_media(
&self,
tenant_id: Uuid,
url: String,
mime_type: Option<String>,
size_bytes: Option<i64>,
width: Option<i32>,
height: Option<i32>,
created_by: Option<Uuid>,
) -> Result<Media, AppError> {
repositories::media::create_media(
&self.pool,
tenant_id,
url,
mime_type,
size_bytes,
width,
height,
created_by,
)
.await
}
pub async fn list_media(
&self,
tenant_id: Uuid,
q: repositories::media::ListMediaQuery,
) -> Result<repositories::column::Paged<Media>, AppError> {
repositories::media::list_media(&self.pool, tenant_id, q).await
}
pub async fn get_media(&self, tenant_id: Uuid, id: Uuid) -> Result<Media, AppError> {
repositories::media::get_media(&self.pool, tenant_id, id).await
}
pub async fn delete_media(&self, tenant_id: Uuid, id: Uuid) -> Result<(), AppError> {
repositories::media::delete_media(&self.pool, tenant_id, id).await
}
pub async fn create_article(
&self,
tenant_id: Uuid,
column_id: Option<Uuid>,
title: String,
slug: String,
summary: Option<String>,
content: String,
tag_ids: Vec<Uuid>,
created_by: Option<Uuid>,
) -> Result<repositories::article::ArticleWithTags, AppError> {
repositories::article::create_article(
&self.pool,
tenant_id,
column_id,
title,
slug,
summary,
content,
tag_ids,
created_by,
)
.await
}
pub async fn get_article(
&self,
tenant_id: Uuid,
id: Uuid,
) -> Result<repositories::article::ArticleWithTags, AppError> {
repositories::article::get_article(&self.pool, tenant_id, id).await
}
pub async fn list_articles(
&self,
tenant_id: Uuid,
q: repositories::article::ListArticlesQuery,
) -> Result<repositories::column::Paged<Article>, AppError> {
repositories::article::list_articles(&self.pool, tenant_id, q).await
}
pub async fn update_article(
&self,
tenant_id: Uuid,
id: Uuid,
column_id: Option<Option<Uuid>>,
title: Option<String>,
slug: Option<String>,
summary: Option<Option<String>>,
content: Option<String>,
tag_ids: Option<Vec<Uuid>>,
updated_by: Option<Uuid>,
) -> Result<repositories::article::ArticleWithTags, AppError> {
repositories::article::update_article(
&self.pool,
tenant_id,
id,
column_id,
title,
slug,
summary,
content,
tag_ids,
updated_by,
)
.await
}
pub async fn publish_article(
&self,
tenant_id: Uuid,
id: Uuid,
user_id: Option<Uuid>,
) -> Result<Article, AppError> {
repositories::article::publish_article(&self.pool, tenant_id, id, user_id).await
}
pub async fn rollback_article(
&self,
tenant_id: Uuid,
id: Uuid,
to_version: i32,
user_id: Option<Uuid>,
) -> Result<Article, AppError> {
repositories::article::rollback_article(&self.pool, tenant_id, id, to_version, user_id)
.await
}
pub async fn list_versions(
&self,
tenant_id: Uuid,
article_id: Uuid,
page: u32,
page_size: u32,
) -> Result<repositories::column::Paged<ArticleVersion>, AppError> {
repositories::article::list_versions(&self.pool, tenant_id, article_id, page, page_size)
.await
}
}

src/config/mod.rs Normal file
@@ -0,0 +1,69 @@
use std::env;
#[derive(Clone, Debug)]
pub struct AppConfig {
pub service_name: String,
pub log_level: String,
pub log_to_file: bool,
pub log_dir: String,
pub log_file_name: String,
pub database_url: String,
pub db_max_connections: u32,
pub db_min_connections: u32,
pub port: u16,
pub iam_base_url: String,
pub iam_jwks_url: Option<String>,
pub jwt_public_key_pem: Option<String>,
pub iam_timeout_ms: u64,
pub iam_cache_ttl_seconds: u64,
pub iam_stale_if_error_seconds: u64,
pub iam_cache_max_entries: usize,
}
impl AppConfig {
pub fn from_env() -> Result<Self, String> {
Ok(Self {
service_name: env::var("SERVICE_NAME").unwrap_or_else(|_| "cms-service".into()),
log_level: env::var("LOG_LEVEL").unwrap_or_else(|_| "info".into()),
log_to_file: env::var("LOG_TO_FILE")
.map(|v| v == "true" || v == "1")
.unwrap_or(false),
log_dir: env::var("LOG_DIR").unwrap_or_else(|_| "./log".into()),
log_file_name: env::var("LOG_FILE_NAME").unwrap_or_else(|_| "cms.log".into()),
database_url: env::var("DATABASE_URL")
.map_err(|_| "DATABASE_URL environment variable is required".to_string())?,
db_max_connections: env::var("DB_MAX_CONNECTIONS")
                .unwrap_or_else(|_| "20".into())
.parse()
.map_err(|_| "DB_MAX_CONNECTIONS must be a number".to_string())?,
db_min_connections: env::var("DB_MIN_CONNECTIONS")
                .unwrap_or_else(|_| "5".into())
.parse()
.map_err(|_| "DB_MIN_CONNECTIONS must be a number".to_string())?,
port: env::var("PORT")
.unwrap_or_else(|_| "3100".to_string())
.parse()
.map_err(|_| "PORT must be a valid number".to_string())?,
iam_base_url: env::var("IAM_BASE_URL")
.unwrap_or_else(|_| "http://localhost:3000".into()),
iam_jwks_url: env::var("IAM_JWKS_URL").ok(),
jwt_public_key_pem: env::var("JWT_PUBLIC_KEY_PEM").ok(),
iam_timeout_ms: env::var("IAM_TIMEOUT_MS")
.unwrap_or_else(|_| "2000".into())
.parse()
.map_err(|_| "IAM_TIMEOUT_MS must be a number".to_string())?,
iam_cache_ttl_seconds: env::var("IAM_CACHE_TTL_SECONDS")
.unwrap_or_else(|_| "10".into())
.parse()
.map_err(|_| "IAM_CACHE_TTL_SECONDS must be a number".to_string())?,
iam_stale_if_error_seconds: env::var("IAM_STALE_IF_ERROR_SECONDS")
.unwrap_or_else(|_| "60".into())
.parse()
.map_err(|_| "IAM_STALE_IF_ERROR_SECONDS must be a number".to_string())?,
iam_cache_max_entries: env::var("IAM_CACHE_MAX_ENTRIES")
.unwrap_or_else(|_| "50000".into())
.parse()
.map_err(|_| "IAM_CACHE_MAX_ENTRIES must be a number".to_string())?,
})
}
}
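The `LOG_TO_FILE` rule in `AppConfig::from_env` accepts only the exact strings `"true"` and `"1"`; anything else, or a missing variable, leaves the flag off. A standalone sketch of that rule (`parse_log_to_file` is a hypothetical helper, not part of the module):

```rust
// Hedged sketch: the boolean-env parsing used for LOG_TO_FILE, isolated from
// the rest of AppConfig so the edge cases can be asserted directly.
use std::env::VarError;

fn parse_log_to_file(raw: Result<String, VarError>) -> bool {
    raw.map(|v| v == "true" || v == "1").unwrap_or(false)
}

fn main() {
    assert!(parse_log_to_file(Ok("true".into())));
    assert!(parse_log_to_file(Ok("1".into())));
    assert!(!parse_log_to_file(Ok("TRUE".into()))); // comparison is case-sensitive
    assert!(!parse_log_to_file(Err(VarError::NotPresent)));
}
```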

src/domain/mod.rs Normal file
@@ -0,0 +1 @@
pub mod models;

src/domain/models.rs Normal file
@@ -0,0 +1,73 @@
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use utoipa::ToSchema;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, FromRow)]
pub struct Column {
pub tenant_id: Uuid,
pub id: Uuid,
pub name: String,
pub slug: String,
pub description: Option<String>,
pub parent_id: Option<Uuid>,
pub sort_order: i32,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, FromRow)]
pub struct Tag {
pub tenant_id: Uuid,
pub id: Uuid,
pub kind: String,
pub name: String,
pub slug: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, FromRow)]
pub struct Media {
pub tenant_id: Uuid,
pub id: Uuid,
pub url: String,
pub mime_type: Option<String>,
pub size_bytes: Option<i64>,
pub width: Option<i32>,
pub height: Option<i32>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub created_by: Option<Uuid>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, FromRow)]
pub struct Article {
pub tenant_id: Uuid,
pub id: Uuid,
pub column_id: Option<Uuid>,
pub title: String,
pub slug: String,
pub summary: Option<String>,
pub content: String,
pub status: String,
pub current_version: i32,
pub published_at: Option<chrono::DateTime<chrono::Utc>>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
pub created_by: Option<Uuid>,
pub updated_by: Option<Uuid>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, FromRow)]
pub struct ArticleVersion {
pub tenant_id: Uuid,
pub id: Uuid,
pub article_id: Uuid,
pub version: i32,
pub title: String,
pub summary: Option<String>,
pub content: String,
pub status: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub created_by: Option<Uuid>,
}

@@ -0,0 +1,16 @@
use crate::config::AppConfig;
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::time::Duration;
pub async fn init_pool(config: &AppConfig) -> Result<PgPool, sqlx::Error> {
PgPoolOptions::new()
.max_connections(config.db_max_connections)
.min_connections(config.db_min_connections)
.acquire_timeout(Duration::from_secs(3))
.connect(&config.database_url)
.await
}
pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::migrate::MigrateError> {
sqlx::migrate!("./migrations").run(pool).await
}

@@ -0,0 +1,224 @@
use std::{
hash::{Hash, Hasher},
sync::Arc,
time::{Duration, Instant},
};
use common_telemetry::AppError;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone, Debug)]
pub struct IamClientConfig {
pub base_url: String,
pub timeout: Duration,
pub cache_ttl: Duration,
pub cache_stale_if_error: Duration,
pub cache_max_entries: usize,
}
#[derive(Clone)]
pub struct IamClient {
inner: Arc<IamClientInner>,
}
struct IamClientInner {
http: reqwest::Client,
cfg: IamClientConfig,
cache: DashMap<CacheKey, CacheEntry>,
}
#[derive(Clone)]
struct CacheKey {
tenant_id: Uuid,
user_id: Uuid,
permission: String,
}
impl PartialEq for CacheKey {
fn eq(&self, other: &Self) -> bool {
self.tenant_id == other.tenant_id
&& self.user_id == other.user_id
&& self.permission == other.permission
}
}
impl Eq for CacheKey {}
impl Hash for CacheKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.tenant_id.hash(state);
self.user_id.hash(state);
self.permission.hash(state);
}
}
#[derive(Clone)]
struct CacheEntry {
allowed: bool,
expires_at: Instant,
stale_until: Instant,
}
#[derive(Debug, Serialize)]
struct AuthorizationCheckRequest {
permission: String,
}
#[derive(Debug, Deserialize)]
struct AuthorizationCheckResponse {
allowed: bool,
}
#[derive(Debug, Deserialize)]
struct ApiSuccessResponse<T> {
#[allow(dead_code)]
code: u32,
#[allow(dead_code)]
message: String,
data: Option<T>,
}
impl IamClient {
pub fn new(cfg: IamClientConfig) -> Self {
let http = reqwest::Client::builder()
.timeout(cfg.timeout)
.build()
.expect("failed to build reqwest client");
Self {
inner: Arc::new(IamClientInner {
http,
cfg,
cache: DashMap::new(),
}),
}
}
pub async fn require_permission(
&self,
tenant_id: Uuid,
user_id: Uuid,
permission: &str,
access_token: &str,
) -> Result<(), AppError> {
let allowed = self
.check_permission(tenant_id, user_id, permission, access_token)
.await?;
if allowed {
Ok(())
} else {
Err(AppError::PermissionDenied(permission.to_string()))
}
}
async fn check_permission(
&self,
tenant_id: Uuid,
user_id: Uuid,
permission: &str,
access_token: &str,
) -> Result<bool, AppError> {
let key = CacheKey {
tenant_id,
user_id,
permission: permission.to_string(),
};
let now = Instant::now();
if let Some(entry) = self.inner.cache.get(&key).map(|e| e.clone()) {
if entry.expires_at > now {
return Ok(entry.allowed);
}
}
let remote = self
.check_permission_remote(tenant_id, permission, access_token)
.await;
match remote {
Ok(allowed) => {
self.set_cache(key, allowed);
Ok(allowed)
}
Err(e) => {
if let Some(entry) = self.inner.cache.get(&key).map(|e| e.clone()) {
if entry.stale_until > now {
tracing::warn!(
tenant_id = %tenant_id,
user_id = %user_id,
action = "iam_client.degraded_cache",
latency_ms = 0_u64, // degraded path: no remote latency is measured
error_code = "iam_degraded_cache"
);
return Ok(entry.allowed);
}
}
Err(e)
}
}
}
fn set_cache(&self, key: CacheKey, allowed: bool) {
// Crude capacity control: once the soft cap is exceeded, drop the whole
// cache rather than evicting individual entries.
if self.inner.cache.len() > self.inner.cfg.cache_max_entries {
self.inner.cache.clear();
}
let now = Instant::now();
let entry = CacheEntry {
allowed,
expires_at: now + self.inner.cfg.cache_ttl,
stale_until: now + self.inner.cfg.cache_ttl + self.inner.cfg.cache_stale_if_error,
};
self.inner.cache.insert(key, entry);
}
// The caller's identity is derived server-side from the bearer token, so the
// request carries only the tenant header and the permission being checked.
async fn check_permission_remote(
&self,
tenant_id: Uuid,
permission: &str,
access_token: &str,
) -> Result<bool, AppError> {
let url = format!(
"{}/authorize/check",
self.inner.cfg.base_url.trim_end_matches('/')
);
let resp = self
.inner
.http
.post(url)
.bearer_auth(access_token)
.header("X-Tenant-ID", tenant_id.to_string())
.json(&AuthorizationCheckRequest {
permission: permission.to_string(),
})
.send()
.await
.map_err(|e| AppError::ExternalReqError(format!("iam:request_failed:{}", e)))?;
let status = resp.status();
if status == reqwest::StatusCode::UNAUTHORIZED {
return Err(AppError::AuthError("iam:unauthorized".into()));
}
if status == reqwest::StatusCode::FORBIDDEN {
return Err(AppError::PermissionDenied("iam:forbidden".into()));
}
if !status.is_success() {
return Err(AppError::ExternalReqError(format!(
"iam:unexpected_status:{}",
status.as_u16()
)));
}
let body: ApiSuccessResponse<AuthorizationCheckResponse> = resp
.json()
.await
.map_err(|e| AppError::ExternalReqError(format!("iam:decode_failed:{}", e)))?;
let allowed = body
.data
.map(|d| d.allowed)
.ok_or_else(|| AppError::ExternalReqError("iam:missing_data".into()))?;
Ok(allowed)
}
}
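The cache above serves fresh entries until `expires_at` and, only when the remote IAM call fails, stale entries until `stale_until`. A minimal stdlib sketch of that lookup decision (the `Entry`/`usable` names are illustrative, not part of the module's API):

```rust
use std::time::{Duration, Instant};

// Mirror of the CacheEntry fields that drive the lookup decision.
struct Entry {
    allowed: bool,
    expires_at: Instant,
    stale_until: Instant,
}

// Returns Some(allowed) when the cached decision may be used, given whether
// the remote call failed; None means the caller must re-fetch.
fn usable(entry: &Entry, now: Instant, remote_failed: bool) -> Option<bool> {
    if entry.expires_at > now {
        Some(entry.allowed) // fresh: always usable
    } else if remote_failed && entry.stale_until > now {
        Some(entry.allowed) // stale-if-error window
    } else {
        None
    }
}

fn main() {
    let now = Instant::now();
    let entry = Entry {
        allowed: true,
        expires_at: now, // already expired relative to `later`
        stale_until: now + Duration::from_secs(60),
    };
    let later = now + Duration::from_secs(1);
    assert_eq!(usable(&entry, later, false), None);      // expired, remote ok: re-fetch
    assert_eq!(usable(&entry, later, true), Some(true)); // expired, remote down: serve stale
    println!("ok");
}
```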


@@ -0,0 +1,3 @@
pub mod db;
pub mod iam_client;
pub mod repositories;


@@ -0,0 +1,480 @@
use crate::domain::models::{Article, ArticleVersion};
use common_telemetry::AppError;
use sqlx::{PgPool, Postgres, Transaction};
use uuid::Uuid;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
pub struct ArticleWithTags {
pub article: Article,
pub tag_ids: Vec<Uuid>,
}
async fn list_tag_ids_for_article(
tx: &mut Transaction<'_, Postgres>,
tenant_id: Uuid,
article_id: Uuid,
) -> Result<Vec<Uuid>, AppError> {
let tags = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT tag_id
FROM cms_article_tags
WHERE tenant_id = $1 AND article_id = $2
ORDER BY tag_id
"#,
)
.bind(tenant_id)
.bind(article_id)
.fetch_all(&mut **tx)
.await?;
Ok(tags)
}
pub async fn create_article(
pool: &PgPool,
tenant_id: Uuid,
column_id: Option<Uuid>,
title: String,
slug: String,
summary: Option<String>,
content: String,
tag_ids: Vec<Uuid>,
created_by: Option<Uuid>,
) -> Result<ArticleWithTags, AppError> {
let mut tx = pool.begin().await?;
let article = sqlx::query_as::<_, Article>(
r#"
INSERT INTO cms_articles (tenant_id, column_id, title, slug, summary, content, created_by, updated_by)
VALUES ($1, $2, $3, $4, $5, $6, $7, $7)
RETURNING
tenant_id, id, column_id, title, slug, summary, content,
status::text as status, current_version, published_at,
created_at, updated_at, created_by, updated_by
"#,
)
.bind(tenant_id)
.bind(column_id)
.bind(title)
.bind(slug)
.bind(summary)
.bind(content)
.bind(created_by)
.fetch_one(&mut *tx)
.await?;
for tag_id in &tag_ids {
sqlx::query(
r#"
INSERT INTO cms_article_tags (tenant_id, article_id, tag_id)
VALUES ($1, $2, $3)
ON CONFLICT DO NOTHING
"#,
)
.bind(tenant_id)
.bind(article.id)
.bind(tag_id)
.execute(&mut *tx)
.await?;
}
let tag_ids = list_tag_ids_for_article(&mut tx, tenant_id, article.id).await?;
tx.commit().await?;
Ok(ArticleWithTags { article, tag_ids })
}
pub async fn get_article(
pool: &PgPool,
tenant_id: Uuid,
id: Uuid,
) -> Result<ArticleWithTags, AppError> {
let mut tx = pool.begin().await?;
let article = sqlx::query_as::<_, Article>(
r#"
SELECT
tenant_id, id, column_id, title, slug, summary, content,
status::text as status, current_version, published_at,
created_at, updated_at, created_by, updated_by
FROM cms_articles
WHERE tenant_id = $1 AND id = $2
"#,
)
.bind(tenant_id)
.bind(id)
.fetch_one(&mut *tx)
.await?;
let tag_ids = list_tag_ids_for_article(&mut tx, tenant_id, id).await?;
tx.commit().await?;
Ok(ArticleWithTags { article, tag_ids })
}
/// NOTE: `Option<Option<_>>` fields bind SQL NULL for both `None` (keep current)
/// and `Some(None)` (clear); with the COALESCE below, clearing a column to NULL
/// is therefore not expressible.
pub async fn update_article(
pool: &PgPool,
tenant_id: Uuid,
id: Uuid,
column_id: Option<Option<Uuid>>,
title: Option<String>,
slug: Option<String>,
summary: Option<Option<String>>,
content: Option<String>,
tag_ids: Option<Vec<Uuid>>,
updated_by: Option<Uuid>,
) -> Result<ArticleWithTags, AppError> {
let mut tx = pool.begin().await?;
let article = sqlx::query_as::<_, Article>(
r#"
UPDATE cms_articles
SET
column_id = COALESCE($3, column_id),
title = COALESCE($4, title),
slug = COALESCE($5, slug),
summary = COALESCE($6, summary),
content = COALESCE($7, content),
updated_at = now(),
updated_by = COALESCE($8, updated_by)
WHERE tenant_id = $1 AND id = $2
RETURNING
tenant_id, id, column_id, title, slug, summary, content,
status::text as status, current_version, published_at,
created_at, updated_at, created_by, updated_by
"#,
)
.bind(tenant_id)
.bind(id)
.bind(column_id)
.bind(title)
.bind(slug)
.bind(summary)
.bind(content)
.bind(updated_by)
.fetch_one(&mut *tx)
.await?;
if let Some(tag_ids) = tag_ids {
sqlx::query(
r#"
DELETE FROM cms_article_tags
WHERE tenant_id = $1 AND article_id = $2
"#,
)
.bind(tenant_id)
.bind(id)
.execute(&mut *tx)
.await?;
for tag_id in &tag_ids {
sqlx::query(
r#"
INSERT INTO cms_article_tags (tenant_id, article_id, tag_id)
VALUES ($1, $2, $3)
ON CONFLICT DO NOTHING
"#,
)
.bind(tenant_id)
.bind(id)
.bind(tag_id)
.execute(&mut *tx)
.await?;
}
}
let tag_ids = list_tag_ids_for_article(&mut tx, tenant_id, id).await?;
tx.commit().await?;
Ok(ArticleWithTags { article, tag_ids })
}
#[derive(Debug, Clone)]
pub struct ListArticlesQuery {
pub page: u32,
pub page_size: u32,
pub q: Option<String>,
pub status: Option<String>,
pub column_id: Option<Uuid>,
pub tag_id: Option<Uuid>,
}
pub async fn list_articles(
pool: &PgPool,
tenant_id: Uuid,
q: ListArticlesQuery,
) -> Result<super::column::Paged<Article>, AppError> {
let page = q.page.max(1);
let page_size = q.page_size.clamp(1, 200);
// widen before multiplying to avoid u32 overflow on very large page numbers
let offset = (page as i64 - 1) * page_size as i64;
let limit = page_size as i64;
// NOTE: the search term is not escaped, so literal '%' and '_' act as wildcards
let like = q.q.map(|s| format!("%{}%", s));
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(DISTINCT a.id)
FROM cms_articles a
LEFT JOIN cms_article_tags at ON at.tenant_id = a.tenant_id AND at.article_id = a.id
WHERE a.tenant_id = $1
AND ($2::cms_article_status IS NULL OR a.status = $2::cms_article_status)
AND ($3::uuid IS NULL OR a.column_id = $3)
AND ($4::uuid IS NULL OR at.tag_id = $4)
AND ($5::text IS NULL OR a.title ILIKE $5 OR a.slug ILIKE $5 OR COALESCE(a.summary, '') ILIKE $5)
"#,
)
.bind(tenant_id)
.bind(q.status.as_deref())
.bind(q.column_id)
.bind(q.tag_id)
.bind(like.as_deref())
.fetch_one(pool)
.await?;
let items = sqlx::query_as::<_, Article>(
r#"
SELECT
a.tenant_id, a.id, a.column_id, a.title, a.slug, a.summary, a.content,
a.status::text as status, a.current_version, a.published_at,
a.created_at, a.updated_at, a.created_by, a.updated_by
FROM cms_articles a
WHERE a.tenant_id = $1
AND ($2::cms_article_status IS NULL OR a.status = $2::cms_article_status)
AND ($3::uuid IS NULL OR a.column_id = $3)
AND ($5::text IS NULL OR a.title ILIKE $5 OR a.slug ILIKE $5 OR COALESCE(a.summary, '') ILIKE $5)
AND (
$4::uuid IS NULL OR EXISTS (
SELECT 1 FROM cms_article_tags at
WHERE at.tenant_id = a.tenant_id AND at.article_id = a.id AND at.tag_id = $4
)
)
ORDER BY a.updated_at DESC
OFFSET $6
LIMIT $7
"#,
)
.bind(tenant_id)
.bind(q.status.as_deref())
.bind(q.column_id)
.bind(q.tag_id)
.bind(like.as_deref())
.bind(offset)
.bind(limit)
.fetch_all(pool)
.await?;
Ok(super::column::Paged {
items,
page,
page_size,
total,
})
}
pub async fn publish_article(
pool: &PgPool,
tenant_id: Uuid,
id: Uuid,
user_id: Option<Uuid>,
) -> Result<Article, AppError> {
let mut tx = pool.begin().await?;
let article = sqlx::query_as::<_, Article>(
r#"
SELECT
tenant_id, id, column_id, title, slug, summary, content,
status::text as status, current_version, published_at,
created_at, updated_at, created_by, updated_by
FROM cms_articles
WHERE tenant_id = $1 AND id = $2
FOR UPDATE
"#,
)
.bind(tenant_id)
.bind(id)
.fetch_one(&mut *tx)
.await?;
let next_version = article.current_version + 1;
sqlx::query(
r#"
INSERT INTO cms_article_versions (tenant_id, article_id, version, title, summary, content, status, created_by)
VALUES ($1, $2, $3, $4, $5, $6, $7::cms_article_status, $8)
"#,
)
.bind(tenant_id)
.bind(id)
.bind(next_version)
.bind(&article.title)
.bind(&article.summary)
.bind(&article.content)
.bind("published")
.bind(user_id)
.execute(&mut *tx)
.await?;
let updated = sqlx::query_as::<_, Article>(
r#"
UPDATE cms_articles
SET
status = 'published',
current_version = $3,
published_at = COALESCE(published_at, now()),
updated_at = now(),
updated_by = COALESCE($4, updated_by)
WHERE tenant_id = $1 AND id = $2
RETURNING
tenant_id, id, column_id, title, slug, summary, content,
status::text as status, current_version, published_at,
created_at, updated_at, created_by, updated_by
"#,
)
.bind(tenant_id)
.bind(id)
.bind(next_version)
.bind(user_id)
.fetch_one(&mut *tx)
.await?;
tx.commit().await?;
Ok(updated)
}
pub async fn rollback_article(
pool: &PgPool,
tenant_id: Uuid,
id: Uuid,
to_version: i32,
user_id: Option<Uuid>,
) -> Result<Article, AppError> {
let mut tx = pool.begin().await?;
let target = sqlx::query_as::<_, ArticleVersion>(
r#"
SELECT
tenant_id, id, article_id, version,
title, summary, content, status::text as status,
created_at, created_by
FROM cms_article_versions
WHERE tenant_id = $1 AND article_id = $2 AND version = $3
"#,
)
.bind(tenant_id)
.bind(id)
.bind(to_version)
.fetch_one(&mut *tx)
.await?;
let current_version: i32 = sqlx::query_scalar(
r#"
SELECT current_version
FROM cms_articles
WHERE tenant_id = $1 AND id = $2
FOR UPDATE
"#,
)
.bind(tenant_id)
.bind(id)
.fetch_one(&mut *tx)
.await?;
let next_version = current_version + 1;
let updated = sqlx::query_as::<_, Article>(
r#"
UPDATE cms_articles
SET
title = $3,
summary = $4,
content = $5,
status = $6::cms_article_status,
current_version = $7,
updated_at = now(),
updated_by = COALESCE($8, updated_by)
WHERE tenant_id = $1 AND id = $2
RETURNING
tenant_id, id, column_id, title, slug, summary, content,
status::text as status, current_version, published_at,
created_at, updated_at, created_by, updated_by
"#,
)
.bind(tenant_id)
.bind(id)
.bind(&target.title)
.bind(&target.summary)
.bind(&target.content)
.bind(&target.status)
.bind(next_version)
.bind(user_id)
.fetch_one(&mut *tx)
.await?;
sqlx::query(
r#"
INSERT INTO cms_article_versions (tenant_id, article_id, version, title, summary, content, status, created_by)
VALUES ($1, $2, $3, $4, $5, $6, $7::cms_article_status, $8)
"#,
)
.bind(tenant_id)
.bind(id)
.bind(next_version)
.bind(&updated.title)
.bind(&updated.summary)
.bind(&updated.content)
.bind(&updated.status)
.bind(user_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(updated)
}
pub async fn list_versions(
pool: &PgPool,
tenant_id: Uuid,
article_id: Uuid,
page: u32,
page_size: u32,
) -> Result<super::column::Paged<ArticleVersion>, AppError> {
let page = page.max(1);
let page_size = page_size.clamp(1, 200);
let offset = (page as i64 - 1) * page_size as i64;
let limit = page_size as i64;
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM cms_article_versions
WHERE tenant_id = $1 AND article_id = $2
"#,
)
.bind(tenant_id)
.bind(article_id)
.fetch_one(pool)
.await?;
let items = sqlx::query_as::<_, ArticleVersion>(
r#"
SELECT
tenant_id, id, article_id, version,
title, summary, content, status::text as status,
created_at, created_by
FROM cms_article_versions
WHERE tenant_id = $1 AND article_id = $2
ORDER BY version DESC
OFFSET $3
LIMIT $4
"#,
)
.bind(tenant_id)
.bind(article_id)
.bind(offset)
.bind(limit)
.fetch_all(pool)
.await?;
Ok(super::column::Paged {
items,
page,
page_size,
total,
})
}
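The publish/rollback pair above keeps article history append-only: both operations insert a new `cms_article_versions` row at `current_version + 1` and advance the article to match, so a rollback is itself recorded as a new version rather than rewriting old rows. A minimal in-memory sketch of that invariant (the types are illustrative, not the repository's API):

```rust
// Illustrative model of the version bookkeeping in publish/rollback.
#[derive(Debug, Clone, PartialEq)]
struct Snapshot {
    version: i32,
    content: String,
}

struct ArticleState {
    current_version: i32,
    content: String,
    history: Vec<Snapshot>, // append-only, like cms_article_versions
}

impl ArticleState {
    fn publish(&mut self) {
        let next = self.current_version + 1;
        self.history.push(Snapshot { version: next, content: self.content.clone() });
        self.current_version = next;
    }

    fn rollback(&mut self, to_version: i32) -> Option<()> {
        // restore the target snapshot's content, then record the restored
        // state as a brand-new snapshot instead of mutating history
        let target = self.history.iter().find(|s| s.version == to_version)?.clone();
        let next = self.current_version + 1;
        self.content = target.content;
        self.history.push(Snapshot { version: next, content: self.content.clone() });
        self.current_version = next;
        Some(())
    }
}

fn main() {
    let mut a = ArticleState { current_version: 0, content: "v1".into(), history: vec![] };
    a.publish();            // snapshot 1: "v1"
    a.content = "v2".into();
    a.publish();            // snapshot 2: "v2"
    a.rollback(1).unwrap(); // snapshot 3: restored "v1"
    assert_eq!(a.current_version, 3);
    assert_eq!(a.content, "v1");
    println!("ok");
}
```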


@@ -0,0 +1,174 @@
use crate::domain::models::Column;
use common_telemetry::AppError;
use sqlx::PgPool;
use uuid::Uuid;
pub async fn create_column(
pool: &PgPool,
tenant_id: Uuid,
name: String,
slug: String,
description: Option<String>,
parent_id: Option<Uuid>,
sort_order: i32,
) -> Result<Column, AppError> {
let column = sqlx::query_as::<_, Column>(
r#"
INSERT INTO cms_columns (tenant_id, name, slug, description, parent_id, sort_order)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING tenant_id, id, name, slug, description, parent_id, sort_order, created_at, updated_at
"#,
)
.bind(tenant_id)
.bind(name)
.bind(slug)
.bind(description)
.bind(parent_id)
.bind(sort_order)
.fetch_one(pool)
.await?;
Ok(column)
}
pub async fn get_column(pool: &PgPool, tenant_id: Uuid, id: Uuid) -> Result<Column, AppError> {
let column = sqlx::query_as::<_, Column>(
r#"
SELECT tenant_id, id, name, slug, description, parent_id, sort_order, created_at, updated_at
FROM cms_columns
WHERE tenant_id = $1 AND id = $2
"#,
)
.bind(tenant_id)
.bind(id)
.fetch_one(pool)
.await?;
Ok(column)
}
/// NOTE: as in `update_article`, `Some(None)` cannot clear a column to NULL,
/// because COALESCE treats the bound NULL as "keep the current value".
pub async fn update_column(
pool: &PgPool,
tenant_id: Uuid,
id: Uuid,
name: Option<String>,
slug: Option<String>,
description: Option<Option<String>>,
parent_id: Option<Option<Uuid>>,
sort_order: Option<i32>,
) -> Result<Column, AppError> {
let column = sqlx::query_as::<_, Column>(
r#"
UPDATE cms_columns
SET
name = COALESCE($3, name),
slug = COALESCE($4, slug),
description = COALESCE($5, description),
parent_id = COALESCE($6, parent_id),
sort_order = COALESCE($7, sort_order),
updated_at = now()
WHERE tenant_id = $1 AND id = $2
RETURNING tenant_id, id, name, slug, description, parent_id, sort_order, created_at, updated_at
"#,
)
.bind(tenant_id)
.bind(id)
.bind(name)
.bind(slug)
.bind(description)
.bind(parent_id)
.bind(sort_order)
.fetch_one(pool)
.await?;
Ok(column)
}
pub async fn delete_column(pool: &PgPool, tenant_id: Uuid, id: Uuid) -> Result<(), AppError> {
let res = sqlx::query(
r#"
DELETE FROM cms_columns
WHERE tenant_id = $1 AND id = $2
"#,
)
.bind(tenant_id)
.bind(id)
.execute(pool)
.await?;
if res.rows_affected() == 0 {
return Err(AppError::NotFound("column:not_found".into()));
}
Ok(())
}
#[derive(Debug, Clone)]
pub struct ListColumnsQuery {
pub page: u32,
pub page_size: u32,
pub search: Option<String>,
pub parent_id: Option<Uuid>,
}
#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
pub struct Paged<T> {
pub items: Vec<T>,
pub page: u32,
pub page_size: u32,
pub total: i64,
}
pub async fn list_columns(
pool: &PgPool,
tenant_id: Uuid,
q: ListColumnsQuery,
) -> Result<Paged<Column>, AppError> {
let page = q.page.max(1);
let page_size = q.page_size.clamp(1, 200);
let offset = (page as i64 - 1) * page_size as i64;
let limit = page_size as i64;
let like = q.search.map(|s| format!("%{}%", s));
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM cms_columns
WHERE tenant_id = $1
AND ($2::uuid IS NULL OR parent_id = $2)
AND ($3::text IS NULL OR name ILIKE $3 OR slug ILIKE $3)
"#,
)
.bind(tenant_id)
.bind(q.parent_id)
.bind(like.as_deref())
.fetch_one(pool)
.await?;
let items = sqlx::query_as::<_, Column>(
r#"
SELECT tenant_id, id, name, slug, description, parent_id, sort_order, created_at, updated_at
FROM cms_columns
WHERE tenant_id = $1
AND ($2::uuid IS NULL OR parent_id = $2)
AND ($3::text IS NULL OR name ILIKE $3 OR slug ILIKE $3)
ORDER BY sort_order ASC, updated_at DESC
OFFSET $4
LIMIT $5
"#,
)
.bind(tenant_id)
.bind(q.parent_id)
.bind(like.as_deref())
.bind(offset)
.bind(limit)
.fetch_all(pool)
.await?;
Ok(Paged {
items,
page,
page_size,
total,
})
}
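Every list endpoint above normalizes paging the same way: `page` floors at 1, `page_size` clamps to 1..=200, and the SQL offset is `(page - 1) * page_size`. A small sketch of that normalization (the `page_window` helper is illustrative; the repositories inline this logic):

```rust
// Normalize raw query paging into an (offset, limit) pair, as the
// list_* repository functions do. Widening to i64 before multiplying
// avoids u32 overflow for adversarial page numbers.
fn page_window(page: u32, page_size: u32) -> (i64, i64) {
    let page = page.max(1);
    let page_size = page_size.clamp(1, 200);
    ((page as i64 - 1) * page_size as i64, page_size as i64)
}

fn main() {
    assert_eq!(page_window(0, 0), (0, 1));     // both floored to minimums
    assert_eq!(page_window(3, 50), (100, 50)); // offset = (3 - 1) * 50
    assert_eq!(page_window(1, 500), (0, 200)); // page_size capped at 200
    println!("ok");
}
```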


@@ -0,0 +1,124 @@
use crate::domain::models::Media;
use common_telemetry::AppError;
use sqlx::PgPool;
use uuid::Uuid;
pub async fn create_media(
pool: &PgPool,
tenant_id: Uuid,
url: String,
mime_type: Option<String>,
size_bytes: Option<i64>,
width: Option<i32>,
height: Option<i32>,
created_by: Option<Uuid>,
) -> Result<Media, AppError> {
let media = sqlx::query_as::<_, Media>(
r#"
INSERT INTO cms_media (tenant_id, url, mime_type, size_bytes, width, height, created_by)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING tenant_id, id, url, mime_type, size_bytes, width, height, created_at, created_by
"#,
)
.bind(tenant_id)
.bind(url)
.bind(mime_type)
.bind(size_bytes)
.bind(width)
.bind(height)
.bind(created_by)
.fetch_one(pool)
.await?;
Ok(media)
}
pub async fn get_media(pool: &PgPool, tenant_id: Uuid, id: Uuid) -> Result<Media, AppError> {
let media = sqlx::query_as::<_, Media>(
r#"
SELECT tenant_id, id, url, mime_type, size_bytes, width, height, created_at, created_by
FROM cms_media
WHERE tenant_id = $1 AND id = $2
"#,
)
.bind(tenant_id)
.bind(id)
.fetch_one(pool)
.await?;
Ok(media)
}
pub async fn delete_media(pool: &PgPool, tenant_id: Uuid, id: Uuid) -> Result<(), AppError> {
let res = sqlx::query(
r#"
DELETE FROM cms_media
WHERE tenant_id = $1 AND id = $2
"#,
)
.bind(tenant_id)
.bind(id)
.execute(pool)
.await?;
if res.rows_affected() == 0 {
return Err(AppError::NotFound("media:not_found".into()));
}
Ok(())
}
#[derive(Debug, Clone)]
pub struct ListMediaQuery {
pub page: u32,
pub page_size: u32,
pub search: Option<String>,
}
pub async fn list_media(
pool: &PgPool,
tenant_id: Uuid,
q: ListMediaQuery,
) -> Result<super::column::Paged<Media>, AppError> {
let page = q.page.max(1);
let page_size = q.page_size.clamp(1, 200);
let offset = (page as i64 - 1) * page_size as i64;
let limit = page_size as i64;
let like = q.search.map(|s| format!("%{}%", s));
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM cms_media
WHERE tenant_id = $1
AND ($2::text IS NULL OR url ILIKE $2)
"#,
)
.bind(tenant_id)
.bind(like.as_deref())
.fetch_one(pool)
.await?;
let items = sqlx::query_as::<_, Media>(
r#"
SELECT tenant_id, id, url, mime_type, size_bytes, width, height, created_at, created_by
FROM cms_media
WHERE tenant_id = $1
AND ($2::text IS NULL OR url ILIKE $2)
ORDER BY created_at DESC
OFFSET $3
LIMIT $4
"#,
)
.bind(tenant_id)
.bind(like.as_deref())
.bind(offset)
.bind(limit)
.fetch_all(pool)
.await?;
Ok(super::column::Paged {
items,
page,
page_size,
total,
})
}


@@ -0,0 +1,4 @@
pub mod article;
pub mod column;
pub mod media;
pub mod tag;


@@ -0,0 +1,150 @@
use crate::domain::models::Tag;
use common_telemetry::AppError;
use sqlx::PgPool;
use uuid::Uuid;
pub async fn create_tag(
pool: &PgPool,
tenant_id: Uuid,
kind: String,
name: String,
slug: String,
) -> Result<Tag, AppError> {
let tag = sqlx::query_as::<_, Tag>(
r#"
INSERT INTO cms_tags (tenant_id, kind, name, slug)
VALUES ($1, $2::cms_tag_kind, $3, $4)
RETURNING tenant_id, id, kind::text as kind, name, slug, created_at, updated_at
"#,
)
.bind(tenant_id)
.bind(kind)
.bind(name)
.bind(slug)
.fetch_one(pool)
.await?;
Ok(tag)
}
pub async fn get_tag(pool: &PgPool, tenant_id: Uuid, id: Uuid) -> Result<Tag, AppError> {
let tag = sqlx::query_as::<_, Tag>(
r#"
SELECT tenant_id, id, kind::text as kind, name, slug, created_at, updated_at
FROM cms_tags
WHERE tenant_id = $1 AND id = $2
"#,
)
.bind(tenant_id)
.bind(id)
.fetch_one(pool)
.await?;
Ok(tag)
}
pub async fn update_tag(
pool: &PgPool,
tenant_id: Uuid,
id: Uuid,
name: Option<String>,
slug: Option<String>,
) -> Result<Tag, AppError> {
let tag = sqlx::query_as::<_, Tag>(
r#"
UPDATE cms_tags
SET
name = COALESCE($3, name),
slug = COALESCE($4, slug),
updated_at = now()
WHERE tenant_id = $1 AND id = $2
RETURNING tenant_id, id, kind::text as kind, name, slug, created_at, updated_at
"#,
)
.bind(tenant_id)
.bind(id)
.bind(name)
.bind(slug)
.fetch_one(pool)
.await?;
Ok(tag)
}
pub async fn delete_tag(pool: &PgPool, tenant_id: Uuid, id: Uuid) -> Result<(), AppError> {
let res = sqlx::query(
r#"
DELETE FROM cms_tags
WHERE tenant_id = $1 AND id = $2
"#,
)
.bind(tenant_id)
.bind(id)
.execute(pool)
.await?;
if res.rows_affected() == 0 {
return Err(AppError::NotFound("tag:not_found".into()));
}
Ok(())
}
#[derive(Debug, Clone)]
pub struct ListTagsQuery {
pub page: u32,
pub page_size: u32,
pub search: Option<String>,
pub kind: Option<String>,
}
pub async fn list_tags(
pool: &PgPool,
tenant_id: Uuid,
q: ListTagsQuery,
) -> Result<super::column::Paged<Tag>, AppError> {
let page = q.page.max(1);
let page_size = q.page_size.clamp(1, 200);
let offset = (page as i64 - 1) * page_size as i64;
let limit = page_size as i64;
let like = q.search.map(|s| format!("%{}%", s));
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM cms_tags
WHERE tenant_id = $1
AND ($2::cms_tag_kind IS NULL OR kind = $2::cms_tag_kind)
AND ($3::text IS NULL OR name ILIKE $3 OR slug ILIKE $3)
"#,
)
.bind(tenant_id)
.bind(q.kind.as_deref())
.bind(like.as_deref())
.fetch_one(pool)
.await?;
let items = sqlx::query_as::<_, Tag>(
r#"
SELECT tenant_id, id, kind::text as kind, name, slug, created_at, updated_at
FROM cms_tags
WHERE tenant_id = $1
AND ($2::cms_tag_kind IS NULL OR kind = $2::cms_tag_kind)
AND ($3::text IS NULL OR name ILIKE $3 OR slug ILIKE $3)
ORDER BY updated_at DESC
OFFSET $4
LIMIT $5
"#,
)
.bind(tenant_id)
.bind(q.kind.as_deref())
.bind(like.as_deref())
.bind(offset)
.bind(limit)
.fetch_all(pool)
.await?;
Ok(super::column::Paged {
items,
page,
page_size,
total,
})
}
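The list helpers above wrap the raw search term in `%…%` without escaping, so a user-supplied `%` or `_` acts as a wildcard. If that matters, a small escaping helper (illustrative, not part of this module) can neutralize the metacharacters before building the pattern; note the query would then need `ILIKE $n` with backslash as the escape character, which is PostgreSQL's default:

```rust
// Escape PostgreSQL LIKE/ILIKE metacharacters in user input, then wrap
// the result in %…% for a contains-style match.
fn ilike_pattern(term: &str) -> String {
    let mut escaped = String::with_capacity(term.len());
    for c in term.chars() {
        if matches!(c, '%' | '_' | '\\') {
            escaped.push('\\'); // prefix metacharacters with the escape char
        }
        escaped.push(c);
    }
    format!("%{}%", escaped)
}

fn main() {
    assert_eq!(ilike_pattern("rust"), "%rust%");
    assert_eq!(ilike_pattern("50%_off"), "%50\\%\\_off%");
    println!("ok");
}
```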

src/lib.rs Normal file

@@ -0,0 +1,5 @@
pub mod api;
pub mod application;
pub mod config;
pub mod domain;
pub mod infrastructure;

src/main.rs Normal file

@@ -0,0 +1,85 @@
use axum::middleware::{from_fn, from_fn_with_state};
use cms_service::{
api::{self, AppState},
application::services::CmsServices,
config::AppConfig,
infrastructure::{db, iam_client::{IamClient, IamClientConfig}},
};
use common_telemetry::telemetry::{self, TelemetryConfig};
use auth_kit::middleware::{tenant::TenantMiddlewareConfig, auth::AuthMiddlewareConfig};
use std::net::SocketAddr;
use std::time::Duration;
#[tokio::main]
async fn main() {
dotenvy::dotenv().ok();
let config = AppConfig::from_env().expect("failed to load config");
let _guard = telemetry::init(TelemetryConfig {
service_name: config.service_name.clone(),
log_level: config.log_level.clone(),
log_to_file: config.log_to_file,
log_dir: Some(config.log_dir.clone()),
log_file: Some(config.log_file_name.clone()),
});
let pool = db::init_pool(&config).await.expect("failed to init db pool");
db::run_migrations(&pool)
.await
.expect("failed to run migrations");
let state = AppState {
services: CmsServices::new(pool),
iam_client: IamClient::new(IamClientConfig {
base_url: config.iam_base_url.clone(),
timeout: Duration::from_millis(config.iam_timeout_ms),
cache_ttl: Duration::from_secs(config.iam_cache_ttl_seconds),
cache_stale_if_error: Duration::from_secs(config.iam_stale_if_error_seconds),
cache_max_entries: config.iam_cache_max_entries,
}),
};
let auth_cfg = AuthMiddlewareConfig {
skip_exact_paths: vec!["/healthz".to_string()],
skip_path_prefixes: vec!["/scalar".to_string()],
jwt: match &config.jwt_public_key_pem {
Some(pem) => auth_kit::jwt::JwtVerifyConfig::rs256_from_pem("iam-service", pem)
.expect("invalid JWT_PUBLIC_KEY_PEM"),
None => {
let jwks_url = config.iam_jwks_url.clone().unwrap_or_else(|| {
format!(
"{}/.well-known/jwks.json",
config.iam_base_url.trim_end_matches('/')
)
});
auth_kit::jwt::JwtVerifyConfig::rs256_from_jwks("iam-service", &jwks_url)
.expect("invalid IAM_JWKS_URL")
}
},
};
let tenant_cfg = TenantMiddlewareConfig {
skip_exact_paths: vec!["/healthz".to_string()],
skip_path_prefixes: vec!["/scalar".to_string()],
};
let app = api::build_router(state)
.layer(from_fn_with_state(
tenant_cfg,
auth_kit::middleware::tenant::resolve_tenant_with_config,
))
.layer(from_fn_with_state(
auth_cfg,
auth_kit::middleware::auth::authenticate_with_config,
))
.layer(from_fn(common_telemetry::axum_middleware::trace_http_request))
.layer(from_fn(cms_service::api::middleware::ensure_request_id));
let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
tracing::info!("Server started at http://{}", addr);
tracing::info!("Docs available at http://{}/scalar", addr);
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>())
.await
.unwrap();
}

tests/iam_client_cache.rs Normal file

@@ -0,0 +1,128 @@
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use std::time::Duration;
use axum::{Json, Router, routing::post};
use axum::response::IntoResponse;
use cms_service::infrastructure::iam_client::{IamClient, IamClientConfig};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize)]
struct AuthorizationCheckRequest {
permission: String,
}
#[derive(Debug, Serialize)]
struct AuthorizationCheckResponse {
allowed: bool,
}
#[derive(Debug, Serialize)]
struct ApiSuccessResponse<T> {
code: u32,
message: String,
data: T,
trace_id: Option<String>,
}
async fn start_mock_iam(
call_count: Arc<AtomicUsize>,
fail: Arc<AtomicBool>,
) -> (String, tokio::task::JoinHandle<()>) {
let app = Router::new().route(
"/authorize/check",
post(move |Json(body): Json<AuthorizationCheckRequest>| {
let call_count = call_count.clone();
let fail = fail.clone();
async move {
call_count.fetch_add(1, Ordering::SeqCst);
if fail.load(Ordering::SeqCst) {
return (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "fail").into_response();
}
let allowed = body.permission == "cms:article:read";
let resp = ApiSuccessResponse {
code: 0,
message: "ok".to_string(),
data: AuthorizationCheckResponse { allowed },
trace_id: None,
};
(axum::http::StatusCode::OK, Json(resp)).into_response()
}
}),
);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let base_url = format!("http://{}", addr);
let handle = tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(base_url, handle)
}
#[tokio::test]
async fn iam_client_caches_decisions() {
let call_count = Arc::new(AtomicUsize::new(0));
let fail = Arc::new(AtomicBool::new(false));
let (base_url, handle) = start_mock_iam(call_count.clone(), fail.clone()).await;
let client = IamClient::new(IamClientConfig {
base_url,
timeout: Duration::from_millis(500),
cache_ttl: Duration::from_secs(5),
cache_stale_if_error: Duration::from_secs(30),
cache_max_entries: 1000,
});
let tenant_id = uuid::Uuid::new_v4();
let user_id = uuid::Uuid::new_v4();
client
.require_permission(tenant_id, user_id, "cms:article:read", "token")
.await
.unwrap();
client
.require_permission(tenant_id, user_id, "cms:article:read", "token")
.await
.unwrap();
assert_eq!(call_count.load(Ordering::SeqCst), 1);
handle.abort();
}
#[tokio::test]
async fn iam_client_uses_stale_cache_on_error() {
let call_count = Arc::new(AtomicUsize::new(0));
let fail = Arc::new(AtomicBool::new(false));
let (base_url, handle) = start_mock_iam(call_count.clone(), fail.clone()).await;
let client = IamClient::new(IamClientConfig {
base_url,
timeout: Duration::from_millis(500),
cache_ttl: Duration::from_millis(50),
cache_stale_if_error: Duration::from_secs(30),
cache_max_entries: 1000,
});
let tenant_id = uuid::Uuid::new_v4();
let user_id = uuid::Uuid::new_v4();
client
.require_permission(tenant_id, user_id, "cms:article:read", "token")
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(70)).await;
fail.store(true, Ordering::SeqCst);
client
.require_permission(tenant_id, user_id, "cms:article:read", "token")
.await
.unwrap();
// first call populates the cache; the expired second call retries the remote
// (which fails, incrementing the counter) before falling back to the stale entry
assert_eq!(call_count.load(Ordering::SeqCst), 2);
handle.abort();
}