"""publish_logs 表读写模板。""" from __future__ import annotations from typing import Any, List, Optional, Tuple from db.connection import get_conn, init_db from util.timeutil import now_unix def save_publish_log( account_id: str, article_id: int, article_title: str, status: str, error_msg: Optional[str] = None, ) -> int: init_db() now = now_unix() conn = get_conn() try: cur = conn.cursor() cur.execute( """ INSERT INTO publish_logs (account_id, article_id, article_title, status, error_msg, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, (account_id, int(article_id), article_title or "", status, error_msg, now, now), ) new_id = int(cur.lastrowid) conn.commit() finally: conn.close() return new_id def list_publish_logs( limit: int, status: Optional[str] = None, account_id: Optional[str] = None, ) -> List[Tuple[Any, ...]]: init_db() conn = get_conn() try: cur = conn.cursor() sql = ( "SELECT id, account_id, article_id, article_title, status, error_msg, created_at, updated_at " "FROM publish_logs WHERE 1=1 " ) params: List[Any] = [] if status: sql += "AND status = ? " params.append(status) if account_id: sql += "AND account_id = ? " params.append(account_id) sql += "ORDER BY created_at DESC, id DESC LIMIT ?" params.append(int(limit)) cur.execute(sql, tuple(params)) return list(cur.fetchall()) finally: conn.close() def get_publish_log_by_id(log_id: int) -> Optional[Tuple[Any, ...]]: init_db() conn = get_conn() try: cur = conn.cursor() cur.execute( "SELECT id, account_id, article_id, article_title, status, error_msg, created_at, updated_at FROM publish_logs WHERE id = ?", (int(log_id),), ) return cur.fetchone() finally: conn.close()