docs: standardize skill-template and add development guide
All checks were successful
技能自动化发布 / release (push) Successful in 22s

This commit is contained in:
2026-04-13 13:46:23 +08:00
parent f11c596bde
commit 298448840d
40 changed files with 1455 additions and 533 deletions

104
scripts/cli/app.py Normal file
View File

@@ -0,0 +1,104 @@
"""CLIargparse 与分发模板。"""
from __future__ import annotations
import argparse
import sys
from typing import List, Optional
from service.publish_service import (
cmd_health,
cmd_log_get,
cmd_logs,
cmd_publish,
cmd_version,
)
from util.argparse_zh import ZhArgumentParser
from util.constants import LOG_LOGGER_NAME, SKILL_SLUG
from util.logging_config import get_skill_logger, setup_skill_logging
def _cli_str_or_none(raw: Optional[str]) -> Optional[str]:
if raw is None:
return None
v = str(raw).strip()
return v or None
def _handle_publish(args: argparse.Namespace) -> int:
tail = [str(x).strip() for x in (args.publish_tail or []) if str(x).strip()]
if len(tail) > 2:
print("❌ 参数过多。")
print("用法python main.py publish [账号id [文章id]] | publish [-a 账号id] [-i 文章id]")
return 1
t_acc: Optional[str] = None
t_art: Optional[str] = None
if len(tail) == 2:
t_acc, t_art = tail[0], tail[1]
elif len(tail) == 1:
if tail[0].isdigit():
t_art = tail[0]
else:
t_acc = tail[0]
pick_a = _cli_str_or_none(getattr(args, "account_id", None))
pick_i = _cli_str_or_none(getattr(args, "article_id", None))
acc = pick_a or t_acc
art = pick_i or t_art
return cmd_publish(account_id=acc, article_id=art)
def _print_full_usage() -> None:
print("模板技能main.py可用命令")
print(" python main.py publish [账号id [文章id]] [-a 账号] [-i 文章id]")
print(" python main.py logs [--limit N] [--status s] [--account-id a]")
print(" python main.py log-get <log_id>")
print(" python main.py health")
print(" python main.py version")
def build_parser() -> ZhArgumentParser:
p = ZhArgumentParser(
prog="main.py",
description="模板技能:发布命令骨架、日志查询、健康检查、版本输出。",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
sub = p.add_subparsers(dest="cmd", required=True, parser_class=ZhArgumentParser)
sp = sub.add_parser("publish", help="发布型技能命令骨架")
sp.add_argument("--account-id", "-a", default=None, metavar="账号id")
sp.add_argument("--article-id", "-i", default=None, metavar="文章id")
sp.add_argument("publish_tail", nargs="*", metavar="位置参数")
sp.set_defaults(handler=_handle_publish)
sp = sub.add_parser("logs", help="查看发布记录")
sp.add_argument("--limit", type=int, default=10)
sp.add_argument("--status", default=None)
sp.add_argument("--account-id", default=None)
sp.set_defaults(handler=lambda a: cmd_logs(limit=a.limit, status=a.status, account_id=a.account_id))
sp = sub.add_parser("log-get", help="按 log_id 查看单条发布记录(JSON)")
sp.add_argument("log_id")
sp.set_defaults(handler=lambda a: cmd_log_get(a.log_id))
sp = sub.add_parser("health", help="健康检查")
sp.set_defaults(handler=lambda _a: cmd_health())
sp = sub.add_parser("version", help="版本信息(JSON)")
sp.set_defaults(handler=lambda _a: cmd_version())
return p
def main(argv: Optional[List[str]] = None) -> int:
argv = argv if argv is not None else sys.argv[1:]
setup_skill_logging(SKILL_SLUG, LOG_LOGGER_NAME)
get_skill_logger().info("cli_start argv=%s", sys.argv)
if not argv:
_print_full_usage()
return 1
if len(argv) == 2 and argv[0] not in {"publish", "logs", "log-get", "health", "version", "-h", "--help"}:
return cmd_publish(account_id=argv[0], article_id=argv[1])
parser = build_parser()
args = parser.parse_args(argv)
return int(args.handler(args))

34
scripts/db/connection.py Normal file
View File

@@ -0,0 +1,34 @@
"""SQLite 连接与日志表迁移模板。"""
from __future__ import annotations
import sqlite3
from util.runtime_paths import get_db_path
def get_conn() -> sqlite3.Connection:
return sqlite3.connect(get_db_path())
def init_db() -> None:
conn = get_conn()
try:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS publish_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
account_id TEXT NOT NULL,
article_id INTEGER NOT NULL,
article_title TEXT,
status TEXT NOT NULL,
error_msg TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
)
"""
)
conn.commit()
finally:
conn.close()

View File

@@ -0,0 +1,76 @@
"""publish_logs 表读写模板。"""
from __future__ import annotations
from typing import Any, List, Optional, Tuple
from db.connection import get_conn, init_db
from util.timeutil import now_unix
def save_publish_log(
account_id: str,
article_id: int,
article_title: str,
status: str,
error_msg: Optional[str] = None,
) -> int:
init_db()
now = now_unix()
conn = get_conn()
try:
cur = conn.cursor()
cur.execute(
"""
INSERT INTO publish_logs (account_id, article_id, article_title, status, error_msg, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(account_id, int(article_id), article_title or "", status, error_msg, now, now),
)
new_id = int(cur.lastrowid)
conn.commit()
finally:
conn.close()
return new_id
def list_publish_logs(
limit: int,
status: Optional[str] = None,
account_id: Optional[str] = None,
) -> List[Tuple[Any, ...]]:
init_db()
conn = get_conn()
try:
cur = conn.cursor()
sql = (
"SELECT id, account_id, article_id, article_title, status, error_msg, created_at, updated_at "
"FROM publish_logs WHERE 1=1 "
)
params: List[Any] = []
if status:
sql += "AND status = ? "
params.append(status)
if account_id:
sql += "AND account_id = ? "
params.append(account_id)
sql += "ORDER BY created_at DESC, id DESC LIMIT ?"
params.append(int(limit))
cur.execute(sql, tuple(params))
return list(cur.fetchall())
finally:
conn.close()
def get_publish_log_by_id(log_id: int) -> Optional[Tuple[Any, ...]]:
init_db()
conn = get_conn()
try:
cur = conn.cursor()
cur.execute(
"SELECT id, account_id, article_id, article_title, status, error_msg, created_at, updated_at FROM publish_logs WHERE id = ?",
(int(log_id),),
)
return cur.fetchone()
finally:
conn.close()

