Files
skill-template/content-manager/content_manager/services/article_service.py
2026-04-04 10:35:02 +08:00

432 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""文章:业务规则与编排(调用仓储 + llm-manager"""
from __future__ import annotations
import json
import os
import re
import subprocess
import sys
from typing import Any, Dict, Optional
from content_manager.config import get_openclaw_root
from content_manager.constants import PUBLISH_PLATFORM_CN, PUBLISH_PLATFORM_ALIASES
from content_manager.db import articles_repository as ar
from content_manager.db import prompts_repository as pr
from content_manager.db.connection import get_conn, init_db
from content_manager.util.timeutil import now_unix, unix_to_iso
def _row_to_public_dict(row: tuple) -> Dict[str, Any]:
    """Convert a 12-column article row into the dict handed to callers.

    The row is unpacked positionally as (id, title, body, content_html,
    status, source, account_id, error_msg, llm_target, extra_json,
    created_at, updated_at).

    * ``content_html`` falls back to the plain body when it is falsy.
    * ``status`` / ``source`` default to ``"draft"`` / ``"manual"``.
    * An ``"extra"`` key is added only when ``extra_json`` decodes to a
      JSON object; anything else (including malformed JSON) is ignored.
    """
    (
        article_id,
        title,
        body,
        html,
        status,
        source,
        account_id,
        error_msg,
        llm_target,
        extra_json,
        created_ts,
        updated_ts,
    ) = row

    public: Dict[str, Any] = {
        "id": int(article_id),
        "title": title,
        "content": body,
        "content_html": html or body,
        "status": status or "draft",
        "source": source or "manual",
        "account_id": account_id,
        "error_msg": error_msg,
        "llm_target": llm_target,
        "created_at": unix_to_iso(created_ts),
        "updated_at": unix_to_iso(updated_ts),
    }

    if not extra_json:
        return public

    try:
        extra = json.loads(extra_json)
    except json.JSONDecodeError:
        # Best effort: an undecodable extra payload is simply not exposed.
        return public

    if isinstance(extra, dict):
        public["extra"] = extra
    return public
def resolve_publish_platform(raw: Optional[str]) -> Optional[str]:
    """Map a user-supplied platform name to its canonical key.

    The input is stripped and lower-cased before comparison. An empty or
    missing value resolves to ``"common"``. Otherwise the first key in
    ``PUBLISH_PLATFORM_ALIASES`` that lists the value among its aliases
    (compared case-insensitively) is returned, or ``None`` if no alias
    matches.
    """
    wanted = (raw or "").strip().lower()
    if wanted == "":
        return "common"

    for platform_key, names in PUBLISH_PLATFORM_ALIASES.items():
        if any(wanted == name.lower() for name in names):
            return platform_key
    return None
def _choose_prompt_template(platform: str) -> Optional[Dict[str, Any]]:
    """Select one prompt template for *platform*.

    Active templates for the platform are fetched first; when there are
    none and the platform is not ``"common"``, the common fallback set is
    fetched instead. The final choice is delegated to
    ``pr.pick_random_template``. The connection is always closed before
    the choice is made.
    """
    init_db()
    db = get_conn()
    try:
        candidates = pr.fetch_active_templates(db, platform)
        needs_fallback = platform != "common" and not candidates
        if needs_fallback:
            candidates = pr.fetch_common_fallback(db)
    finally:
        db.close()
    return pr.pick_random_template(candidates)
def _build_prompt_from_template(template_text: str, topic: str, platform: str) -> str:
    """Render a prompt template by filling in its placeholders.

    Supported placeholders are ``{topic}``, ``{platform}`` and
    ``{platform_name}`` (the display name looked up in
    ``PUBLISH_PLATFORM_CN``, defaulting to ``"通用"``).

    Substitution is done in a single pass over the template so that
    placeholder-looking text inside the substituted values (for example a
    topic that literally contains ``{platform}``) is left untouched rather
    than being expanded by a later replacement.

    Returns the rendered text with surrounding whitespace stripped.
    """
    values = {
        "topic": topic,
        "platform": platform,
        "platform_name": PUBLISH_PLATFORM_CN.get(platform, "通用"),
    }
    # A callable replacement is used so backslashes in the values are not
    # interpreted as regex replacement escapes.
    rendered = re.sub(
        r"\{(topic|platform|platform_name)\}",
        lambda match: values[match.group(1)],
        template_text,
    )
    return rendered.strip()
def cmd_add(title: str, body: str, source: str = "manual", llm_target: Optional[str] = None) -> None:
    """Insert a new draft article and print its id.

    A blank or missing title becomes ``"未命名"`` and a missing body
    becomes the empty string. The row is stored with status ``"draft"``
    and identical created/updated timestamps, then committed.
    """
    init_db()
    clean_title = (title or "").strip() or "未命名"
    text = body or ""
    stamp = now_unix()

    db = get_conn()
    try:
        article_id = ar.insert_article(
            db,
            title=clean_title,
            body=text,
            content_html=None,
            status="draft",
            source=source,
            account_id=None,
            error_msg=None,
            llm_target=llm_target,
            extra_json=None,
            created_at=stamp,
            updated_at=stamp,
        )
        db.commit()
    finally:
        db.close()

    print(f"✅ 已新增文章 id={article_id} | {clean_title}")
def cmd_import_json(path: str) -> None:
    """Bulk-import articles from a JSON file.

    The file must contain either a JSON array of objects or an object with
    an ``"articles"`` array. For each item the title is read from
    ``title`` / ``标题`` and the body from ``body`` / ``content`` /
    ``正文``. Items with neither title nor body are skipped; items with a
    body but no title get the title ``导入-<position>``.

    All items are validated before anything is written, so a malformed
    item no longer leaves a partially imported batch behind. Each accepted
    item is then stored through ``cmd_add`` with ``source="import"``.

    Exits the process with status 1 (after printing a message) when the
    file is missing, cannot be parsed as JSON, has the wrong top-level
    shape, is empty, or contains an invalid item.
    """
    init_db()
    path = os.path.abspath(path.strip())
    if not os.path.isfile(path):
        print(f"❌ 找不到文件:{path}\n请检查路径是否正确、文件是否存在。")
        sys.exit(1)

    try:
        # utf-8-sig also reads plain UTF-8, and tolerates a leading BOM
        # that json would otherwise reject.
        with open(path, encoding="utf-8-sig") as f:
            raw = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        print(f"❌ JSON 解析失败:{path}\n{exc}")
        sys.exit(1)

    if isinstance(raw, dict) and "articles" in raw:
        items = raw["articles"]
    elif isinstance(raw, list):
        items = raw
    else:
        items = None
    if not isinstance(items, list):
        print(
            "❌ JSON 格式不对。\n"
            "正确格式二选一:① 文件里是数组 [ {\"title\":\"\",\"body\":\"\"}, … ]\n"
            "② 或对象 {\"articles\": [ … ] }数组里每项至少要有正文body 或 content"
        )
        sys.exit(1)
    if not items:
        print("❌ JSON 里没有可导入的文章条目(数组为空)。")
        sys.exit(1)

    # Pass 1: validate everything and collect (title, body) pairs.
    pending = []
    for i, item in enumerate(items):
        if not isinstance(item, dict):
            print(f"❌ 第 {i + 1} 条不是 JSON 对象(应为 {{ \"title\":…, \"body\":… }})。")
            sys.exit(1)
        # str() guards against non-string titles (e.g. numbers).
        title = str(item.get("title") or item.get("标题") or "").strip()
        body = item.get("body") or item.get("content") or item.get("正文") or ""
        # Containers cannot be meaningfully stored as article text.
        if isinstance(body, (dict, list)):
            print(f"❌ 第 {i + 1} 条的 body/content 必须是字符串,不能是别的类型。")
            sys.exit(1)
        body = str(body)
        if not title and not body.strip():
            continue
        pending.append((title or f"导入-{i + 1}", body))

    # Pass 2: write only after the whole batch is known to be valid.
    for title, body in pending:
        cmd_add(title, body, source="import")
    print(f"✅ 批量导入完成,共写入 {len(pending)}")
def _parse_llm_stdout(stdout: str) -> str:
if "===LLM_START===" in stdout and "===LLM_END===" in stdout:
chunk = stdout.split("===LLM_START===", 1)[1]
chunk = chunk.split("===LLM_END===", 1)[0]
return chunk.strip()
return (stdout or "").strip()
def _default_title_from_body(body: str) -> str:
for line in body.splitlines():
t = line.strip()
if t:
return t[:120] if len(t) > 120 else t
return f"文稿-{now_unix()}"
def cmd_generate(
    llm_target: str,
    topic: str,
    publish_platform: str = "common",
    title: Optional[str] = None,
) -> None:
    """Generate an article with the LLM helper script and store it as a draft.

    Steps:
      1. Normalise the arguments; exit if the model or topic is empty.
      2. Choose a prompt template for the platform and render the prompt.
      3. Run ``llm-manager/scripts/main.py generate <model> <prompt>`` with
         the current interpreter and capture its output.
      4. Treat the run as failed when the output has no start/end markers
         and either the exit code is non-zero or a line starts with
         ``ERROR:``; also when the extracted body is empty or starts with
         ``ERROR:``.
      5. Insert the article (source ``"llm"``) plus a template-usage record
         in one commit and print a summary.

    Every failure path prints a message and exits with status 1.
    """
    llm_target = (llm_target or "").strip()
    topic = (topic or "").strip()
    publish_platform = (publish_platform or "common").strip().lower()
    if not (llm_target and topic):
        print(
            "❌ 生成参数不完整。\n"
            "请使用python main.py article generate <模型> [发布平台] <主题或关键词>\n"
            "示例python main.py article generate 豆包 搜狐号 RPA降本增效"
        )
        sys.exit(1)

    template = _choose_prompt_template(publish_platform)
    if not template:
        print("❌ 提示词模板库为空,请先补充模板后再执行 generate。")
        sys.exit(1)
    prompt = _build_prompt_from_template(template["template_text"], topic, publish_platform)

    script = os.path.join(get_openclaw_root(), "llm-manager", "scripts", "main.py")
    if not os.path.isfile(script):
        print(
            f"❌ 找不到大模型脚本:{script}\n"
            "请确认 llm-manager 与 content-manager 在同一上级目录OpenClaw下。"
        )
        sys.exit(1)

    proc = subprocess.run(
        [sys.executable, script, "generate", llm_target, prompt],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    stdout_text = proc.stdout or ""
    combined = stdout_text + "\n" + (proc.stderr or "")
    has_markers = "===LLM_START===" in stdout_text and "===LLM_END===" in stdout_text

    # Without markers, a non-zero exit or an "ERROR:" line means failure.
    if not has_markers and (proc.returncode != 0 or re.search(r"(?m)^ERROR:", stdout_text)):
        print(
            (combined.strip() or f"大模型进程退出码 {proc.returncode}")
            + "\n❌ 生成失败:请根据上面说明处理(常见:先在「模型管理」添加并登录该平台账号,或配置 API Key"
        )
        sys.exit(1)

    # NOTE(review): when stdout is empty this falls back to the combined
    # text, so stderr content can become the article body — confirm that
    # this is intended.
    body = _parse_llm_stdout(stdout_text or combined)
    if not body:
        print(
            "❌ 没有从大模型输出里取到正文。\n"
            "正常情况输出里应包含 ===LLM_START=== 与 ===LLM_END===;请重试或查看 llm-manager 是否正常打印。"
        )
        sys.exit(1)
    body = body.strip()
    if body.startswith("ERROR:"):
        print(combined.strip())
        print(f"\n❌ 生成失败,未写入数据库。\n{body}")
        sys.exit(1)

    final_title = (title or "").strip() or _default_title_from_body(body)
    platform_cn = PUBLISH_PLATFORM_CN.get(publish_platform, publish_platform)
    template_id = template["id"]
    template_name = template["name"]
    generate_meta = {
        "mode": "template",
        "topic": topic,
        "platform": publish_platform,
        "platform_cn": platform_cn,
        "template_id": template_id,
        "template_name": template_name,
    }
    extra_payload = {"generate_meta": generate_meta}

    init_db()
    stamp = now_unix()
    db = get_conn()
    try:
        new_id = ar.insert_article(
            db,
            title=final_title,
            body=body,
            content_html=None,
            status="draft",
            source="llm",
            account_id=None,
            error_msg=None,
            llm_target=llm_target,
            extra_json=json.dumps(extra_payload, ensure_ascii=False),
            created_at=stamp,
            updated_at=stamp,
        )
        # Usage is recorded in the same transaction as the article.
        pr.insert_usage(db, int(template_id), llm_target, publish_platform, topic, int(new_id))
        db.commit()
    finally:
        db.close()

    print(
        f"✅ 已写入 LLM 文稿 id={new_id} | {final_title}\n"
        f" 模板:{template_name} (id={template_id}) | 平台:{platform_cn} | 主题:{topic}"
    )
def cmd_prompt_list(platform: Optional[str] = None, limit: int = 30) -> None:
    """Print prompt templates, optionally filtered by platform.

    A non-positive ``limit`` is replaced by 30. When ``platform`` is given
    it is resolved through ``resolve_publish_platform``; an unrecognised
    platform prints an error and exits with status 1. Records are printed
    one field per line, separated by a line of 39 underscores and a blank
    line.
    """
    init_db()
    effective_limit = 30 if limit <= 0 else limit

    platform_key = None
    if platform:
        platform_key = resolve_publish_platform(platform)
        if not platform_key:
            print(f"❌ 不支持的平台:{platform}")
            print("支持:通用 / 搜狐号 / 头条号 / 公众号")
            sys.exit(1)

    db = get_conn()
    try:
        rows = pr.list_templates(db, platform_key, effective_limit)
    finally:
        db.close()

    if not rows:
        print("暂无提示词模板")
        return

    divider = "_" * 39
    last_index = len(rows) - 1
    # NOTE(review): labels are printed with no separator before the value
    # (e.g. "id12") — confirm whether a delimiter is expected here.
    for position, (rid, platform_code, name, active, updated_ts) in enumerate(rows):
        record_lines = [
            f"id{rid}",
            f"platform{platform_code}",
            f"platform_cn{PUBLISH_PLATFORM_CN.get(platform_code, platform_code)}",
            f"name{name}",
            f"is_active{int(active)}",
            f"updated_at{unix_to_iso(updated_ts) or ''}",
        ]
        for line in record_lines:
            print(line)
        if position != last_index:
            print(divider)
            print()
def cmd_save(article_id: str, title: str, content: str) -> None:
    """Update an existing article, or create a new draft if it is not found.

    When ``article_id`` is a decimal number that exists in the database,
    that article's title and body are updated. In every other case (non
    numeric id, or id not present) a new ``"draft"`` article with source
    ``"manual"`` is inserted instead.

    The id is normalised with ``str(...).strip()`` like the other article
    commands, and checked with ``isdecimal()`` rather than ``isdigit()``:
    ``isdigit()`` also accepts characters such as "²" that ``int()``
    rejects, which would raise ``ValueError``.
    """
    init_db()
    ts = now_unix()
    id_text = str(article_id).strip()
    conn = get_conn()
    try:
        if id_text.isdecimal():
            aid = int(id_text)
            if ar.exists_id(conn, aid):
                ar.update_article_body(conn, aid, title, content, ts)
                conn.commit()
                print(f"✅ 已更新 id={aid} | {title}")
                return
        # Fall through: unknown or non-numeric id means "save as new".
        new_id = ar.insert_article(
            conn,
            title=title,
            body=content,
            content_html=None,
            status="draft",
            source="manual",
            account_id=None,
            error_msg=None,
            llm_target=None,
            extra_json=None,
            created_at=ts,
            updated_at=ts,
        )
        conn.commit()
        print(f"✅ 已新建 id={new_id} | {title}")
    finally:
        conn.close()
def cmd_get(article_id: str) -> None:
    """Print one article as a single-line JSON object.

    Exits with status 1 (after printing a message) when the id is not a
    decimal integer or no article with that id exists.

    The id is validated with ``isdecimal()`` rather than ``isdigit()``:
    ``isdigit()`` also accepts characters such as "²" that ``int()``
    rejects, which would surface as an unhandled ``ValueError`` instead of
    the friendly message.
    """
    init_db()
    id_text = str(article_id).strip()
    if not id_text.isdecimal():
        print("❌ 文章 id 必须是纯数字(整数)。请先 article list 查看最左一列编号。")
        sys.exit(1)
    aid = int(id_text)

    conn = get_conn()
    try:
        row = ar.fetch_by_id(conn, aid)
    finally:
        conn.close()

    if not row:
        print("❌ 没有这篇文章:该 id 在库里不存在。请先执行 article list 核对编号。")
        sys.exit(1)
    print(json.dumps(_row_to_public_dict(row), ensure_ascii=False))
def cmd_list(limit: int = 10, max_chars: int = 50) -> None:
    """Print the most recent articles, one field per line.

    ``limit`` is passed to ``ar.list_recent``. The ``body`` and
    ``content`` values are cut to ``max_chars`` characters with a trailing
    ``"..."`` when longer; ``content`` is the HTML when present, otherwise
    the body. Records are separated by a line of 39 underscores and a
    blank line.
    """
    init_db()
    db = get_conn()
    try:
        rows = ar.list_recent(db, limit)
    finally:
        db.close()

    if not rows:
        print("暂无文章")
        return

    def clip(text: str) -> str:
        """Shorten *text* to ``max_chars`` characters, marking the cut."""
        if not text:
            return ""
        return text[:max_chars] + "..." if len(text) > max_chars else text

    divider = "_" * 39
    last_index = len(rows) - 1
    # NOTE(review): labels are printed with no separator before the value
    # (e.g. "id12") — confirm whether a delimiter is expected here.
    for position, row in enumerate(rows):
        (
            rid,
            title,
            body,
            content_html,
            status,
            source,
            account_id,
            error_msg,
            llm_target,
            extra_json,
            created_at,
            updated_at,
        ) = row
        plain = body or ""
        display = content_html or plain
        record_lines = [
            f"id{rid}",
            f"title{title or ''}",
            "body",
            clip(plain),
            "content",
            clip(display),
            f"status{status or ''}",
            f"source{source or ''}",
            f"account_id{account_id or ''}",
            f"error_msg{error_msg or ''}",
            f"llm_target{llm_target or ''}",
            f"extra_json{extra_json or ''}",
            f"created_at{unix_to_iso(created_at) or ''}",
            f"updated_at{unix_to_iso(updated_at) or ''}",
        ]
        for line in record_lines:
            print(line)
        if position != last_index:
            print(divider)
            print()
def cmd_delete(article_id: str) -> None:
    """Delete one article by id.

    Exits with status 1 (after printing a message) when the id is not a
    decimal integer or when no row was deleted. The deletion is committed
    only when a row was actually removed; the connection is closed on
    every path.

    The id is validated with ``isdecimal()`` rather than ``isdigit()``:
    ``isdigit()`` also accepts characters such as "²" that ``int()``
    rejects, which would raise an unhandled ``ValueError``.
    """
    init_db()
    id_text = str(article_id).strip()
    if not id_text.isdecimal():
        print("❌ 文章 id 必须是纯数字。请先 article list 查看。")
        sys.exit(1)
    aid = int(id_text)

    conn = get_conn()
    try:
        deleted = ar.delete_by_id(conn, aid)
        if deleted == 0:
            print(f"❌ 没有 id 为 {aid} 的文章,无法删除。")
            sys.exit(1)
        conn.commit()
    finally:
        conn.close()
    print(f"✅ 已删除 id={aid}")
def cmd_feedback(
    article_id: str,
    status: str,
    account_id: Optional[str] = None,
    error_msg: Optional[str] = None,
) -> None:
    """Write a status update back onto an existing article.

    Passes ``status``, ``account_id`` and ``error_msg`` together with the
    current timestamp to ``ar.update_feedback`` and commits. Exits with
    status 1 (after printing a message) when the id is not a decimal
    integer or the article does not exist.

    The id is validated with ``isdecimal()`` rather than ``isdigit()``:
    ``isdigit()`` also accepts characters such as "²" that ``int()``
    rejects, which would raise an unhandled ``ValueError``.
    """
    init_db()
    id_text = str(article_id).strip()
    if not id_text.isdecimal():
        print("❌ 文章 id 必须是纯数字。")
        sys.exit(1)
    aid = int(id_text)
    ts = now_unix()

    conn = get_conn()
    try:
        if not ar.exists_id(conn, aid):
            print(f"❌ 没有 id 为 {aid} 的文章,无法回写状态。")
            sys.exit(1)
        ar.update_feedback(conn, aid, status, account_id, error_msg, ts)
        conn.commit()
    finally:
        conn.close()
    print("✅ 状态已更新")