""" 统一文件日志:{DATA_ROOT}/{USER_ID}/logs/jiangchang.log 按日轮转;行内带 trace_id 与 skill_slug,便于跨技能排查。 """ from __future__ import annotations import logging import os import sys import uuid from logging.handlers import TimedRotatingFileHandler from typing import Optional from .runtime_env import get_data_root, get_user_id _skill_slug: str = "" _logger_name: str = "" def get_unified_logs_dir() -> str: path = os.path.join(get_data_root(), get_user_id(), "logs") os.makedirs(path, exist_ok=True) return path def get_skill_log_file_path() -> str: override = (os.getenv("JIANGCHANG_LOG_FILE") or "").strip() if override: parent = os.path.dirname(os.path.abspath(override)) if parent: os.makedirs(parent, exist_ok=True) return os.path.abspath(override) return os.path.join(get_unified_logs_dir(), "jiangchang.log") def ensure_trace_for_process() -> str: existing = (os.getenv("JIANGCHANG_TRACE_ID") or "").strip() if existing: return existing tid = uuid.uuid4().hex[:12] os.environ["JIANGCHANG_TRACE_ID"] = tid return tid def subprocess_env_with_trace(environ: Optional[dict] = None) -> dict: ensure_trace_for_process() tid = (os.getenv("JIANGCHANG_TRACE_ID") or "").strip() or ensure_trace_for_process() base = os.environ if environ is None else environ return {**base, "JIANGCHANG_TRACE_ID": tid} class _SkillContextFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: record.trace_id = (os.getenv("JIANGCHANG_TRACE_ID") or "").strip() or "-" record.skill_slug = _skill_slug or "-" return True _FORMAT = "%(asctime)s | %(levelname)-8s | %(trace_id)s | %(skill_slug)s | %(name)s | %(message)s" _DATEFMT = "%Y-%m-%dT%H:%M:%S" def _log_level_from_env() -> int: v = (os.getenv("JIANGCHANG_LOG_LEVEL") or "INFO").strip().upper() return getattr(logging, v, None) or logging.INFO def _backup_count() -> int: try: n = int((os.getenv("JIANGCHANG_LOG_BACKUP_COUNT") or "30").strip()) return max(1, min(n, 365)) except ValueError: return 30 def setup_skill_logging(skill_slug: str, logger_name: str) -> None: global _skill_slug, _logger_name ensure_trace_for_process() _skill_slug = skill_slug _logger_name = logger_name log = logging.getLogger(logger_name) if log.handlers: return log.setLevel(_log_level_from_env()) path = get_skill_log_file_path() fh = TimedRotatingFileHandler( path, when="midnight", interval=1, backupCount=_backup_count(), encoding="utf-8", delay=True, ) fmt = logging.Formatter(_FORMAT, datefmt=_DATEFMT) fh.setFormatter(fmt) fh.addFilter(_SkillContextFilter()) log.addHandler(fh) if (os.getenv("JIANGCHANG_LOG_TO_STDERR") or "").strip().lower() in ("1", "true", "yes", "on"): sh = logging.StreamHandler(sys.stderr) sh.setLevel(logging.WARNING) sh.setFormatter(fmt) sh.addFilter(_SkillContextFilter()) log.addHandler(sh) log.propagate = False def get_skill_logger() -> logging.Logger: if not _logger_name: raise RuntimeError("get_skill_logger: call setup_skill_logging first") return logging.getLogger(_logger_name) def attach_unified_file_handler( log_path: str, *, skill_slug: str, logger_name: str, level: int = logging.DEBUG, ) -> logging.Logger: global _skill_slug ensure_trace_for_process() _skill_slug = skill_slug lg = logging.getLogger(logger_name) lg.handlers.clear() lg.setLevel(level) parent = os.path.dirname(os.path.abspath(log_path)) if parent: os.makedirs(parent, exist_ok=True) fh = logging.FileHandler(log_path, encoding="utf-8") fmt = logging.Formatter(_FORMAT, datefmt=_DATEFMT) fh.setFormatter(fmt) fh.addFilter(_SkillContextFilter()) lg.addHandler(fh) lg.propagate = False return lg