fix(cleaner): add LogCleaner
This commit is contained in:
190
src/cleaner.rs
Normal file
190
src/cleaner.rs
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
// src/cleaner.rs
|
||||||
|
|
||||||
|
use chrono::{Duration as ChronoDuration, TimeZone, Utc};
|
||||||
|
use sqlx::{PgPool, Row};
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
|
// 定义配置结构体
|
||||||
|
pub struct LogCleaner {
|
||||||
|
retention_days: i64,
|
||||||
|
db_pool: Option<PgPool>, // 变为 Option,允许为空
|
||||||
|
file_config: Option<(PathBuf, String)>, // (目录, 文件名前缀)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LogCleaner {
|
||||||
|
/// 创建一个新的清理器,指定保留天数
|
||||||
|
pub fn new(retention_days: i64) -> Self {
|
||||||
|
Self {
|
||||||
|
retention_days,
|
||||||
|
db_pool: None,
|
||||||
|
file_config: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 启用数据库清理功能 (可选)
|
||||||
|
pub fn with_db_cleanup(mut self, pool: PgPool) -> Self {
|
||||||
|
self.db_pool = Some(pool);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 启用文件清理功能 (可选)
|
||||||
|
/// dir: 日志目录
|
||||||
|
/// file_prefix: 文件前缀 (如 "order-service"),用于防止误删其他文件
|
||||||
|
pub fn with_file_cleanup(mut self, dir: impl Into<PathBuf>, file_prefix: &str) -> Self {
|
||||||
|
self.file_config = Some((dir.into(), file_prefix.to_string()));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 启动后台清理任务
|
||||||
|
pub fn start(self) {
|
||||||
|
// 如果啥都没配置,直接返回,不启动线程
|
||||||
|
if self.db_pool.is_none() && self.file_config.is_none() {
|
||||||
|
println!("LogCleaner: No cleanup targets configured, task not started.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let check_interval = Duration::from_secs(24 * 3600); // 每天一次
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// 计算截止时间
|
||||||
|
let cutoff_date = Utc::now() - ChronoDuration::days(self.retention_days);
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"Starting log retention cleanup (Cutoff: {})...",
|
||||||
|
cutoff_date
|
||||||
|
);
|
||||||
|
|
||||||
|
// 1. 如果配置了 DB,则清理 DB
|
||||||
|
if let Some(pool) = &self.db_pool
|
||||||
|
&& let Err(e) = cleanup_database_partitions(pool, cutoff_date).await
|
||||||
|
{
|
||||||
|
eprintln!("DB Partition Cleanup failed: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 如果配置了文件,则清理文件
|
||||||
|
if let Some((dir, prefix)) = &self.file_config
|
||||||
|
&& let Err(e) = cleanup_files(dir, prefix, cutoff_date).await
|
||||||
|
{
|
||||||
|
eprintln!("File Cleanup failed: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Cleanup run finished. Sleeping for 24 hours.");
|
||||||
|
time::sleep(check_interval).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- 下面是具体的清理逻辑 (逻辑保持不变,稍微适配参数) ---
|
||||||
|
|
||||||
|
async fn cleanup_database_partitions(
|
||||||
|
pool: &PgPool,
|
||||||
|
cutoff: chrono::DateTime<Utc>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// 1. 查询所有以 app_logs_ 开头的分区表
|
||||||
|
// 查询 PostgreSQL 系统表 pg_class
|
||||||
|
let sql = "SELECT relname FROM pg_class WHERE relname LIKE 'app_logs_%' AND relkind = 'r'";
|
||||||
|
let rows = sqlx::query(sql).fetch_all(pool).await?;
|
||||||
|
|
||||||
|
for row in rows {
|
||||||
|
let table_name: String = row.get("relname");
|
||||||
|
|
||||||
|
// table_name 类似 "app_logs_2026_01"
|
||||||
|
// 解析年份和月份
|
||||||
|
if let Some((year, month)) = parse_table_suffix(&table_name) {
|
||||||
|
// 计算该表的【覆盖范围】
|
||||||
|
// start: 本月1号
|
||||||
|
let partition_start = Utc.with_ymd_and_hms(year, month, 1, 0, 0, 0).unwrap();
|
||||||
|
|
||||||
|
// end: 下个月1号
|
||||||
|
let (next_year, next_month) = if month == 12 {
|
||||||
|
(year + 1, 1)
|
||||||
|
} else {
|
||||||
|
(year, month + 1)
|
||||||
|
};
|
||||||
|
let partition_end = Utc
|
||||||
|
.with_ymd_and_hms(next_year, next_month, 1, 0, 0, 0)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// --- 判定逻辑 ---
|
||||||
|
|
||||||
|
if partition_end < cutoff {
|
||||||
|
// 情况 A: 整个表都在截止线之前 (全过期) -> 直接删表
|
||||||
|
// DROP TABLE 比 DELETE 快得多,且能释放物理磁盘空间
|
||||||
|
println!("Dropping expired partition table: {}", table_name);
|
||||||
|
let drop_sql = format!("DROP TABLE IF EXISTS {}", table_name);
|
||||||
|
sqlx::query(&drop_sql).execute(pool).await?;
|
||||||
|
} else if partition_start < cutoff {
|
||||||
|
// 情况 B: 截止线卡在表中间 (部分过期) -> 删除旧行
|
||||||
|
// 例如:cutoff 是 1月7日,表是 app_logs_2026_01 (1月1日-2月1日)
|
||||||
|
// 我们需要删除 1月1日 到 1月7日 的数据
|
||||||
|
println!("Cleaning old rows from partition table: {}", table_name);
|
||||||
|
// 注意:这里必须指定具体的子表名进行 DELETE,效率更高
|
||||||
|
let delete_sql = format!("DELETE FROM {} WHERE created_at < $1", table_name);
|
||||||
|
let result = sqlx::query(&delete_sql).bind(cutoff).execute(pool).await?;
|
||||||
|
println!(
|
||||||
|
"Deleted {} rows from {}",
|
||||||
|
result.rows_affected(),
|
||||||
|
table_name
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// 情况 C: 表完全在截止线之后 (全是新数据) -> 啥也不干
|
||||||
|
// println!("Table {} is active, skipping.", table_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_table_suffix(table_name: &str) -> Option<(i32, u32)> {
|
||||||
|
let parts: Vec<&str> = table_name.split('_').collect();
|
||||||
|
// 预期格式: ["app", "logs", "2026", "01"]
|
||||||
|
if parts.len() >= 4 {
|
||||||
|
let year_str = parts[parts.len() - 2];
|
||||||
|
let month_str = parts[parts.len() - 1];
|
||||||
|
|
||||||
|
if let (Ok(y), Ok(m)) = (year_str.parse::<i32>(), month_str.parse::<u32>()) {
|
||||||
|
return Some((y, m));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn cleanup_files(
|
||||||
|
log_dir: &Path,
|
||||||
|
prefix: &str,
|
||||||
|
threshold: chrono::DateTime<Utc>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
if !log_dir.exists() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut entries = tokio::fs::read_dir(log_dir).await?;
|
||||||
|
while let Some(entry) = entries.next_entry().await? {
|
||||||
|
let path = entry.path();
|
||||||
|
if path.is_file() {
|
||||||
|
if let Some(file_name) = path.file_name().and_then(|n| n.to_str()) {
|
||||||
|
// 【重要】增加了前缀匹配,更加安全
|
||||||
|
if !file_name.starts_with(prefix) || !file_name.ends_with(".log") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let metadata = entry.metadata().await?;
|
||||||
|
if let Ok(modified_sys) = metadata.modified() {
|
||||||
|
let modified: chrono::DateTime<Utc> = modified_sys.into();
|
||||||
|
if modified < threshold {
|
||||||
|
if let Err(e) = tokio::fs::remove_file(&path).await {
|
||||||
|
eprintln!("Failed cleanup file {:?}: {}", path, e);
|
||||||
|
} else {
|
||||||
|
println!("Removed expired log file: {:?}", path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -1,7 +1,9 @@
|
|||||||
// src/lib.rs
|
// src/lib.rs
|
||||||
|
pub mod cleaner;
|
||||||
pub mod core;
|
pub mod core;
|
||||||
pub mod model;
|
pub mod model;
|
||||||
pub mod outputs;
|
pub mod outputs;
|
||||||
|
pub use cleaner::LogCleaner;
|
||||||
|
|
||||||
use crate::core::Logger;
|
use crate::core::Logger;
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
|
|||||||
@@ -3,66 +3,128 @@
|
|||||||
use super::LogOutput;
|
use super::LogOutput;
|
||||||
use crate::model::LogRecord;
|
use crate::model::LogRecord;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use std::path::Path;
|
use chrono::{DateTime, Utc};
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
use tokio::fs::{File, OpenOptions};
|
use tokio::fs::{File, OpenOptions};
|
||||||
use tokio::io::{AsyncWriteExt, BufWriter};
|
use tokio::io::{AsyncWriteExt, BufWriter};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
const MAX_FILE_SIZE: u64 = 100 * 1024 * 1024; // 100MB
|
||||||
|
|
||||||
pub struct FileOutput {
|
pub struct FileOutput {
|
||||||
// 使用 Mutex 包裹,因为 write 方法是 &self (不可变引用),但写入文件需要修改内部状态
|
base_path: PathBuf, // 基础目录,如 "logs/"
|
||||||
// 使用 BufWriter 提高 IO 性能
|
file_prefix: String, // 文件前缀,如 "order-service"
|
||||||
writer: Mutex<BufWriter<File>>,
|
current_writer: Mutex<Option<BufWriter<File>>>, // 当前的写入句柄
|
||||||
|
current_date: Mutex<String>, // 当前文件对应的日期 "2023-10-27"
|
||||||
|
current_path: Mutex<PathBuf>, // 当前正在写入的完整路径
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FileOutput {
|
impl FileOutput {
|
||||||
/// 创建文件输出实例
|
pub async fn new(dir: impl AsRef<Path>, prefix: &str) -> anyhow::Result<Self> {
|
||||||
/// path: 日志文件路径,例如 "logs/app.log"
|
let dir = dir.as_ref().to_path_buf();
|
||||||
pub async fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
|
tokio::fs::create_dir_all(&dir).await?;
|
||||||
// 确保父目录存在
|
|
||||||
if let Some(parent) = path.as_ref().parent() {
|
// 初始化时,先不打开文件,等第一条日志来了再打开 (Lazy Open)
|
||||||
tokio::fs::create_dir_all(parent).await?;
|
// 或者这里可以先做一次 rotate_if_needed,为了简单我们设为空
|
||||||
|
Ok(Self {
|
||||||
|
base_path: dir,
|
||||||
|
file_prefix: prefix.to_string(),
|
||||||
|
current_writer: Mutex::new(None),
|
||||||
|
current_date: Mutex::new(String::new()),
|
||||||
|
current_path: Mutex::new(PathBuf::new()),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// 以追加模式打开文件,如果不存在则创建
|
// 生成当天的基础文件名: logs/order-service-2023-10-27.log
|
||||||
|
fn get_target_path(&self, date_str: &str) -> PathBuf {
|
||||||
|
self.base_path
|
||||||
|
.join(format!("{}-{}.log", self.file_prefix, date_str))
|
||||||
|
}
|
||||||
|
|
||||||
|
// 核心轮转逻辑
|
||||||
|
async fn rotate_if_needed(&self, timestamp: &DateTime<Utc>) -> anyhow::Result<()> {
|
||||||
|
let date_str = timestamp.format("%Y-%m-%d").to_string();
|
||||||
|
let target_path = self.get_target_path(&date_str);
|
||||||
|
|
||||||
|
let mut date_guard = self.current_date.lock().await;
|
||||||
|
let mut path_guard = self.current_path.lock().await;
|
||||||
|
let mut writer_guard = self.current_writer.lock().await;
|
||||||
|
|
||||||
|
let mut need_open = false;
|
||||||
|
|
||||||
|
// 1. 检查日期是否变化 (跨天)
|
||||||
|
if *date_guard != date_str {
|
||||||
|
*date_guard = date_str.clone();
|
||||||
|
*path_guard = target_path.clone();
|
||||||
|
need_open = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 检查大小 (是否超过 100MB)
|
||||||
|
if !need_open {
|
||||||
|
if let Some(writer) = writer_guard.as_mut() {
|
||||||
|
// 刷新一下缓冲区,确保获取的文件大小是准确的
|
||||||
|
let _ = writer.flush().await;
|
||||||
|
}
|
||||||
|
// 获取当前文件元数据
|
||||||
|
if let Ok(metadata) = tokio::fs::metadata(&*path_guard).await
|
||||||
|
&& metadata.len() >= MAX_FILE_SIZE
|
||||||
|
{
|
||||||
|
// 需要切割:将当前 active 的日志重命名为 archive
|
||||||
|
// 格式:logs/order-service-2023-10-27.log -> logs/order-service-2023-10-27.TIMESTAMP.log
|
||||||
|
let backup_name = format!(
|
||||||
|
"{}-{}.{}.log",
|
||||||
|
self.file_prefix,
|
||||||
|
date_str,
|
||||||
|
Utc::now().format("%H%M%S") // 加个时间后缀避免重名
|
||||||
|
);
|
||||||
|
let backup_path = self.base_path.join(backup_name);
|
||||||
|
|
||||||
|
// 重命名
|
||||||
|
if let Err(e) = tokio::fs::rename(&*path_guard, backup_path).await {
|
||||||
|
eprintln!("Failed to rotate log file: {}", e);
|
||||||
|
}
|
||||||
|
need_open = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 如果需要,重新打开文件
|
||||||
|
if need_open || writer_guard.is_none() {
|
||||||
let file = OpenOptions::new()
|
let file = OpenOptions::new()
|
||||||
.create(true)
|
.create(true)
|
||||||
.append(true)
|
.append(true)
|
||||||
.open(path)
|
.open(&*path_guard)
|
||||||
.await?;
|
.await?;
|
||||||
|
*writer_guard = Some(BufWriter::new(file));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Self {
|
Ok(())
|
||||||
writer: Mutex::new(BufWriter::new(file)),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl LogOutput for FileOutput {
|
impl LogOutput for FileOutput {
|
||||||
async fn write(&self, record: &LogRecord) {
|
async fn write(&self, record: &LogRecord) {
|
||||||
// 1. 格式化日志字符串 (简单文本格式)
|
// 1. 写入前检查轮转
|
||||||
let log_line = format!(
|
if let Err(e) = self.rotate_if_needed(&record.timestamp).await {
|
||||||
"{} [{}] ({}) - {}\n",
|
eprintln!("Log rotation failed: {}", e);
|
||||||
record.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"), // 精确到毫秒
|
|
||||||
record.level,
|
|
||||||
record.module,
|
|
||||||
record.message
|
|
||||||
);
|
|
||||||
|
|
||||||
// 2. 获取锁并写入
|
|
||||||
let mut writer = self.writer.lock().await;
|
|
||||||
|
|
||||||
// 写入缓冲区
|
|
||||||
if let Err(e) = writer.write_all(log_line.as_bytes()).await {
|
|
||||||
eprintln!("Failed to write to log file: {}", e);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 刷新缓冲区 (Flush)
|
let log_line = format!(
|
||||||
// 生产环境权衡:
|
"{} [{}] - {}\n",
|
||||||
// - 每次 flush: 数据最安全,但性能略低
|
record.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"),
|
||||||
// - 只要 writer 没丢,Tokio 会自动管理,但如果程序崩溃可能丢最后几行
|
record.level,
|
||||||
if let Err(e) = writer.flush().await {
|
record.message
|
||||||
eprintln!("Failed to flush log file: {}", e);
|
);
|
||||||
|
|
||||||
|
// 2. 写入
|
||||||
|
let mut writer_guard = self.current_writer.lock().await;
|
||||||
|
if let Some(writer) = writer_guard.as_mut() {
|
||||||
|
if let Err(e) = writer.write_all(log_line.as_bytes()).await {
|
||||||
|
eprintln!("Failed to write to file: {}", e);
|
||||||
|
}
|
||||||
|
// 生产环境可以不每次 flush,交给 buffer 满自动刷新,提高性能
|
||||||
|
let _ = writer.flush().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user