View File

@@ -0,0 +1 @@
# Vendored from jiangchang-platform-kit/sdk/jiangchang_skill_core/ — keep runtime_env + unified_logging in sync.

View File

@@ -0,0 +1,113 @@
"""
JIANGCHANG_* 数据根与用户目录:解析规则 + 可选本地 CLI 默认值。
模板复制后通常无需大改;如组织环境不同,再按项目实际调整。
"""
from __future__ import annotations
import os
import sys
CLI_LOCAL_DEV_ENABLED = True
DEFAULT_LOCAL_USER_ID = "10032"
WIN_DEFAULT_DATA_ROOT = r"D:\jiangchang-data"
WIN_DEFAULT_JIANGCHANG_APP_ROOT = r"D:\AI\jiangchang"
def platform_default_data_root() -> str:
if sys.platform == "win32":
return WIN_DEFAULT_DATA_ROOT
return os.path.join(os.path.expanduser("~"), ".jiangchang-data")
def get_data_root() -> str:
env = (
os.getenv("CLAW_DATA_ROOT")
or os.getenv("JIANGCHANG_DATA_ROOT")
or ""
).strip()
if env:
return env
return platform_default_data_root()
def get_user_id() -> str:
return (
os.getenv("CLAW_USER_ID")
or os.getenv("JIANGCHANG_USER_ID")
or ""
).strip() or "_anon"
def _looks_like_skills_root(path: str) -> bool:
if not path or not os.path.isdir(path):
return False
for marker in (
"llm-manager",
"content-manager",
"account-manager",
"sohu-publisher",
"toutiao-publisher",
"gongzhonghao-publisher",
"weibo-publisher",
"skill-template",
):
if os.path.isdir(os.path.join(path, marker)):
return True
return False
def get_skills_root() -> str:
for key in ("JIANGCHANG_SKILLS_ROOT", "CLAW_SKILLS_ROOT"):
v = (os.getenv(key) or "").strip()
if v:
return os.path.normpath(v)
app = (os.getenv("JIANGCHANG_APP_ROOT") or "").strip()
if sys.platform == "win32" and not app:
app = WIN_DEFAULT_JIANGCHANG_APP_ROOT
if app:
nested = os.path.join(app, "skills")
if _looks_like_skills_root(nested):
return os.path.normpath(nested)
if _looks_like_skills_root(app):
return os.path.normpath(app)
if sys.platform == "win32":
nested = os.path.join(WIN_DEFAULT_JIANGCHANG_APP_ROOT, "skills")
if _looks_like_skills_root(nested):
return os.path.normpath(nested)
if _looks_like_skills_root(WIN_DEFAULT_JIANGCHANG_APP_ROOT):
return os.path.normpath(WIN_DEFAULT_JIANGCHANG_APP_ROOT)
return os.path.normpath(os.path.join(os.path.expanduser("~"), ".openclaw", "skills"))
def get_sibling_skills_root(skill_scripts_dir: str | None = None) -> str:
if skill_scripts_dir:
scripts = os.path.abspath(skill_scripts_dir)
skill_root = os.path.dirname(scripts)
inferred = os.path.dirname(skill_root)
if _looks_like_skills_root(inferred):
return os.path.normpath(inferred)
for key in ("JIANGCHANG_SKILLS_ROOT", "CLAW_SKILLS_ROOT"):
v = (os.getenv(key) or "").strip()
if v:
return os.path.normpath(v)
return get_skills_root()
def apply_cli_local_defaults() -> None:
enabled = CLI_LOCAL_DEV_ENABLED
if not enabled:
v = (os.getenv("JIANGCHANG_CLI_LOCAL_DEV") or "").strip().lower()
enabled = v in ("1", "true", "yes", "on")
if not enabled:
return
if not (os.getenv("CLAW_DATA_ROOT") or os.getenv("JIANGCHANG_DATA_ROOT") or "").strip():
os.environ["JIANGCHANG_DATA_ROOT"] = platform_default_data_root()
if not (os.getenv("CLAW_USER_ID") or os.getenv("JIANGCHANG_USER_ID") or "").strip():
os.environ["JIANGCHANG_USER_ID"] = DEFAULT_LOCAL_USER_ID.strip()

