use crate::models::{CreateTenantRequest, Tenant, UpdateTenantRequest, UpdateTenantStatusRequest}; use common_telemetry::AppError; use serde_json::Value; use sqlx::PgPool; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::RwLock; use tracing::instrument; use uuid::Uuid; #[derive(Clone)] struct EnabledAppsCacheEntry { enabled_apps: Vec, version: i32, updated_at: String, expires_at: Instant, } #[derive(Clone)] pub struct TenantService { pool: PgPool, enabled_apps_cache: Arc>>, } impl TenantService { /// 创建租户服务实例。 pub fn new(pool: PgPool) -> Self { Self { pool, enabled_apps_cache: Arc::new(RwLock::new(HashMap::new())), } } #[instrument(skip(self, req))] /// 创建新租户并初始化默认状态与配置。 /// /// 业务规则: /// - 默认 `status=active`。 /// - `config` 未提供时默认 `{}`。 /// /// 输出: /// - 返回新建租户记录(含 `id`) /// /// 异常: /// - 数据库写入失败(如连接异常、约束失败等) pub async fn create_tenant(&self, req: CreateTenantRequest) -> Result { let mut config = req .config .unwrap_or_else(|| Value::Object(Default::default())); if !config.is_object() { config = Value::Object(Default::default()); } if let Some(obj) = config.as_object_mut() { obj.insert("enabled_apps".to_string(), Value::Array(vec![])); obj.insert( "enabled_apps_version".to_string(), Value::Number(serde_json::Number::from(0)), ); } let query = r#" INSERT INTO tenants (name, status, config) VALUES ($1, 'active', $2) RETURNING id, name, status, config "#; let mut tx = self.pool.begin().await?; let tenant = sqlx::query_as::<_, Tenant>(query) .bind(req.name) .bind(config) .fetch_one(&mut *tx) .await?; sqlx::query( r#" INSERT INTO tenant_entitlements (tenant_id, enabled_apps, version) VALUES ($1, '{}'::text[], 0) ON CONFLICT (tenant_id) DO NOTHING "#, ) .bind(tenant.id) .execute(&mut *tx) .await?; tx.commit().await?; Ok(tenant) } #[instrument(skip(self))] /// 根据租户 ID 查询租户信息。 /// /// 异常: /// - 若租户不存在,返回 `NotFound("Tenant not found")`。 pub async fn get_tenant(&self, tenant_id: Uuid) -> Result { let query = "SELECT id, name, status, config FROM tenants WHERE id = $1"; sqlx::query_as::<_, Tenant>(query) .bind(tenant_id) .fetch_optional(&self.pool) .await? .ok_or_else(|| AppError::NotFound("Tenant not found".into())) } #[instrument(skip(self, req))] /// 更新租户基础信息(名称 / 配置)。 /// /// 说明: /// - 仅更新 `UpdateTenantRequest` 中提供的字段,未提供字段保持不变。 /// /// 异常: /// - 若租户不存在,返回 `NotFound("Tenant not found")`。 pub async fn update_tenant( &self, tenant_id: Uuid, req: UpdateTenantRequest, ) -> Result { let query = r#" UPDATE tenants SET name = COALESCE($1, name), config = COALESCE($2, config), updated_at = NOW() WHERE id = $3 RETURNING id, name, status, config "#; sqlx::query_as::<_, Tenant>(query) .bind(req.name) .bind(req.config) .bind(tenant_id) .fetch_optional(&self.pool) .await? .ok_or_else(|| AppError::NotFound("Tenant not found".into())) } #[instrument(skip(self, req))] /// 更新租户状态字段(如 active / disabled)。 /// /// 异常: /// - 若租户不存在,返回 `NotFound("Tenant not found")`。 pub async fn update_tenant_status( &self, tenant_id: Uuid, req: UpdateTenantStatusRequest, ) -> Result { let query = r#" UPDATE tenants SET status = $1, updated_at = NOW() WHERE id = $2 RETURNING id, name, status, config "#; sqlx::query_as::<_, Tenant>(query) .bind(req.status) .bind(tenant_id) .fetch_optional(&self.pool) .await? .ok_or_else(|| AppError::NotFound("Tenant not found".into())) } #[instrument(skip(self))] /// 删除指定租户。 /// /// 异常: /// - 若租户不存在,返回 `NotFound("Tenant not found")`。 pub async fn delete_tenant(&self, tenant_id: Uuid) -> Result<(), AppError> { let result = sqlx::query("DELETE FROM tenants WHERE id = $1") .bind(tenant_id) .execute(&self.pool) .await?; if result.rows_affected() == 0 { return Err(AppError::NotFound("Tenant not found".into())); } Ok(()) } #[instrument(skip(self))] pub async fn get_enabled_apps( &self, tenant_id: Uuid, ) -> Result<(Vec, i32, String), AppError> { let now = Instant::now(); if let Some(hit) = self .enabled_apps_cache .read() .await .get(&tenant_id) .cloned() { if hit.expires_at > now { return Ok((hit.enabled_apps, hit.version, hit.updated_at)); } } let row = sqlx::query_as::<_, (Vec, i32, chrono::DateTime)>( r#" SELECT enabled_apps, version, updated_at FROM tenant_entitlements WHERE tenant_id = $1 "#, ) .bind(tenant_id) .fetch_optional(&self.pool) .await?; let (enabled_apps, version, updated_at) = match row { Some((apps, v, ts)) => (apps, v, ts.to_rfc3339()), None => { let exists: Option = sqlx::query_scalar("SELECT id FROM tenants WHERE id = $1") .bind(tenant_id) .fetch_optional(&self.pool) .await?; if exists.is_none() { return Err(AppError::NotFound("Tenant not found".into())); } let ts = chrono::Utc::now().to_rfc3339(); (vec![], 0, ts) } }; let ttl = Duration::from_secs(60); self.enabled_apps_cache.write().await.insert( tenant_id, EnabledAppsCacheEntry { enabled_apps: enabled_apps.clone(), version, updated_at: updated_at.clone(), expires_at: now + ttl, }, ); Ok((enabled_apps, version, updated_at)) } #[instrument(skip(self, enabled_apps))] pub async fn set_enabled_apps( &self, tenant_id: Uuid, enabled_apps: Vec, expected_version: Option, actor_user_id: Uuid, ) -> Result<(Vec, i32, String), AppError> { let normalized = normalize_apps(enabled_apps); self.validate_apps_exist(&normalized).await?; let mut tx = self.pool.begin().await?; let current = sqlx::query_as::<_, (Vec, i32)>( r#" SELECT enabled_apps, version FROM tenant_entitlements WHERE tenant_id = $1 FOR UPDATE "#, ) .bind(tenant_id) .fetch_optional(&mut *tx) .await?; if current.is_none() { let exists: Option = sqlx::query_scalar("SELECT id FROM tenants WHERE id = $1") .bind(tenant_id) .fetch_optional(&mut *tx) .await?; if exists.is_none() { return Err(AppError::NotFound("Tenant not found".into())); } sqlx::query( r#" INSERT INTO tenant_entitlements (tenant_id, enabled_apps, version) VALUES ($1, '{}'::text[], 0) ON CONFLICT (tenant_id) DO NOTHING "#, ) .bind(tenant_id) .execute(&mut *tx) .await?; } let (before_apps, before_version) = current.unwrap_or_else(|| (vec![], 0)); if let Some(ev) = expected_version { if ev != before_version { return Err(AppError::AlreadyExists( "enabled_apps:version_conflict".into(), )); } } let (new_version, updated_at): (i32, chrono::DateTime) = sqlx::query_as( r#" UPDATE tenant_entitlements SET enabled_apps = $1, version = version + 1, updated_at = NOW() WHERE tenant_id = $2 RETURNING version, updated_at "#, ) .bind(&normalized) .bind(tenant_id) .fetch_one(&mut *tx) .await?; sqlx::query( r#" UPDATE tenants SET config = jsonb_set( jsonb_set(COALESCE(config, '{}'::jsonb), '{enabled_apps}', to_jsonb($1::text[]), true), '{enabled_apps_version}', to_jsonb($2::int), true ), updated_at = NOW() WHERE id = $3 "#, ) .bind(&normalized) .bind(new_version) .bind(tenant_id) .execute(&mut *tx) .await?; sqlx::query( r#" INSERT INTO tenant_enabled_apps_history (tenant_id, version, enabled_apps, actor_user_id) VALUES ($1, $2, $3, $4) "#, ) .bind(tenant_id) .bind(new_version) .bind(&normalized) .bind(actor_user_id) .execute(&mut *tx) .await?; sqlx::query( r#" INSERT INTO audit_logs (tenant_id, user_id, action, resource, status, details) VALUES ($1, $2, 'tenant.enabled_apps.update', 'tenant', 'allow', $3) "#, ) .bind(tenant_id) .bind(actor_user_id) .bind(serde_json::json!({ "before": { "enabled_apps": before_apps, "version": before_version }, "after": { "enabled_apps": normalized, "version": new_version } })) .execute(&mut *tx) .await?; tx.commit().await?; let ttl = Duration::from_secs(60); let now = Instant::now(); let updated_at = updated_at.to_rfc3339(); self.enabled_apps_cache.write().await.insert( tenant_id, EnabledAppsCacheEntry { enabled_apps: normalized.clone(), version: new_version, updated_at: updated_at.clone(), expires_at: now + ttl, }, ); Ok((normalized, new_version, updated_at)) } async fn validate_apps_exist(&self, enabled_apps: &[String]) -> Result<(), AppError> { if enabled_apps.is_empty() { return Ok(()); } let rows: Vec = sqlx::query_scalar("SELECT id FROM apps WHERE id = ANY($1)") .bind(enabled_apps) .fetch_all(&self.pool) .await?; let found: HashSet = rows.into_iter().collect(); for app in enabled_apps { if !found.contains(app) { return Err(AppError::BadRequest(format!("Unknown app: {app}"))); } } Ok(()) } } fn normalize_apps(enabled_apps: Vec) -> Vec { let mut out = Vec::new(); let mut seen = HashSet::new(); for a in enabled_apps { let v = a.trim().to_lowercase(); if v.is_empty() { continue; } if seen.insert(v.clone()) { out.push(v); } } out.sort(); out }