feat(handler): add app
This commit is contained in:
802
src/services/app.rs
Normal file
802
src/services/app.rs
Normal file
@@ -0,0 +1,802 @@
|
||||
use crate::models::{
|
||||
App, AppStatusChangeRequest, CreateAppRequest, ListAppsQuery, UpdateAppRequest,
|
||||
};
|
||||
use common_telemetry::AppError;
|
||||
use sqlx::PgPool;
|
||||
use tracing::instrument;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppService {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl AppService {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
|
||||
pub fn normalize_app_id(raw: &str) -> Result<String, AppError> {
|
||||
let id = raw.trim().to_ascii_lowercase();
|
||||
if id.len() < 2 || id.len() > 32 {
|
||||
return Err(AppError::BadRequest("Invalid app id length".into()));
|
||||
}
|
||||
if !id
|
||||
.chars()
|
||||
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '-')
|
||||
{
|
||||
return Err(AppError::BadRequest("Invalid app id format".into()));
|
||||
}
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
fn normalize_text_opt(v: Option<String>, max_len: usize) -> Result<Option<String>, AppError> {
|
||||
let Some(v) = v else {
|
||||
return Ok(None);
|
||||
};
|
||||
let t = v.trim().to_string();
|
||||
if t.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
if t.len() > max_len {
|
||||
return Err(AppError::BadRequest("Value too long".into()));
|
||||
}
|
||||
Ok(Some(t))
|
||||
}
|
||||
|
||||
fn normalize_text_required(v: &str, max_len: usize) -> Result<String, AppError> {
|
||||
let t = v.trim();
|
||||
if t.is_empty() {
|
||||
return Err(AppError::BadRequest("Value is required".into()));
|
||||
}
|
||||
if t.len() > max_len {
|
||||
return Err(AppError::BadRequest("Value too long".into()));
|
||||
}
|
||||
Ok(t.to_string())
|
||||
}
|
||||
|
||||
fn normalize_app_type(v: &str) -> Result<String, AppError> {
|
||||
let t = v.trim().to_ascii_lowercase();
|
||||
if t.is_empty() {
|
||||
return Ok("generic".to_string());
|
||||
}
|
||||
if t.len() > 50 {
|
||||
return Err(AppError::BadRequest("Invalid app_type length".into()));
|
||||
}
|
||||
if !t
|
||||
.chars()
|
||||
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '-')
|
||||
{
|
||||
return Err(AppError::BadRequest("Invalid app_type format".into()));
|
||||
}
|
||||
Ok(t)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, req))]
|
||||
pub async fn create_app(
|
||||
&self,
|
||||
req: CreateAppRequest,
|
||||
actor_user_id: Uuid,
|
||||
) -> Result<App, AppError> {
|
||||
let id = Self::normalize_app_id(&req.id)?;
|
||||
let name = Self::normalize_text_required(&req.name, 100)?;
|
||||
let description = Self::normalize_text_opt(req.description, 10_000)?;
|
||||
let app_type = Self::normalize_app_type(&req.app_type)?;
|
||||
let owner = Self::normalize_text_opt(req.owner, 100)?;
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
let inserted = sqlx::query_as::<_, App>(
|
||||
r#"
|
||||
INSERT INTO apps (id, name, description, status, app_type, owner, updated_at)
|
||||
VALUES ($1, $2, $3, 'active', $4, $5, NOW())
|
||||
RETURNING
|
||||
id,
|
||||
name,
|
||||
description,
|
||||
app_type,
|
||||
owner,
|
||||
status,
|
||||
created_at::text as created_at,
|
||||
updated_at::text as updated_at
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&name)
|
||||
.bind(&description)
|
||||
.bind(&app_type)
|
||||
.bind(&owner)
|
||||
.fetch_one(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if let sqlx::Error::Database(db) = &e {
|
||||
if db.is_unique_violation() {
|
||||
return AppError::AlreadyExists("App already exists".into());
|
||||
}
|
||||
}
|
||||
e.into()
|
||||
})?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO app_change_logs (app_id, action, actor_user_id, after)
|
||||
VALUES ($1, 'create', $2, $3)
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(actor_user_id)
|
||||
.bind(serde_json::json!({
|
||||
"id": inserted.id,
|
||||
"name": inserted.name,
|
||||
"description": inserted.description,
|
||||
"app_type": inserted.app_type,
|
||||
"owner": inserted.owner,
|
||||
"status": inserted.status
|
||||
}))
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO audit_logs (tenant_id, user_id, action, resource, status, details)
|
||||
VALUES ('00000000-0000-0000-0000-000000000001', $1, 'app.create', 'app', 'allow', $2)
|
||||
"#,
|
||||
)
|
||||
.bind(actor_user_id)
|
||||
.bind(serde_json::json!({ "app_id": id }))
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(inserted)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn list_apps(&self, query: ListAppsQuery) -> Result<Vec<App>, AppError> {
|
||||
self.apply_due_status_changes().await?;
|
||||
|
||||
let page = query.page.unwrap_or(1);
|
||||
let page_size = query.page_size.unwrap_or(20);
|
||||
if page == 0 || page_size == 0 || page_size > 200 {
|
||||
return Err(AppError::BadRequest("Invalid pagination parameters".into()));
|
||||
}
|
||||
let offset = (page - 1) as i64 * page_size as i64;
|
||||
|
||||
let status = query
|
||||
.status
|
||||
.map(|s| s.trim().to_ascii_lowercase())
|
||||
.filter(|s| !s.is_empty());
|
||||
let app_type = query
|
||||
.app_type
|
||||
.map(|s| s.trim().to_ascii_lowercase())
|
||||
.filter(|s| !s.is_empty());
|
||||
|
||||
let sort_by = query.sort_by.unwrap_or_else(|| "created_at".to_string());
|
||||
let sort_order = query.sort_order.unwrap_or_else(|| "desc".to_string());
|
||||
|
||||
let sort_by = match sort_by.as_str() {
|
||||
"id" => "id",
|
||||
"name" => "name",
|
||||
"status" => "status",
|
||||
"app_type" => "app_type",
|
||||
"created_at" => "created_at",
|
||||
"updated_at" => "updated_at",
|
||||
_ => "created_at",
|
||||
};
|
||||
let sort_order = match sort_order.to_ascii_lowercase().as_str() {
|
||||
"asc" => "ASC",
|
||||
_ => "DESC",
|
||||
};
|
||||
|
||||
let created_from = query
|
||||
.created_from
|
||||
.and_then(|s| s.parse::<chrono::DateTime<chrono::Utc>>().ok());
|
||||
let created_to = query
|
||||
.created_to
|
||||
.and_then(|s| s.parse::<chrono::DateTime<chrono::Utc>>().ok());
|
||||
|
||||
let sql = format!(
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
name,
|
||||
description,
|
||||
app_type,
|
||||
owner,
|
||||
status,
|
||||
created_at::text as created_at,
|
||||
updated_at::text as updated_at
|
||||
FROM apps
|
||||
WHERE ($1::text IS NULL OR status = $1)
|
||||
AND ($2::text IS NULL OR app_type = $2)
|
||||
AND ($3::timestamptz IS NULL OR created_at >= $3)
|
||||
AND ($4::timestamptz IS NULL OR created_at <= $4)
|
||||
ORDER BY {sort_by} {sort_order}
|
||||
LIMIT $5 OFFSET $6
|
||||
"#
|
||||
);
|
||||
|
||||
let rows = sqlx::query_as::<_, App>(&sql)
|
||||
.bind(status)
|
||||
.bind(app_type)
|
||||
.bind(created_from)
|
||||
.bind(created_to)
|
||||
.bind(page_size as i64)
|
||||
.bind(offset)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn get_app(&self, app_id: &str) -> Result<App, AppError> {
|
||||
self.apply_due_status_changes().await?;
|
||||
let id = Self::normalize_app_id(app_id)?;
|
||||
let row = sqlx::query_as::<_, App>(
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
name,
|
||||
description,
|
||||
app_type,
|
||||
owner,
|
||||
status,
|
||||
created_at::text as created_at,
|
||||
updated_at::text as updated_at
|
||||
FROM apps
|
||||
WHERE id = $1
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound("App not found".into()))?;
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, req))]
|
||||
pub async fn update_app(
|
||||
&self,
|
||||
app_id: &str,
|
||||
req: UpdateAppRequest,
|
||||
actor_user_id: Uuid,
|
||||
) -> Result<App, AppError> {
|
||||
let id = Self::normalize_app_id(app_id)?;
|
||||
let name = match req.name {
|
||||
Some(v) => Some(Self::normalize_text_required(&v, 100)?),
|
||||
None => None,
|
||||
};
|
||||
let description = Self::normalize_text_opt(req.description, 10_000)?;
|
||||
let app_type = match req.app_type {
|
||||
Some(v) => Some(Self::normalize_app_type(&v)?),
|
||||
None => None,
|
||||
};
|
||||
let owner = Self::normalize_text_opt(req.owner, 100)?;
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
let before: Option<serde_json::Value> = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT to_jsonb(a)
|
||||
FROM (
|
||||
SELECT id, name, description, status, app_type, owner
|
||||
FROM apps
|
||||
WHERE id = $1
|
||||
) a
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
if before.is_none() {
|
||||
return Err(AppError::NotFound("App not found".into()));
|
||||
}
|
||||
|
||||
let updated = sqlx::query_as::<_, App>(
|
||||
r#"
|
||||
UPDATE apps
|
||||
SET
|
||||
name = COALESCE($1, name),
|
||||
description = COALESCE($2, description),
|
||||
app_type = COALESCE($3, app_type),
|
||||
owner = COALESCE($4, owner),
|
||||
updated_at = NOW()
|
||||
WHERE id = $5
|
||||
RETURNING
|
||||
id,
|
||||
name,
|
||||
description,
|
||||
app_type,
|
||||
owner,
|
||||
status,
|
||||
created_at::text as created_at,
|
||||
updated_at::text as updated_at
|
||||
"#,
|
||||
)
|
||||
.bind(name)
|
||||
.bind(description)
|
||||
.bind(app_type)
|
||||
.bind(owner)
|
||||
.bind(&id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let after = serde_json::json!({
|
||||
"id": updated.id,
|
||||
"name": updated.name,
|
||||
"description": updated.description,
|
||||
"app_type": updated.app_type,
|
||||
"owner": updated.owner,
|
||||
"status": updated.status
|
||||
});
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO app_change_logs (app_id, action, actor_user_id, before, after)
|
||||
VALUES ($1, 'update', $2, $3, $4)
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(actor_user_id)
|
||||
.bind(before.unwrap_or_else(|| serde_json::json!({})))
|
||||
.bind(after)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO audit_logs (tenant_id, user_id, action, resource, status, details)
|
||||
VALUES ('00000000-0000-0000-0000-000000000001', $1, 'app.update', 'app', 'allow', $2)
|
||||
"#,
|
||||
)
|
||||
.bind(actor_user_id)
|
||||
.bind(serde_json::json!({ "app_id": id }))
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(updated)
|
||||
}
|
||||
|
||||
fn normalize_status(v: &str) -> Result<String, AppError> {
|
||||
let s = v.trim().to_ascii_lowercase();
|
||||
match s.as_str() {
|
||||
"active" | "disabled" => Ok(s),
|
||||
_ => Err(AppError::BadRequest("Invalid status".into())),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self, req))]
|
||||
pub async fn request_status_change(
|
||||
&self,
|
||||
app_id: &str,
|
||||
req: crate::models::RequestAppStatusChangeRequest,
|
||||
actor_user_id: Uuid,
|
||||
) -> Result<AppStatusChangeRequest, AppError> {
|
||||
let id = Self::normalize_app_id(app_id)?;
|
||||
let to_status = Self::normalize_status(&req.to_status)?;
|
||||
let effective_at = match req.effective_at {
|
||||
Some(v) => Some(
|
||||
v.parse::<chrono::DateTime<chrono::Utc>>()
|
||||
.map_err(|_| AppError::BadRequest("Invalid effective_at".into()))?,
|
||||
),
|
||||
None => None,
|
||||
};
|
||||
let reason = Self::normalize_text_opt(req.reason, 10_000)?;
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
let from_status: Option<String> =
|
||||
sqlx::query_scalar("SELECT status FROM apps WHERE id = $1 FOR UPDATE")
|
||||
.bind(&id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
let Some(from_status) = from_status else {
|
||||
return Err(AppError::NotFound("App not found".into()));
|
||||
};
|
||||
if from_status == to_status {
|
||||
return Err(AppError::BadRequest("No status change".into()));
|
||||
}
|
||||
|
||||
let row = sqlx::query_as::<_, AppStatusChangeRequest>(
|
||||
r#"
|
||||
INSERT INTO app_status_change_requests (app_id, from_status, to_status, requested_by, effective_at, reason)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING
|
||||
id,
|
||||
app_id,
|
||||
from_status,
|
||||
to_status,
|
||||
requested_by,
|
||||
requested_at::text as requested_at,
|
||||
effective_at::text as effective_at,
|
||||
status,
|
||||
approved_by,
|
||||
approved_at::text as approved_at,
|
||||
rejected_by,
|
||||
rejected_at::text as rejected_at,
|
||||
reason
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&from_status)
|
||||
.bind(&to_status)
|
||||
.bind(actor_user_id)
|
||||
.bind(effective_at)
|
||||
.bind(&reason)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO audit_logs (tenant_id, user_id, action, resource, status, details)
|
||||
VALUES ('00000000-0000-0000-0000-000000000001', $1, 'app.status_change.request', 'app', 'allow', $2)
|
||||
"#,
|
||||
)
|
||||
.bind(actor_user_id)
|
||||
.bind(serde_json::json!({ "app_id": id, "to_status": to_status }))
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, effective_at))]
|
||||
pub async fn approve_status_change(
|
||||
&self,
|
||||
request_id: Uuid,
|
||||
effective_at: Option<String>,
|
||||
actor_user_id: Uuid,
|
||||
) -> Result<AppStatusChangeRequest, AppError> {
|
||||
let effective_at = match effective_at {
|
||||
Some(v) => Some(
|
||||
v.parse::<chrono::DateTime<chrono::Utc>>()
|
||||
.map_err(|_| AppError::BadRequest("Invalid effective_at".into()))?,
|
||||
),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
let pending = sqlx::query_as::<
|
||||
_,
|
||||
(
|
||||
Uuid,
|
||||
String,
|
||||
String,
|
||||
Option<chrono::DateTime<chrono::Utc>>,
|
||||
String,
|
||||
),
|
||||
>(
|
||||
r#"
|
||||
SELECT id, app_id, to_status, effective_at, status
|
||||
FROM app_status_change_requests
|
||||
WHERE id = $1
|
||||
FOR UPDATE
|
||||
"#,
|
||||
)
|
||||
.bind(request_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
let Some((id, app_id, to_status, current_effective_at, status)) = pending else {
|
||||
return Err(AppError::NotFound("Status change request not found".into()));
|
||||
};
|
||||
if status != "pending" {
|
||||
return Err(AppError::BadRequest("Request is not pending".into()));
|
||||
}
|
||||
|
||||
let effective_at = effective_at.or(current_effective_at);
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE app_status_change_requests
|
||||
SET status = 'approved',
|
||||
approved_by = $1,
|
||||
approved_at = NOW(),
|
||||
effective_at = COALESCE($2, effective_at)
|
||||
WHERE id = $3
|
||||
"#,
|
||||
)
|
||||
.bind(actor_user_id)
|
||||
.bind(effective_at)
|
||||
.bind(id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
self.apply_due_status_changes_tx(&mut tx).await?;
|
||||
|
||||
let row = sqlx::query_as::<_, AppStatusChangeRequest>(
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
app_id,
|
||||
from_status,
|
||||
to_status,
|
||||
requested_by,
|
||||
requested_at::text as requested_at,
|
||||
effective_at::text as effective_at,
|
||||
status,
|
||||
approved_by,
|
||||
approved_at::text as approved_at,
|
||||
rejected_by,
|
||||
rejected_at::text as rejected_at,
|
||||
reason
|
||||
FROM app_status_change_requests
|
||||
WHERE id = $1
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO audit_logs (tenant_id, user_id, action, resource, status, details)
|
||||
VALUES ('00000000-0000-0000-0000-000000000001', $1, 'app.status_change.approve', 'app', 'allow', $2)
|
||||
"#,
|
||||
)
|
||||
.bind(actor_user_id)
|
||||
.bind(serde_json::json!({ "request_id": request_id, "app_id": app_id, "to_status": to_status }))
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, reason))]
|
||||
pub async fn reject_status_change(
|
||||
&self,
|
||||
request_id: Uuid,
|
||||
reason: Option<String>,
|
||||
actor_user_id: Uuid,
|
||||
) -> Result<AppStatusChangeRequest, AppError> {
|
||||
let reason = Self::normalize_text_opt(reason, 10_000)?;
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let pending: Option<String> = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT status
|
||||
FROM app_status_change_requests
|
||||
WHERE id = $1
|
||||
FOR UPDATE
|
||||
"#,
|
||||
)
|
||||
.bind(request_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
let Some(status) = pending else {
|
||||
return Err(AppError::NotFound("Status change request not found".into()));
|
||||
};
|
||||
if status != "pending" {
|
||||
return Err(AppError::BadRequest("Request is not pending".into()));
|
||||
}
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE app_status_change_requests
|
||||
SET status = 'rejected',
|
||||
rejected_by = $1,
|
||||
rejected_at = NOW(),
|
||||
reason = COALESCE($2, reason)
|
||||
WHERE id = $3
|
||||
"#,
|
||||
)
|
||||
.bind(actor_user_id)
|
||||
.bind(&reason)
|
||||
.bind(request_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let row = sqlx::query_as::<_, AppStatusChangeRequest>(
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
app_id,
|
||||
from_status,
|
||||
to_status,
|
||||
requested_by,
|
||||
requested_at::text as requested_at,
|
||||
effective_at::text as effective_at,
|
||||
status,
|
||||
approved_by,
|
||||
approved_at::text as approved_at,
|
||||
rejected_by,
|
||||
rejected_at::text as rejected_at,
|
||||
reason
|
||||
FROM app_status_change_requests
|
||||
WHERE id = $1
|
||||
"#,
|
||||
)
|
||||
.bind(request_id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO audit_logs (tenant_id, user_id, action, resource, status, details)
|
||||
VALUES ('00000000-0000-0000-0000-000000000001', $1, 'app.status_change.reject', 'app', 'allow', $2)
|
||||
"#,
|
||||
)
|
||||
.bind(actor_user_id)
|
||||
.bind(serde_json::json!({ "request_id": request_id }))
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn list_status_change_requests(
|
||||
&self,
|
||||
status: Option<String>,
|
||||
page: Option<u32>,
|
||||
page_size: Option<u32>,
|
||||
) -> Result<Vec<AppStatusChangeRequest>, AppError> {
|
||||
let page = page.unwrap_or(1);
|
||||
let page_size = page_size.unwrap_or(20);
|
||||
if page == 0 || page_size == 0 || page_size > 200 {
|
||||
return Err(AppError::BadRequest("Invalid pagination parameters".into()));
|
||||
}
|
||||
let offset = (page - 1) as i64 * page_size as i64;
|
||||
let status = status
|
||||
.map(|s| s.trim().to_ascii_lowercase())
|
||||
.filter(|s| !s.is_empty());
|
||||
|
||||
let rows = sqlx::query_as::<_, AppStatusChangeRequest>(
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
app_id,
|
||||
from_status,
|
||||
to_status,
|
||||
requested_by,
|
||||
requested_at::text as requested_at,
|
||||
effective_at::text as effective_at,
|
||||
status,
|
||||
approved_by,
|
||||
approved_at::text as approved_at,
|
||||
rejected_by,
|
||||
rejected_at::text as rejected_at,
|
||||
reason
|
||||
FROM app_status_change_requests
|
||||
WHERE ($1::text IS NULL OR status = $1)
|
||||
ORDER BY requested_at DESC
|
||||
LIMIT $2 OFFSET $3
|
||||
"#,
|
||||
)
|
||||
.bind(status)
|
||||
.bind(page_size as i64)
|
||||
.bind(offset)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn delete_app(&self, app_id: &str, actor_user_id: Uuid) -> Result<(), AppError> {
|
||||
let id = Self::normalize_app_id(app_id)?;
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
let before: Option<serde_json::Value> = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT to_jsonb(a)
|
||||
FROM (
|
||||
SELECT id, name, description, status, app_type, owner
|
||||
FROM apps
|
||||
WHERE id = $1
|
||||
) a
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
if before.is_none() {
|
||||
return Err(AppError::NotFound("App not found".into()));
|
||||
}
|
||||
|
||||
sqlx::query("UPDATE apps SET status = 'deleted', updated_at = NOW() WHERE id = $1")
|
||||
.bind(&id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let after: serde_json::Value = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT to_jsonb(a)
|
||||
FROM (
|
||||
SELECT id, name, description, status, app_type, owner
|
||||
FROM apps
|
||||
WHERE id = $1
|
||||
) a
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO app_change_logs (app_id, action, actor_user_id, before, after)
|
||||
VALUES ($1, 'delete', $2, $3, $4)
|
||||
"#,
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(actor_user_id)
|
||||
.bind(before.unwrap_or_else(|| serde_json::json!({})))
|
||||
.bind(after)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO audit_logs (tenant_id, user_id, action, resource, status, details)
|
||||
VALUES ('00000000-0000-0000-0000-000000000001', $1, 'app.delete', 'app', 'allow', $2)
|
||||
"#,
|
||||
)
|
||||
.bind(actor_user_id)
|
||||
.bind(serde_json::json!({ "app_id": id }))
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn apply_due_status_changes(&self) -> Result<(), AppError> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
self.apply_due_status_changes_tx(&mut tx).await?;
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn apply_due_status_changes_tx(
|
||||
&self,
|
||||
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
||||
) -> Result<(), AppError> {
|
||||
let due: Vec<(Uuid, String, String)> = sqlx::query_as(
|
||||
r#"
|
||||
SELECT id, app_id, to_status
|
||||
FROM app_status_change_requests
|
||||
WHERE status = 'approved'
|
||||
AND COALESCE(effective_at, NOW()) <= NOW()
|
||||
ORDER BY approved_at NULLS LAST, requested_at ASC
|
||||
FOR UPDATE
|
||||
"#,
|
||||
)
|
||||
.fetch_all(&mut **tx)
|
||||
.await?;
|
||||
for (request_id, app_id, to_status) in due {
|
||||
sqlx::query("UPDATE apps SET status = $1, updated_at = NOW() WHERE id = $2")
|
||||
.bind(&to_status)
|
||||
.bind(&app_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE app_status_change_requests
|
||||
SET status = 'applied'
|
||||
WHERE id = $1
|
||||
"#,
|
||||
)
|
||||
.bind(request_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::AppService;
|
||||
|
||||
#[test]
|
||||
fn normalize_app_id_rejects_invalid() {
|
||||
assert!(AppService::normalize_app_id("A").is_err());
|
||||
assert!(AppService::normalize_app_id("A@").is_err());
|
||||
assert!(AppService::normalize_app_id("dms").is_ok());
|
||||
assert_eq!(AppService::normalize_app_id(" CMS ").unwrap(), "cms");
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,12 @@
|
||||
pub mod auth;
|
||||
pub mod app;
|
||||
pub mod authorization;
|
||||
pub mod role;
|
||||
pub mod tenant;
|
||||
pub mod user;
|
||||
|
||||
pub use auth::AuthService;
|
||||
pub use app::AppService;
|
||||
pub use authorization::AuthorizationService;
|
||||
pub use role::RoleService;
|
||||
pub use tenant::TenantService;
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
use crate::models::{UpdateUserRequest, User};
|
||||
use crate::utils::{hash_password, verify_password};
|
||||
use base64::Engine;
|
||||
use common_telemetry::AppError;
|
||||
use rand::RngCore;
|
||||
use serde_json::json;
|
||||
use sqlx::PgPool;
|
||||
use tracing::instrument;
|
||||
use uuid::Uuid;
|
||||
@@ -106,4 +110,140 @@ impl UserService {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self, current_password, new_password))]
|
||||
pub async fn reset_my_password(
|
||||
&self,
|
||||
tenant_id: Uuid,
|
||||
user_id: Uuid,
|
||||
current_password: String,
|
||||
new_password: String,
|
||||
) -> Result<(), AppError> {
|
||||
if current_password.trim().is_empty() || new_password.trim().is_empty() {
|
||||
return Err(AppError::BadRequest("Password is required".into()));
|
||||
}
|
||||
if new_password.trim().len() < 8 {
|
||||
return Err(AppError::BadRequest("Password too short".into()));
|
||||
}
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
let stored_hash: Option<String> = sqlx::query_scalar(
|
||||
"SELECT password_hash FROM users WHERE tenant_id = $1 AND id = $2 FOR UPDATE",
|
||||
)
|
||||
.bind(tenant_id)
|
||||
.bind(user_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| AppError::DbError(e))?;
|
||||
|
||||
let Some(stored_hash) = stored_hash else {
|
||||
return Err(AppError::NotFound("User not found".into()));
|
||||
};
|
||||
|
||||
if !verify_password(¤t_password, &stored_hash) {
|
||||
return Err(AppError::InvalidCredentials);
|
||||
}
|
||||
|
||||
let new_hash = hash_password(new_password.trim())
|
||||
.map_err(|e| AppError::AnyhowError(anyhow::anyhow!(e)))?;
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE users SET password_hash = $1, updated_at = NOW() WHERE tenant_id = $2 AND id = $3",
|
||||
)
|
||||
.bind(&new_hash)
|
||||
.bind(tenant_id)
|
||||
.bind(user_id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| AppError::DbError(e))?;
|
||||
|
||||
sqlx::query("UPDATE refresh_tokens SET is_revoked = TRUE WHERE user_id = $1")
|
||||
.bind(user_id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| AppError::DbError(e))?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO audit_logs (tenant_id, user_id, action, resource, status, details)
|
||||
VALUES ($1, $2, 'user.password.reset.self', 'user', 'allow', $3)
|
||||
"#,
|
||||
)
|
||||
.bind(tenant_id)
|
||||
.bind(user_id)
|
||||
.bind(json!({ "target_user_id": user_id }))
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| AppError::DbError(e))?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn generate_temporary_password(length: usize) -> Result<String, AppError> {
|
||||
let length = length.clamp(16, 64);
|
||||
let mut bytes = vec![0u8; length];
|
||||
rand::rng().fill_bytes(&mut bytes);
|
||||
let mut out = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes);
|
||||
out.truncate(length);
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn reset_user_password_as_admin(
|
||||
&self,
|
||||
tenant_id: Uuid,
|
||||
actor_user_id: Uuid,
|
||||
target_user_id: Uuid,
|
||||
length: Option<usize>,
|
||||
) -> Result<String, AppError> {
|
||||
let temp = Self::generate_temporary_password(length.unwrap_or(20))?;
|
||||
let new_hash =
|
||||
hash_password(&temp).map_err(|e| AppError::AnyhowError(anyhow::anyhow!(e)))?;
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
let updated: i64 = sqlx::query_scalar(
|
||||
r#"
|
||||
UPDATE users
|
||||
SET password_hash = $1, updated_at = NOW()
|
||||
WHERE tenant_id = $2 AND id = $3
|
||||
RETURNING 1
|
||||
"#,
|
||||
)
|
||||
.bind(&new_hash)
|
||||
.bind(tenant_id)
|
||||
.bind(target_user_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| AppError::DbError(e))?
|
||||
.unwrap_or(0);
|
||||
|
||||
if updated == 0 {
|
||||
return Err(AppError::NotFound("User not found".into()));
|
||||
}
|
||||
|
||||
sqlx::query("UPDATE refresh_tokens SET is_revoked = TRUE WHERE user_id = $1")
|
||||
.bind(target_user_id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| AppError::DbError(e))?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO audit_logs (tenant_id, user_id, action, resource, status, details)
|
||||
VALUES ($1, $2, 'user.password.reset.admin', 'user', 'allow', $3)
|
||||
"#,
|
||||
)
|
||||
.bind(tenant_id)
|
||||
.bind(actor_user_id)
|
||||
.bind(json!({ "target_user_id": target_user_id }))
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| AppError::DbError(e))?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(temp)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user