View File

@@ -0,0 +1,137 @@
"""
统一文件日志:{DATA_ROOT}/{USER_ID}/logs/jiangchang.log
按日轮转;行内带 trace_id 与 skill_slug便于跨技能排查。
"""
from __future__ import annotations
import logging
import os
import sys
import uuid
from logging.handlers import TimedRotatingFileHandler
from typing import Optional
from .runtime_env import get_data_root, get_user_id
_skill_slug: str = ""
_logger_name: str = ""
def get_unified_logs_dir() -> str:
path = os.path.join(get_data_root(), get_user_id(), "logs")
os.makedirs(path, exist_ok=True)
return path
def get_skill_log_file_path() -> str:
override = (os.getenv("JIANGCHANG_LOG_FILE") or "").strip()
if override:
parent = os.path.dirname(os.path.abspath(override))
if parent:
os.makedirs(parent, exist_ok=True)
return os.path.abspath(override)
return os.path.join(get_unified_logs_dir(), "jiangchang.log")
def ensure_trace_for_process() -> str:
existing = (os.getenv("JIANGCHANG_TRACE_ID") or "").strip()
if existing:
return existing
tid = uuid.uuid4().hex[:12]
os.environ["JIANGCHANG_TRACE_ID"] = tid
return tid
def subprocess_env_with_trace(environ: Optional[dict] = None) -> dict:
ensure_trace_for_process()
tid = (os.getenv("JIANGCHANG_TRACE_ID") or "").strip() or ensure_trace_for_process()
base = os.environ if environ is None else environ
return {**base, "JIANGCHANG_TRACE_ID": tid}
class _SkillContextFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
record.trace_id = (os.getenv("JIANGCHANG_TRACE_ID") or "").strip() or "-"
record.skill_slug = _skill_slug or "-"
return True
_FORMAT = "%(asctime)s | %(levelname)-8s | %(trace_id)s | %(skill_slug)s | %(name)s | %(message)s"
_DATEFMT = "%Y-%m-%dT%H:%M:%S"
def _log_level_from_env() -> int:
v = (os.getenv("JIANGCHANG_LOG_LEVEL") or "INFO").strip().upper()
return getattr(logging, v, None) or logging.INFO
def _backup_count() -> int:
try:
n = int((os.getenv("JIANGCHANG_LOG_BACKUP_COUNT") or "30").strip())
return max(1, min(n, 365))
except ValueError:
return 30
def setup_skill_logging(skill_slug: str, logger_name: str) -> None:
global _skill_slug, _logger_name
ensure_trace_for_process()
_skill_slug = skill_slug
_logger_name = logger_name
log = logging.getLogger(logger_name)
if log.handlers:
return
log.setLevel(_log_level_from_env())
path = get_skill_log_file_path()
fh = TimedRotatingFileHandler(
path,
when="midnight",
interval=1,
backupCount=_backup_count(),
encoding="utf-8",
delay=True,
)
fmt = logging.Formatter(_FORMAT, datefmt=_DATEFMT)
fh.setFormatter(fmt)
fh.addFilter(_SkillContextFilter())
log.addHandler(fh)
if (os.getenv("JIANGCHANG_LOG_TO_STDERR") or "").strip().lower() in ("1", "true", "yes", "on"):
sh = logging.StreamHandler(sys.stderr)
sh.setLevel(logging.WARNING)
sh.setFormatter(fmt)
sh.addFilter(_SkillContextFilter())
log.addHandler(sh)
log.propagate = False
def get_skill_logger() -> logging.Logger:
if not _logger_name:
raise RuntimeError("get_skill_logger: call setup_skill_logging first")
return logging.getLogger(_logger_name)
def attach_unified_file_handler(
log_path: str,
*,
skill_slug: str,
logger_name: str,
level: int = logging.DEBUG,
) -> logging.Logger:
global _skill_slug
ensure_trace_for_process()
_skill_slug = skill_slug
lg = logging.getLogger(logger_name)
lg.handlers.clear()
lg.setLevel(level)
parent = os.path.dirname(os.path.abspath(log_path))
if parent:
os.makedirs(parent, exist_ok=True)
fh = logging.FileHandler(log_path, encoding="utf-8")
fmt = logging.Formatter(_FORMAT, datefmt=_DATEFMT)
fh.setFormatter(fmt)
fh.addFilter(_SkillContextFilter())
lg.addHandler(fh)
lg.propagate = False
return lg

