diff --git a/src/cleaner.rs b/src/cleaner.rs
new file mode 100644
index 0000000..469a7f2
--- /dev/null
+++ b/src/cleaner.rs
@@ -0,0 +1,190 @@
+// src/cleaner.rs
+
+use chrono::{Duration as ChronoDuration, TimeZone, Utc};
+use sqlx::{PgPool, Row};
+use std::path::{Path, PathBuf};
+use std::time::Duration;
+use tokio::time;
+
+/// Background log-retention cleaner. DB-partition cleanup and log-file
+/// cleanup are both optional and configured independently via the builder
+/// methods below.
+pub struct LogCleaner {
+    retention_days: i64,
+    db_pool: Option<PgPool>,                // None => DB cleanup disabled
+    file_config: Option<(PathBuf, String)>, // (directory, file-name prefix)
+}
+
+impl LogCleaner {
+    /// Create a new cleaner with the given retention window in days.
+    pub fn new(retention_days: i64) -> Self {
+        Self {
+            retention_days,
+            db_pool: None,
+            file_config: None,
+        }
+    }
+
+    /// Enable database partition cleanup (optional).
+    pub fn with_db_cleanup(mut self, pool: PgPool) -> Self {
+        self.db_pool = Some(pool);
+        self
+    }
+
+    /// Enable log-file cleanup (optional).
+    /// dir: log directory
+    /// file_prefix: file prefix (e.g. "order-service"); guards against
+    /// deleting unrelated files that happen to live in the same directory.
+    pub fn with_file_cleanup(mut self, dir: impl Into<PathBuf>, file_prefix: &str) -> Self {
+        self.file_config = Some((dir.into(), file_prefix.to_string()));
+        self
+    }
+
+    /// Spawn the background cleanup task.
+    pub fn start(self) {
+        // Nothing configured: skip spawning the task entirely.
+        if self.db_pool.is_none() && self.file_config.is_none() {
+            println!("LogCleaner: No cleanup targets configured, task not started.");
+            return;
+        }
+
+        tokio::spawn(async move {
+            let check_interval = Duration::from_secs(24 * 3600); // once per day
+
+            loop {
+                // Everything strictly older than this is eligible for removal.
+                let cutoff_date = Utc::now() - ChronoDuration::days(self.retention_days);
+
+                println!(
+                    "Starting log retention cleanup (Cutoff: {})...",
+                    cutoff_date
+                );
+
+                // 1. Clean DB partitions when configured.
+                if let Some(pool) = &self.db_pool
+                    && let Err(e) = cleanup_database_partitions(pool, cutoff_date).await
+                {
+                    eprintln!("DB Partition Cleanup failed: {}", e);
+                }
+
+                // 2. Clean log files when configured.
+                if let Some((dir, prefix)) = &self.file_config
+                    && let Err(e) = cleanup_files(dir, prefix, cutoff_date).await
+                {
+                    eprintln!("File Cleanup failed: {}", e);
+                }
+
+                println!("Cleanup run finished. Sleeping for 24 hours.");
+                time::sleep(check_interval).await;
+            }
+        });
+    }
+}
+
+// --- Concrete cleanup logic ---
+
+async fn cleanup_database_partitions(
+    pool: &PgPool,
+    cutoff: chrono::DateTime<Utc>,
+) -> anyhow::Result<()> {
+    // 1. List all partition tables named app_logs_* from the pg_class catalog.
+    let sql = "SELECT relname FROM pg_class WHERE relname LIKE 'app_logs_%' AND relkind = 'r'";
+    let rows = sqlx::query(sql).fetch_all(pool).await?;
+
+    for row in rows {
+        let table_name: String = row.get("relname");
+
+        // table_name looks like "app_logs_2026_01"; parse year and month.
+        if let Some((year, month)) = parse_table_suffix(&table_name) {
+            // Compute the half-open range [start, end) the partition covers.
+            // start: the 1st of this month. Safe to unwrap: month is
+            // validated to 1..=12 by parse_table_suffix.
+            let partition_start = Utc.with_ymd_and_hms(year, month, 1, 0, 0, 0).unwrap();
+
+            // end: the 1st of the next month.
+            let (next_year, next_month) = if month == 12 {
+                (year + 1, 1)
+            } else {
+                (year, month + 1)
+            };
+            let partition_end = Utc
+                .with_ymd_and_hms(next_year, next_month, 1, 0, 0, 0)
+                .unwrap();
+
+            // --- Decision ---
+
+            if partition_end < cutoff {
+                // Case A: the whole table lies before the cutoff -> DROP it.
+                // DROP TABLE is far faster than DELETE and releases disk space.
+                // Identifier is double-quoted: relname is catalog-sourced, but
+                // quoting keeps the interpolated statement safe regardless.
+                println!("Dropping expired partition table: {}", table_name);
+                let drop_sql = format!("DROP TABLE IF EXISTS \"{}\"", table_name);
+                sqlx::query(&drop_sql).execute(pool).await?;
+            } else if partition_start < cutoff {
+                // Case B: the cutoff falls inside the table -> delete old rows.
+                // e.g. cutoff = Jan 7, table = app_logs_2026_01 (Jan 1 - Feb 1):
+                // delete the rows from Jan 1 up to Jan 7.
+                println!("Cleaning old rows from partition table: {}", table_name);
+                // Target the child table directly; cheaper than the parent.
+                let delete_sql = format!("DELETE FROM \"{}\" WHERE created_at < $1", table_name);
+                let result = sqlx::query(&delete_sql).bind(cutoff).execute(pool).await?;
+                println!(
+                    "Deleted {} rows from {}",
+                    result.rows_affected(),
+                    table_name
+                );
+            } else {
+                // Case C: the table is entirely after the cutoff -> nothing to do.
+            }
+        }
+    }
+    Ok(())
+}
+
+/// Parse "app_logs_YYYY_MM" into (year, month).
+/// Returns None for names that do not match the shape or carry an impossible
+/// month, so the calendar math in the caller can never panic on unwrap.
+fn parse_table_suffix(table_name: &str) -> Option<(i32, u32)> {
+    let parts: Vec<&str> = table_name.split('_').collect();
+    // Expected shape: ["app", "logs", "2026", "01"]
+    if parts.len() >= 4 {
+        let year_str = parts[parts.len() - 2];
+        let month_str = parts[parts.len() - 1];
+
+        if let (Ok(y), Ok(m)) = (year_str.parse::<i32>(), month_str.parse::<u32>()) {
+            // Guard: with_ymd_and_hms(year, 13, ..) would return None and the
+            // caller unwraps it, so reject months outside 1..=12 here.
+            if (1..=12).contains(&m) {
+                return Some((y, m));
+            }
+        }
+    }
+    None
+}
+
+async fn cleanup_files(
+    log_dir: &Path,
+    prefix: &str,
+    threshold: chrono::DateTime<Utc>,
+) -> anyhow::Result<()> {
+    if !log_dir.exists() {
+        return Ok(());
+    }
+
+    let mut entries = tokio::fs::read_dir(log_dir).await?;
+    while let Some(entry) = entries.next_entry().await? {
+        let path = entry.path();
+        if !path.is_file() {
+            continue;
+        }
+        // Safety: only ever touch "<prefix>*.log" files. Files whose names are
+        // not valid UTF-8 are skipped as well (previously they fell through
+        // the prefix check and could be deleted on age alone).
+        match path.file_name().and_then(|n| n.to_str()) {
+            Some(name) if name.starts_with(prefix) && name.ends_with(".log") => {}
+            _ => continue,
+        }
+
+        let metadata = entry.metadata().await?;
+        if let Ok(modified_sys) = metadata.modified() {
+            let modified: chrono::DateTime<Utc> = modified_sys.into();
+            if modified < threshold {
+                if let Err(e) = tokio::fs::remove_file(&path).await {
+                    eprintln!("Failed cleanup file {:?}: {}", path, e);
+                } else {
+                    println!("Removed expired log file: {:?}", path);
+                }
+            }
+        }
+    }
+    Ok(())
+}
diff --git a/src/lib.rs b/src/lib.rs
index 721fa14..29f77f7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,9 @@
 // src/lib.rs
