77 lines
2.0 KiB
Python
77 lines
2.0 KiB
Python
"""publish_logs 表读写模板。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, List, Optional, Tuple
|
|
|
|
from db.connection import get_conn, init_db
|
|
from util.timeutil import now_unix
|
|
|
|
|
|
def save_publish_log(
    account_id: str,
    article_id: int,
    article_title: str,
    status: str,
    error_msg: Optional[str] = None,
) -> int:
    """Insert one row into ``publish_logs`` and return its row id.

    ``init_db()`` is called before the connection is opened (presumably to
    make sure the schema exists -- confirm in ``db.connection``).

    Args:
        account_id: Stored as given in the ``account_id`` column.
        article_id: Coerced with ``int()`` before being stored.
        article_title: Stored as given; a falsy value is stored as ``""``.
        status: Stored as given in the ``status`` column.
        error_msg: Stored as given; may be ``None``.

    Returns:
        ``cursor.lastrowid`` of the inserted row, converted to ``int``.
    """
    init_db()
    # A single timestamp feeds both created_at and updated_at.
    timestamp = now_unix()
    insert_sql = """
            INSERT INTO publish_logs (account_id, article_id, article_title, status, error_msg, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """
    connection = get_conn()
    try:
        cursor = connection.cursor()
        row = (
            account_id,
            int(article_id),
            article_title or "",
            status,
            error_msg,
            timestamp,
            timestamp,
        )
        cursor.execute(insert_sql, row)
        inserted_id = int(cursor.lastrowid)
        connection.commit()
        return inserted_id
    finally:
        # The connection is closed whether or not the insert succeeded.
        connection.close()
|
|
|
|
|
|
def list_publish_logs(
    limit: int,
    status: Optional[str] = None,
    account_id: Optional[str] = None,
) -> List[Tuple[Any, ...]]:
    """Return the most recent ``publish_logs`` rows, optionally filtered.

    Rows are ordered by ``created_at`` descending, then ``id`` descending.

    Args:
        limit: Bound to the ``LIMIT`` placeholder after ``int()`` coercion.
        status: When truthy, only rows with this exact ``status`` are kept.
        account_id: When truthy, only rows with this exact ``account_id``
            are kept.

    Returns:
        A list of the fetched rows, with columns in the order ``id,
        account_id, article_id, article_title, status, error_msg,
        created_at, updated_at``.
    """
    init_db()
    connection = get_conn()
    try:
        cursor = connection.cursor()
        fragments = [
            "SELECT id, account_id, article_id, article_title, status, error_msg, created_at, updated_at ",
            "FROM publish_logs WHERE 1=1 ",
        ]
        bindings: List[Any] = []
        # Each filter applies only when its value is truthy, so None and ""
        # both mean "do not filter on this column".
        for column, wanted in (("status", status), ("account_id", account_id)):
            if wanted:
                fragments.append(f"AND {column} = ? ")
                bindings.append(wanted)
        fragments.append("ORDER BY created_at DESC, id DESC LIMIT ?")
        bindings.append(int(limit))
        cursor.execute("".join(fragments), tuple(bindings))
        rows = cursor.fetchall()
        return list(rows)
    finally:
        connection.close()
|
|
|
|
|
|
def get_publish_log_by_id(log_id: int) -> Optional[Tuple[Any, ...]]:
    """Fetch a single ``publish_logs`` row by its ``id``.

    Args:
        log_id: Row id to look up; coerced with ``int()`` before binding.

    Returns:
        The result of ``cursor.fetchone()`` for the lookup, with columns in
        the order ``id, account_id, article_id, article_title, status,
        error_msg, created_at, updated_at``.
    """
    init_db()
    select_sql = "SELECT id, account_id, article_id, article_title, status, error_msg, created_at, updated_at FROM publish_logs WHERE id = ?"
    connection = get_conn()
    try:
        cursor = connection.cursor()
        key = int(log_id)
        cursor.execute(select_sql, (key,))
        row = cursor.fetchone()
    finally:
        connection.close()
    return row
|