31
scripts/main.py Normal file
View File

@@ -0,0 +1,31 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
skill-template CLI 入口。
复制为新技能后,请修改注释与常量,但保留当前分层结构:
cliargv→ service业务编排→ db持久化→ util通用工具
"""
from __future__ import annotations
import os
import sys
if sys.platform == "win32":
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
_scripts_dir = os.path.dirname(os.path.abspath(__file__))
if _scripts_dir not in sys.path:
sys.path.insert(0, _scripts_dir)
from jiangchang_skill_core.runtime_env import apply_cli_local_defaults
apply_cli_local_defaults()
from cli.app import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,46 @@
"""在线鉴权(可选模板)。"""
from __future__ import annotations
import os
from typing import Tuple
import requests
def check_entitlement(skill_slug: str) -> Tuple[bool, str]:
auth_base = (os.getenv("JIANGCHANG_AUTH_BASE_URL") or "").strip().rstrip("/")
if not auth_base:
return True, ""
user_id = (os.getenv("CLAW_USER_ID") or os.getenv("JIANGCHANG_USER_ID") or "").strip()
if not user_id:
return False, "鉴权失败缺少用户身份CLAW_USER_ID / JIANGCHANG_USER_ID"
auth_api_key = (os.getenv("JIANGCHANG_AUTH_API_KEY") or "").strip()
timeout = int((os.getenv("JIANGCHANG_AUTH_TIMEOUT_SECONDS") or "5").strip())
headers = {"Content-Type": "application/json"}
if auth_api_key:
headers["Authorization"] = f"Bearer {auth_api_key}"
payload = {
"user_id": user_id,
"skill_slug": skill_slug,
"trace_id": (os.getenv("JIANGCHANG_TRACE_ID") or "").strip(),
"context": {"entry": "main.py"},
}
try:
res = requests.post(f"{auth_base}/api/entitlements/check", json=payload, headers=headers, timeout=timeout)
except requests.RequestException as exc:
return False, f"鉴权请求失败:{exc}"
if res.status_code != 200:
return False, f"鉴权服务异常HTTP {res.status_code}"
try:
body = res.json()
except ValueError:
return False, "鉴权服务异常:返回非 JSON"
code = body.get("code")
data = body.get("data") or {}
if code != 200:
return False, str(body.get("msg") or "鉴权失败")
if not data.get("allow", False):
return False, str(data.get("reason") or "未购买或已过期")
return True, ""

View File

@@ -0,0 +1,10 @@
"""后台自动化占位模块模板。"""
from __future__ import annotations
from typing import Any, Dict
async def publish(account: Dict[str, Any], article: Dict[str, Any], account_id: str) -> str:
_ = (account, article, account_id)
return "ERROR:NOT_IMPLEMENTED 请复制模板后将本文件改名为具体平台模块并实现后台自动化逻辑"

View File

@@ -0,0 +1,83 @@
"""发布编排、日志查询模板。"""
from __future__ import annotations
import json
import sys
from typing import Optional
from db import publish_logs_repository as plr
from service.entitlement_service import check_entitlement
from util.constants import SKILL_SLUG, SKILL_VERSION
from util.timeutil import unix_to_iso
def cmd_publish(account_id: Optional[str] = None, article_id: Optional[str] = None) -> int:
_ = (account_id, article_id)
ok, reason = check_entitlement(SKILL_SLUG)
if not ok:
print(f"{reason}")
return 1
print("❌ 这是模板仓库,请复制后在 scripts/service/ 中实现真正的发布逻辑。")
return 1
def cmd_logs(limit: int = 10, status: Optional[str] = None, account_id: Optional[str] = None) -> int:
if limit <= 0:
limit = 10
rows = plr.list_publish_logs(limit, status, account_id)
if not rows:
print("暂无发布记录")
return 0
sep_line = "_" * 39
for idx, r in enumerate(rows):
rid, aid, arid, title, st, err, cat, uat = r
print(f"id{rid}")
print(f"account_id{aid or ''}")
print(f"article_id{arid}")
print(f"article_title{title or ''}")
print(f"status{st or ''}")
print(f"error_msg{err or ''}")
print(f"created_at{unix_to_iso(cat) or str(cat or '')}")
print(f"updated_at{unix_to_iso(uat) or str(uat or '')}")
if idx != len(rows) - 1:
print(sep_line)
print()
return 0
def cmd_log_get(log_id: str) -> int:
if not str(log_id).isdigit():
print("❌ log_id 必须是数字")
return 1
row = plr.get_publish_log_by_id(int(log_id))
if not row:
print("❌ 没有这条发布记录")
return 1
rid, aid, arid, title, st, err, cat, uat = row
print(
json.dumps(
{
"id": int(rid),
"account_id": aid,
"article_id": int(arid),
"article_title": title,
"status": st,
"error_msg": err,
"created_at": unix_to_iso(cat),
"updated_at": unix_to_iso(uat),
},
ensure_ascii=False,
)
)
return 0
def cmd_health() -> int:
return 0 if sys.version_info >= (3, 10) else 1
def cmd_version() -> int:
print(json.dumps({"version": SKILL_VERSION, "skill": SKILL_SLUG}, ensure_ascii=False))
return 0

View File

@@ -0,0 +1,39 @@
"""兄弟技能 CLI 调用模板。"""
from __future__ import annotations
import json
import os
import subprocess
import sys
from typing import Any, Dict, List, Optional
from util.logging_config import subprocess_env_with_trace
from util.runtime_paths import get_skills_root
def _call_json_script(script_path: str, args: List[str]) -> Optional[Dict[str, Any]]:
proc = subprocess.run(
[sys.executable, script_path, *args],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
env=subprocess_env_with_trace(),
)
raw = (proc.stdout or "").strip()
if not raw or raw.startswith("ERROR"):
return None
try:
data = json.loads(raw)
except json.JSONDecodeError:
return None
return data if isinstance(data, dict) else None
def get_account_manager_main_path() -> str:
return os.path.join(get_skills_root(), "account-manager", "scripts", "main.py")
def get_content_manager_main_path() -> str:
return os.path.join(get_skills_root(), "content-manager", "scripts", "main.py")

View File

@@ -1,83 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
技能入口脚本(模板)
====================
【职责】
- 作为宿主调用时的统一 CLI 入口子进程、终端、CI 均可)。
- 只做:参数解析、环境检查、分发到具体子命令;复杂逻辑放到同目录其它模块。
【如何扩展】
1. 在 main() 的 dispatch 字典中增加 "your_cmd": handler 项。
2. 实现 handler(argv) 或 handler();出错时打印 ERROR: 前缀信息并 sys.exit(非0)。
3. 在仓库根目录 SKILL.md「执行步骤」中补充示例命令。
【多宿主注意】
- 不要在本文件写死某一品牌宿主名。
- 路径与环境变量约定见 ../docs/RUNTIME.md可选辅助代码见 ../optional/paths_snippet.py需自行复制或 import 路径按项目调整)。
【编码】
Windows 下若宿主仍使用系统默认编码,可在宿主侧设置 UTF-8本模板不强制改 sys.stdout避免与宿主捕获冲突
"""
from __future__ import annotations
import argparse
import json
import sys
from typing import Callable, Dict, List, Optional
# 与 SKILL.md 中 metadata.skill.slug 保持一致(模板占位,复制后请修改)
SKILL_SLUG = "your-skill-slug"
def cmd_version(_args: argparse.Namespace) -> int:
"""打印版本信息(与 SKILL.md frontmatter 中 version 应对齐,此处为占位)。"""
payload = {
"skill_slug": SKILL_SLUG,
"version": "0.1.0",
"entry": "skill_main.py",
}
print(json.dumps(payload, ensure_ascii=False))
return 0
def cmd_health(_args: argparse.Namespace) -> int:
"""
健康检查:应快速、可离线(除非技能本身强依赖网络)。
失败时打印 ERROR: 前缀,便于宿主与自动化解析。
"""
# 示例:检查 Python 版本(可按需改为检查关键依赖 import
if sys.version_info < (3, 9):
print("ERROR:PYTHON_VERSION need >= 3.9", file=sys.stderr)
return 1
print(f"OK skill={SKILL_SLUG} python={sys.version.split()[0]}")
return 0
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description="Claw skill CLI template — replace SKILL_SLUG and add subcommands.",
)
sub = p.add_subparsers(dest="command", required=True)
sp = sub.add_parser("version", help="Print version JSON.")
sp.set_defaults(handler=cmd_version)
sp = sub.add_parser("health", help="Quick health check.")
sp.set_defaults(handler=cmd_health)
return p
def main(argv: Optional[List[str]] = None) -> int:
argv = argv if argv is not None else sys.argv[1:]
parser = build_parser()
args = parser.parse_args(argv)
handler: Callable[[argparse.Namespace], int] = args.handler
return handler(args)
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,12 @@
"""argparse 中文错误说明。"""
from __future__ import annotations
import argparse
import sys
class ZhArgumentParser(argparse.ArgumentParser):
def error(self, message: str) -> None:
print(f"参数错误:{message}\n请执行python main.py -h 查看帮助", file=sys.stderr)
self.exit(2)