+pub mod cleaner;
 pub mod core;
 pub mod model;
 pub mod outputs;
+pub use cleaner::LogCleaner;
 
 use crate::core::Logger;
 use once_cell::sync::OnceCell;
diff --git a/src/outputs/file.rs b/src/outputs/file.rs
index 08a6f75..7688a2f 100644
--- a/src/outputs/file.rs
+++ b/src/outputs/file.rs
@@ -3,66 +3,128 @@
 use super::LogOutput;
 use crate::model::LogRecord;
 use async_trait::async_trait;
-use std::path::Path;
+use chrono::{DateTime, Utc};
+use std::path::{Path, PathBuf};
 use tokio::fs::{File, OpenOptions};
 use tokio::io::{AsyncWriteExt, BufWriter};
 use tokio::sync::Mutex;
 
+const MAX_FILE_SIZE: u64 = 100 * 1024 * 1024; // 100MB
+
 pub struct FileOutput {
-    // Mutex: write() takes &self (shared reference) but must mutate state.
-    // BufWriter: better IO throughput.
-    writer: Mutex<BufWriter<File>>,
+    base_path: PathBuf,  // base directory, e.g. "logs/"
+    file_prefix: String, // file-name prefix, e.g. "order-service"
+    current_writer: Mutex<Option<BufWriter<File>>>, // active write handle
+    current_date: Mutex<String>,  // date of the current file, "2023-10-27"
+    current_path: Mutex<PathBuf>, // full path currently being written
 }
 
 impl FileOutput {
-    /// Create a file output instance.
-    /// path: log file path, e.g. "logs/app.log"
-    pub async fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
-        // Ensure the parent directory exists.
-        if let Some(parent) = path.as_ref().parent() {
-            tokio::fs::create_dir_all(parent).await?;
-        }
+    pub async fn new(dir: impl AsRef<Path>, prefix: &str) -> anyhow::Result<Self> {
+        let dir = dir.as_ref().to_path_buf();
+        tokio::fs::create_dir_all(&dir).await?;
 
-        // Open in append mode, creating the file if it does not exist.
-        let file = OpenOptions::new()
-            .create(true)
-            .append(true)
-            .open(path)
-            .await?;
+        // Lazy open: no file handle is created until the first record arrives;
+        // the first write() call performs the initial rotate/open.
+        Ok(Self {
+            base_path: dir,
+            file_prefix: prefix.to_string(),
+            current_writer: Mutex::new(None),
+            current_date: Mutex::new(String::new()),
+            current_path: Mutex::new(PathBuf::new()),
+        })
+    }
 
-        Ok(Self {
-            writer: Mutex::new(BufWriter::new(file)),
-        })
+    // Build the day's base file name: logs/order-service-2023-10-27.log
+    fn get_target_path(&self, date_str: &str) -> PathBuf {
+        self.base_path
+            .join(format!("{}-{}.log", self.file_prefix, date_str))
+    }
+
+    // Core rotation logic: roll the file on day change or when it exceeds
+    // MAX_FILE_SIZE, then (re)open the active file if required.
+    async fn rotate_if_needed(&self, timestamp: &DateTime<Utc>) -> anyhow::Result<()> {
+        let date_str = timestamp.format("%Y-%m-%d").to_string();
+        let target_path = self.get_target_path(&date_str);
+
+        let mut date_guard = self.current_date.lock().await;
+        let mut path_guard = self.current_path.lock().await;
+        let mut writer_guard = self.current_writer.lock().await;
+
+        let mut need_open = false;
+
+        // 1. Did the date change (day rollover)?
+        if *date_guard != date_str {
+            *date_guard = date_str.clone();
+            *path_guard = target_path.clone();
+            need_open = true;
+        }
+
+        // 2. Size check (did the file exceed 100MB)?
+        if !need_open {
+            if let Some(writer) = writer_guard.as_mut() {
+                // Flush so the on-disk size read below is accurate.
+                let _ = writer.flush().await;
+            }
+            // Read the current file's metadata.
+            if let Ok(metadata) = tokio::fs::metadata(&*path_guard).await
+                && metadata.len() >= MAX_FILE_SIZE
+            {
+                // Rotate: rename the active log to an archive name, e.g.
+                // logs/order-service-2023-10-27.log -> logs/order-service-2023-10-27.TIMESTAMP.log
+                let backup_name = format!(
+                    "{}-{}.{}.log",
+                    self.file_prefix,
+                    date_str,
+                    Utc::now().format("%H%M%S") // time suffix avoids collisions
+                );
+                let backup_path = self.base_path.join(backup_name);
+
+                // Rename the active file out of the way.
+                if let Err(e) = tokio::fs::rename(&*path_guard, backup_path).await {
+                    eprintln!("Failed to rotate log file: {}", e);
+                }
+                need_open = true;
+            }
+        }
+
+        // 3. (Re)open the active file when required.
+        if need_open || writer_guard.is_none() {
+            let file = OpenOptions::new()
+                .create(true)
+                .append(true)
+                .open(&*path_guard)
+                .await?;
+            *writer_guard = Some(BufWriter::new(file));
+        }
+
+        Ok(())
     }
 }
 
 #[async_trait]
 impl LogOutput for FileOutput {
     async fn write(&self, record: &LogRecord) {
-        // 1. Format the log line (simple text format).
-        let log_line = format!(
-            "{} [{}] ({}) - {}\n",
-            record.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"), // millisecond precision
-            record.level,
-            record.module,
-            record.message
-        );
-
-        // 2. Take the lock and write.
-        let mut writer = self.writer.lock().await;
-
-        // Write into the buffer.
-        if let Err(e) = writer.write_all(log_line.as_bytes()).await {
-            eprintln!("Failed to write to log file: {}", e);
+        // 1. Check rotation before writing.
+        if let Err(e) = self.rotate_if_needed(&record.timestamp).await {
+            eprintln!("Log rotation failed: {}", e);
             return;
         }
 
-        // 3. Flush the buffer.
-        // Production trade-off:
-        // - flush every write: safest, slightly slower
-        // - rely on the buffer: may lose the last few lines on a crash
-        if let Err(e) = writer.flush().await {
-            eprintln!("Failed to flush log file: {}", e);
+        let log_line = format!(
+            "{} [{}] - {}\n",
+            record.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"),
+            record.level,
+            record.message
+        );
+
+        // 2. Write.
+        let mut writer_guard = self.current_writer.lock().await;
+        if let Some(writer) = writer_guard.as_mut() {
+            if let Err(e) = writer.write_all(log_line.as_bytes()).await {
+                eprintln!("Failed to write to file: {}", e);
+            }
+            // In production the per-call flush could be skipped and left to the
+            // buffer, trading durability of the last lines for throughput.
+            let _ = writer.flush().await;
         }
     }
 }