From d83c33ef27e86aaf1ced02c1d498b7f3e17765b6 Mon Sep 17 00:00:00 2001 From: shay7sev Date: Wed, 21 Jan 2026 20:26:10 +0800 Subject: [PATCH] feat(outputs): add file --- src/outputs/file.rs | 68 ++++++++++++++++++++++++++++++++++++++++ src/outputs/mod.rs | 1 + src/outputs/postgres.rs | 69 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 src/outputs/file.rs diff --git a/src/outputs/file.rs b/src/outputs/file.rs new file mode 100644 index 0000000..08a6f75 --- /dev/null +++ b/src/outputs/file.rs @@ -0,0 +1,68 @@ +// src/outputs/file.rs + +use super::LogOutput; +use crate::model::LogRecord; +use async_trait::async_trait; +use std::path::Path; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::sync::Mutex; + +pub struct FileOutput { + // 使用 Mutex 包裹,因为 write 方法是 &self (不可变引用),但写入文件需要修改内部状态 + // 使用 BufWriter 提高 IO 性能 + writer: Mutex>, +} + +impl FileOutput { + /// 创建文件输出实例 + /// path: 日志文件路径,例如 "logs/app.log" + pub async fn new(path: impl AsRef) -> anyhow::Result { + // 确保父目录存在 + if let Some(parent) = path.as_ref().parent() { + tokio::fs::create_dir_all(parent).await?; + } + + // 以追加模式打开文件,如果不存在则创建 + let file = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .await?; + + Ok(Self { + writer: Mutex::new(BufWriter::new(file)), + }) + } +} + +#[async_trait] +impl LogOutput for FileOutput { + async fn write(&self, record: &LogRecord) { + // 1. 格式化日志字符串 (简单文本格式) + let log_line = format!( + "{} [{}] ({}) - {}\n", + record.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"), // 精确到毫秒 + record.level, + record.module, + record.message + ); + + // 2. 获取锁并写入 + let mut writer = self.writer.lock().await; + + // 写入缓冲区 + if let Err(e) = writer.write_all(log_line.as_bytes()).await { + eprintln!("Failed to write to log file: {}", e); + return; + } + + // 3. 刷新缓冲区 (Flush) + // 生产环境权衡: + // - 每次 flush: 数据最安全,但性能略低 + // - 只要 writer 没丢,Tokio 会自动管理,但如果程序崩溃可能丢最后几行 + if let Err(e) = writer.flush().await { + eprintln!("Failed to flush log file: {}", e); + } + } +} diff --git a/src/outputs/mod.rs b/src/outputs/mod.rs index 63e4827..44fc0ab 100644 --- a/src/outputs/mod.rs +++ b/src/outputs/mod.rs @@ -1,5 +1,6 @@ // src/outputs/mod.rs pub mod console; +pub mod file; pub mod postgres; use crate::model::LogRecord; diff --git a/src/outputs/postgres.rs b/src/outputs/postgres.rs index f342e17..07e92c0 100644 --- a/src/outputs/postgres.rs +++ b/src/outputs/postgres.rs @@ -2,21 +2,88 @@ use super::LogOutput; use crate::model::LogRecord; use async_trait::async_trait; +use chrono::{Datelike, TimeZone, Utc}; use sqlx::PgPool; +use tokio::sync::RwLock; pub struct PostgresOutput { pool: PgPool, + active_month: RwLock, } impl PostgresOutput { pub fn new(pool: PgPool) -> Self { - Self { pool } + Self { + pool, + active_month: RwLock::new(String::new()), + } + } + + async fn ensure_partition_exists(&self, timestamp: &chrono::DateTime) { + let current_month_suffix = timestamp.format("%Y_%m").to_string(); + + // 1. 快速检查(读锁):如果内存里的月份和当前日志月份一致,直接返回,不做任何 DB 操作 + { + let last_month = self.active_month.read().await; + if *last_month == current_month_suffix { + return; + } + } // 读锁在这里自动释放 + + // 2. 需要建表(写锁):可能是新启动,或者跨月了 + let mut last_month = self.active_month.write().await; + + // 双重检查:防止多个线程同时拿到写锁重复执行 + if *last_month == current_month_suffix { + return; + } + + // --- 开始计算日期范围 --- + let year = timestamp.year(); + let month = timestamp.month(); + + // 本月 1 号 + let start_date = Utc.with_ymd_and_hms(year, month, 1, 0, 0, 0).unwrap(); + + // 下个月 1 号 (处理 12 月跨年的情况) + let (next_year, next_month) = if month == 12 { + (year + 1, 1) + } else { + (year, month + 1) + }; + let end_date = Utc + .with_ymd_and_hms(next_year, next_month, 1, 0, 0, 0) + .unwrap(); + + let table_name = format!("app_logs_{}", current_month_suffix); + let start_str = start_date.format("%Y-%m-%d").to_string(); + let end_str = end_date.format("%Y-%m-%d").to_string(); + + // --- 执行建表 SQL --- + // 注意:Postgres 的分区表名不能用参数绑定 ($1),只能用 format! 拼接 + // 因为我们是用 chrono 生成的字符串,没有 SQL 注入风险 + let create_sql = format!( + "CREATE TABLE IF NOT EXISTS {} PARTITION OF app_logs FOR VALUES FROM ('{}') TO ('{}')", + table_name, start_str, end_str + ); + + // 如果建表失败,打印错误但不 panic,因为可能是并发创建导致的(虽然 IF NOT EXISTS 能防大部分) + if let Err(e) = sqlx::query(&create_sql).execute(&self.pool).await { + eprintln!("Error creating partition table {}: {}", table_name, e); + } else { + println!("Partition table checked/created: {}", table_name); + } + + // 更新缓存 + *last_month = current_month_suffix; } } #[async_trait] impl LogOutput for PostgresOutput { async fn write(&self, record: &LogRecord) { + self.ensure_partition_exists(&record.timestamp).await; + let query = r#" INSERT INTO app_logs (service_name, log_level, message, module, created_at) VALUES ($1, $2, $3, $4, $5)