View File

@@ -0,0 +1,5 @@
"""技能标识与版本(复制后请修改)。"""
SKILL_SLUG = "your-skill-slug"
SKILL_VERSION = "1.0.0"
LOG_LOGGER_NAME = "openclaw.skill.your_skill_slug"

View File

@@ -0,0 +1,21 @@
"""Re-export unified logging (implementation: jiangchang_skill_core.unified_logging)."""
from jiangchang_skill_core.unified_logging import (
attach_unified_file_handler,
ensure_trace_for_process,
get_skill_log_file_path,
get_skill_logger,
get_unified_logs_dir,
setup_skill_logging,
subprocess_env_with_trace,
)
__all__ = [
"attach_unified_file_handler",
"ensure_trace_for_process",
"get_skill_log_file_path",
"get_skill_logger",
"get_unified_logs_dir",
"setup_skill_logging",
"subprocess_env_with_trace",
]

View File

@@ -0,0 +1,34 @@
"""数据根、技能目录、兄弟技能根路径。"""
from __future__ import annotations
import os
from jiangchang_skill_core.runtime_env import get_data_root, get_sibling_skills_root, get_user_id
from util.constants import SKILL_SLUG
_SCRIPTS_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def get_skill_root() -> str:
return os.path.dirname(_SCRIPTS_DIR)
def get_openclaw_root() -> str:
return get_sibling_skills_root(_SCRIPTS_DIR)
def get_skills_root() -> str:
return get_sibling_skills_root(_SCRIPTS_DIR)
def get_skill_data_dir() -> str:
path = os.path.join(get_data_root(), get_user_id(), SKILL_SLUG)
os.makedirs(path, exist_ok=True)
return path
def get_db_path(filename: str | None = None) -> str:
name = filename or f"{SKILL_SLUG}.db"
return os.path.join(get_skill_data_dir(), name)

20
scripts/util/timeutil.py Normal file
View File

@@ -0,0 +1,20 @@
"""时间戳与 ISO 展示。"""
from __future__ import annotations
import time
from datetime import datetime
from typing import Optional
def now_unix() -> int:
return int(time.time())
def unix_to_iso(ts: Optional[int]) -> Optional[str]:
if ts is None:
return None
try:
return datetime.fromtimestamp(int(ts)).isoformat(timespec="seconds")
except (ValueError, OSError, OverflowError):
return None