109 lines
3.8 KiB
Rust
109 lines
3.8 KiB
Rust
// src/outputs/postgres.rs
|
||
use super::LogOutput;
|
||
use crate::model::LogRecord;
|
||
use async_trait::async_trait;
|
||
use chrono::{Datelike, TimeZone, Utc};
|
||
use sqlx::PgPool;
|
||
use tokio::sync::RwLock;
|
||
|
||
pub struct PostgresOutput {
|
||
pool: PgPool,
|
||
active_month: RwLock<String>,
|
||
}
|
||
|
||
impl PostgresOutput {
|
||
pub fn new(pool: PgPool) -> Self {
|
||
Self {
|
||
pool,
|
||
active_month: RwLock::new(String::new()),
|
||
}
|
||
}
|
||
|
||
async fn ensure_partition_exists(&self, timestamp: &chrono::DateTime<Utc>) {
|
||
let current_month_suffix = timestamp.format("%Y_%m").to_string();
|
||
|
||
// 1. 快速检查(读锁):如果内存里的月份和当前日志月份一致,直接返回,不做任何 DB 操作
|
||
{
|
||
let last_month = self.active_month.read().await;
|
||
if *last_month == current_month_suffix {
|
||
return;
|
||
}
|
||
} // 读锁在这里自动释放
|
||
|
||
// 2. 需要建表(写锁):可能是新启动,或者跨月了
|
||
let mut last_month = self.active_month.write().await;
|
||
|
||
// 双重检查:防止多个线程同时拿到写锁重复执行
|
||
if *last_month == current_month_suffix {
|
||
return;
|
||
}
|
||
|
||
// --- 开始计算日期范围 ---
|
||
let year = timestamp.year();
|
||
let month = timestamp.month();
|
||
|
||
// 本月 1 号
|
||
let start_date = Utc.with_ymd_and_hms(year, month, 1, 0, 0, 0).unwrap();
|
||
|
||
// 下个月 1 号 (处理 12 月跨年的情况)
|
||
let (next_year, next_month) = if month == 12 {
|
||
(year + 1, 1)
|
||
} else {
|
||
(year, month + 1)
|
||
};
|
||
let end_date = Utc
|
||
.with_ymd_and_hms(next_year, next_month, 1, 0, 0, 0)
|
||
.unwrap();
|
||
|
||
let table_name = format!("app_logs_{}", current_month_suffix);
|
||
let start_str = start_date.format("%Y-%m-%d").to_string();
|
||
let end_str = end_date.format("%Y-%m-%d").to_string();
|
||
|
||
// --- 执行建表 SQL ---
|
||
// 注意:Postgres 的分区表名不能用参数绑定 ($1),只能用 format! 拼接
|
||
// 因为我们是用 chrono 生成的字符串,没有 SQL 注入风险
|
||
let create_sql = format!(
|
||
"CREATE TABLE IF NOT EXISTS {} PARTITION OF app_logs FOR VALUES FROM ('{}') TO ('{}')",
|
||
table_name, start_str, end_str
|
||
);
|
||
|
||
// 如果建表失败,打印错误但不 panic,因为可能是并发创建导致的(虽然 IF NOT EXISTS 能防大部分)
|
||
if let Err(e) = sqlx::query(&create_sql).execute(&self.pool).await {
|
||
eprintln!("Error creating partition table {}: {}", table_name, e);
|
||
} else {
|
||
println!("Partition table checked/created: {}", table_name);
|
||
}
|
||
|
||
// 更新缓存
|
||
*last_month = current_month_suffix;
|
||
}
|
||
}
|
||
|
||
#[async_trait]
|
||
impl LogOutput for PostgresOutput {
|
||
async fn write(&self, record: &LogRecord) {
|
||
self.ensure_partition_exists(&record.timestamp).await;
|
||
|
||
let query = r#"
|
||
INSERT INTO app_logs (service_name, log_level, message, module, created_at, trace_id)
|
||
VALUES ($1, $2, $3, $4, $5, $6)
|
||
"#;
|
||
|
||
// 注意:这里的 write 是在后台任务中执行的,就算慢也不会阻塞主业务
|
||
// 我们忽略错误,因为如果日志系统挂了,不能让它导致业务逻辑崩溃 (Panic)
|
||
// 生产环境可以考虑加一个 fallback 机制(比如降级写文件)
|
||
if let Err(e) = sqlx::query(query)
|
||
.bind(&record.service_name)
|
||
.bind(record.level.to_string())
|
||
.bind(&record.message)
|
||
.bind(&record.module)
|
||
.bind(record.timestamp)
|
||
.bind(&record.trace_id)
|
||
.execute(&self.pool)
|
||
.await
|
||
{
|
||
eprintln!("Failed to write log to PostgreSQL: {}", e);
|
||
}
|
||
}
|
||
}
|