diff --git a/sdk/jiangchang-desktop-sdk/pyproject.toml b/sdk/jiangchang-desktop-sdk/pyproject.toml index 52ee397..0788a52 100644 --- a/sdk/jiangchang-desktop-sdk/pyproject.toml +++ b/sdk/jiangchang-desktop-sdk/pyproject.toml @@ -4,10 +4,20 @@ build-backend = "setuptools.build_meta" [project] name = "jiangchang-desktop-sdk" -version = "0.1.0" -description = "匠厂桌面应用自动化测试 SDK" +version = "0.4.0" +description = "匠厂桌面应用自动化测试 SDK + 共享 pytest plugin" requires-python = ">=3.10" dependencies = ["playwright>=1.42.0"] +[project.optional-dependencies] +testing = ["pytest>=7.0"] + +# 让 pytest 在 pip install 后自动发现并加载 jiangchang_desktop_sdk.testing.plugin。 +# 未 pip install 时(开发态 sys.path 注入),skill 的 conftest 可改用 +# pytest_plugins = ["jiangchang_desktop_sdk.testing.plugin"] +# 二者等价。 +[project.entry-points.pytest11] +jiangchang_desktop_sdk_testing = "jiangchang_desktop_sdk.testing.plugin" + [tool.setuptools.packages.find] where = ["src"] diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/PKG-INFO b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/PKG-INFO new file mode 100644 index 0000000..3d3ed12 --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/PKG-INFO @@ -0,0 +1,6 @@ +Metadata-Version: 2.4 +Name: jiangchang-desktop-sdk +Version: 0.1.0 +Summary: 匠厂桌面应用自动化测试 SDK +Requires-Python: >=3.10 +Requires-Dist: playwright>=1.42.0 diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/SOURCES.txt b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/SOURCES.txt new file mode 100644 index 0000000..aa9f444 --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/SOURCES.txt @@ -0,0 +1,11 @@ +pyproject.toml +src/jiangchang_desktop_sdk/__init__.py +src/jiangchang_desktop_sdk/client.py +src/jiangchang_desktop_sdk/exceptions.py +src/jiangchang_desktop_sdk/types.py +src/jiangchang_desktop_sdk.egg-info/PKG-INFO +src/jiangchang_desktop_sdk.egg-info/SOURCES.txt +src/jiangchang_desktop_sdk.egg-info/dependency_links.txt +src/jiangchang_desktop_sdk.egg-info/requires.txt +src/jiangchang_desktop_sdk.egg-info/top_level.txt +tests/test_client.py \ No newline at end of file diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/dependency_links.txt b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/requires.txt b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/requires.txt new file mode 100644 index 0000000..1299365 --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/requires.txt @@ -0,0 +1 @@ +playwright>=1.42.0 diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/top_level.txt b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/top_level.txt new file mode 100644 index 0000000..bea7eb9 --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk.egg-info/top_level.txt @@ -0,0 +1 @@ +jiangchang_desktop_sdk diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/__init__.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/__init__.py index ed58fb8..64b2aef 100644 --- a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/__init__.py +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/__init__.py @@ -1,15 +1,25 @@ from .client import JiangchangDesktopClient from .types import JiangchangMessage, AskOptions, LaunchOptions, AssertOptions -from .exceptions import AppNotFoundError, TimeoutError, AssertError +from .exceptions import ( + AppNotFoundError, + ConnectionError, + TimeoutError, + AssertError, + LaunchError, + GatewayDownError, +) __all__ = [ "JiangchangDesktopClient", "JiangchangMessage", "AskOptions", - "LaunchOptions", + "LaunchOptions", "AssertOptions", "AppNotFoundError", + "ConnectionError", "TimeoutError", "AssertError", + "LaunchError", + "GatewayDownError", ] -__version__ = "0.1.0" +__version__ = "0.4.0" diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/client.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/client.py index 990c48f..4d334a2 100644 --- a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/client.py +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/client.py @@ -1,189 +1,302 @@ """ JiangchangDesktopClient — 匠厂桌面应用自动化测试 SDK + +设计要点 +-------- +1. 激活方式:优先用 `jiangchang://` 自定义协议(由安装器/开发态 main.ts 注册), + 拉起已安装的桌面应用或聚焦已运行实例;失败时再回退到直接运行可执行文件。 +2. 拟人输入:使用 `locator.press_sequentially(..., delay=...)` 逐字符键入并按回车发送, + 触发应用内 IME / onChange / Composition 事件,与真实用户一致。 +3. 每次提问前新建任务(点击侧栏 `data-jcid="new-task-button"`),避免上下文污染。 +4. **v2.0.15 UI 模型适配**(重要): + 从匠厂 v2.0.15 开始,单轮对话中间的 thinking / tool_use 消息被折叠进 + `ExecutionGraphCard`(`src/pages/Chat/index.tsx` 里的 `foldedNarrationIndices`)。 + 这些 DOM 节点 **不再带 `data-jcid="message"`**,而是 `data-testid="chat-execution-graph"` + / `chat-execution-step`。同时在多用户历史共享 session 的情况下,`新建任务` 后 + Gateway 可能把整段历史载回消息列表 —— 单靠"最后一条 assistant"选答复会被历史污染。 + 因此完成/选答策略改为: + · **锚点**:`ask()` 发送后等到最后一条 `[data-jcid-role="user"]` 节点(就是我们刚发的 + 问题)在 DOM 里出现,记住它在 `[data-jcid="message"]` 列表里的 index,作为切片起点; + · **完成判据**(同时满足,并稳定 `stable_seconds` 秒): + · `chat-root[data-jcid-sending] === 'false'` + · `window.__jc_sending__` !== true + · 不存在 `[data-testid="chat-execution-graph"][data-collapsed="false"]` + (自动化场景下不会手动展开旧图,因此任何未收起的图都意味着本轮仍在执行) + · 我们这条 user 之后存在 `[data-jcid-role="assistant"]` 且其内部有 + `[data-jcid="message-body"]` + · 该 body 文本连续 `stable_seconds` 秒无变化 + · **选答**:仅在我们 user 节点之后的 assistant 里挑"有 message-body"的最后一条。 +5. read() 严格按 `data-jcid-role` 分类,没有该属性的 DOM 节点不再默认归为 assistant。 """ -import os -import time +from __future__ import annotations + import json -import urllib.request -import signal +import logging +import os import re -from typing import Optional, List +import shutil +import signal +import subprocess +import time +import urllib.error +import urllib.request +from typing import List, Optional -from playwright.sync_api import sync_playwright, Browser, Page +from playwright.sync_api import Browser, Page, sync_playwright -from .types import JiangchangMessage, AskOptions, LaunchOptions, AssertOptions -from .exceptions import AppNotFoundError, ConnectionError, TimeoutError as JcTimeoutError, AssertError, LaunchError +from .exceptions import ( + AppNotFoundError, + AssertError, + ConnectionError, + GatewayDownError, + LaunchError, + TimeoutError as JcTimeoutError, +) +from .types import AskOptions, AssertOptions, JiangchangMessage, LaunchOptions + +# ── 调试日志 ─────────────────────────────────────────────────────── +_logger = logging.getLogger("jiangchang-desktop-sdk") +_handler_added = False + + +def _ensure_logger() -> None: + global _handler_added + if not _handler_added: + _handler_added = True + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + ch.setFormatter( + logging.Formatter( + "[%(asctime)s] %(levelname)s %(name)s %(message)s", + datefmt="%H:%M:%S", + ) + ) + _logger.addHandler(ch) + _logger.setLevel(logging.DEBUG) class JiangchangDesktopClient: - """ - 主要自动化客户端。 - - 用法示例: - - ```python - client = JiangchangDesktopClient() - # 连接到已运行的匠厂桌面应用(通过 CDP,端口 9222) - client.connect() - - # 提问并等待回复 - answer = client.ask("帮我计算,上海到洛杉矶一个40HQ的海运费") - print(answer) - - # 断言回复内容 - client.assert_contains("USD") - - client.disconnect() - ``` - """ + """匠厂桌面应用自动化客户端。""" - def __init__(self): - self._playwright = None # playwright.sync_playwright 实例 + def __init__(self) -> None: + _ensure_logger() + self._playwright = None self._browser: Optional[Browser] = None self._page: Optional[Page] = None self._connected = False - self._owns_browser = False # 是否由我们启动的浏览器 - self._electron_process = None # subprocess.Popen 启动的 Electron 进程 + self._owns_browser = False + self._electron_process: Optional[subprocess.Popen] = None - def _get_default_executable_path(self) -> str: - """从环境变量获取 Electron 可执行文件路径""" + # ─── 配置解析 ───────────────────────────────────────────────── + + def _get_default_executable_path(self) -> Optional[str]: path = os.environ.get("JIANGCHANG_E2E_APP_PATH") - if not path: - raise AppNotFoundError( - "未找到 Electron 可执行文件路径。" - "请设置环境变量 JIANGCHANG_E2E_APP_PATH," - "或调用 launch_app() 时传入 executable_path 参数。" - ) - return path + if path and os.path.exists(path): + return path + # 兼容旧环境:尝试从 JIANGCHANG_APP_ROOT 推断 + root = os.environ.get("JIANGCHANG_APP_ROOT") + if root: + for candidate in ("Jiangchang.exe", "jiangchang.exe", "匠厂.exe"): + p = os.path.join(root, candidate) + if os.path.exists(p): + return p + return None def _get_default_cdp_port(self) -> int: - """从环境变量获取 CDP 端口,默认 9222""" - port = os.environ.get("JIANGCHANG_E2E_CDP_PORT", "9222") - return int(port) + return int(os.environ.get("JIANGCHANG_E2E_CDP_PORT", "9222")) - def _discover_main_page_ws_url(self, cdp_port: int) -> str: - """ - 通过 CDP HTTP 端点发现匠厂主窗口的 WebSocket 调试 URL。 - CDP HTTP 端点格式:http://localhost:9222/json 返回目标列表, - 每个目标包含 webSocketDebuggerUrl 字段。 - """ - json_url = f"http://localhost:{cdp_port}/json" + # ─── CDP 发现与连接 ────────────────────────────────────────── + + def _cdp_alive(self, cdp_port: int) -> bool: try: - with urllib.request.urlopen(json_url, timeout=10) as resp: - targets = json.load(resp) + with urllib.request.urlopen( + f"http://localhost:{cdp_port}/json/version", timeout=2 + ) as resp: + json.load(resp) + return True + except Exception: + return False + + def _discover_browser_ws_url(self, cdp_port: int) -> str: + version_url = f"http://localhost:{cdp_port}/json/version" + try: + with urllib.request.urlopen(version_url, timeout=10) as resp: + version_info = json.load(resp) except urllib.error.URLError as exc: raise ConnectionError( - f"无法访问 CDP 端点 {json_url}:{exc}。" + f"无法访问 CDP 端点 {version_url}:{exc}。" "请确认匠厂应用正在运行,并使用了 --remote-debugging-port=9222 参数。" ) from exc - if not targets: - raise ConnectionError( - f"CDP 端点 {json_url} 没有找到任何调试目标。" - "请确认匠厂应用正在运行,并使用了 --remote-debugging-port=9222 参数。" - ) - - # 优先选择标题含"匠厂"、"OpenClaw"、"ClawX"的页面 - for t in targets: - title = t.get("title", "") - if any(kw in title for kw in ["匠厂", "OpenClaw", "ClawX"]): - ws_url = t.get("webSocketDebuggerUrl") - if ws_url: - return ws_url - - # 没有匹配就用第一个可用目标 - first = targets[0] - ws_url = first.get("webSocketDebuggerUrl") + ws_url = version_info.get("webSocketDebuggerUrl") if not ws_url: - raise ConnectionError("目标页面没有 WebSocket 调试 URL") + raise ConnectionError( + f"CDP /json/version 响应中没有 webSocketDebuggerUrl 字段。返回:{version_info}" + ) return ws_url - def connect(self, url: Optional[str] = None) -> None: + def _find_main_page(self) -> Page: + all_pages: List[Page] = [] + for ctx in self._browser.contexts: # type: ignore[union-attr] + all_pages.extend(ctx.pages) + if not all_pages: + raise ConnectionError("CDP 连接成功,但没有找到任何页面") + + keywords = ["匠厂", "Jiangchang", "OpenClaw", "ClawX"] + for page in all_pages: + try: + title = page.title() + except Exception: + title = "" + if any(kw in title for kw in keywords): + return page + + for page in all_pages: + if "devtools://" not in page.url: + return page + return all_pages[0] + + # ─── 应用激活(协议优先) ──────────────────────────────────── + + def _activate_via_protocol(self) -> bool: + """通过 jiangchang:// 协议拉起或聚焦应用。成功返回 True。""" + try: + if os.name == "nt": + # `start` 是 cmd 内置命令;用 `cmd /c start "" "jiangchang://activate"` 交给 Windows Shell + subprocess.run( + ["cmd", "/c", "start", "", "jiangchang://activate"], + check=False, + timeout=5, + ) + _logger.debug("[activate] 已通过 jiangchang:// 协议发送激活请求") + return True + elif os.name == "posix": + # macOS: open -a;Linux: xdg-open + if shutil.which("open"): + subprocess.run(["open", "jiangchang://activate"], check=False, timeout=5) + return True + if shutil.which("xdg-open"): + subprocess.run(["xdg-open", "jiangchang://activate"], check=False, timeout=5) + return True + except Exception as exc: + _logger.warning("[activate] 协议激活失败:%s", exc) + return False + + def _launch_via_executable(self, cdp_port: int) -> bool: + """回退:直接 spawn 可执行文件,带 --remote-debugging-port。""" + exe = self._get_default_executable_path() + if not exe: + _logger.warning("[activate] 未配置 JIANGCHANG_E2E_APP_PATH,无法回退直接启动") + return False + try: + flags = 0 + if os.name == "nt": + flags = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] + self._electron_process = subprocess.Popen( + [exe, f"--remote-debugging-port={cdp_port}"], + creationflags=flags, + ) + _logger.debug("[activate] 已直接启动可执行文件:%s pid=%s", exe, self._electron_process.pid) + return True + except Exception as exc: + _logger.error("[activate] 直接启动失败:%s", exc) + return False + + def ensure_app_running(self, wait_timeout_s: float = 30.0) -> None: """ - 连接到已运行的匠厂桌面应用(通过 Chrome DevTools Protocol)。 - - url: 可选,CDP HTTP 端点,默认 http://localhost:9222 - 也可通过环境变量 JIANGCHANG_E2E_CDP_PORT 设置端口 + 保证匠厂桌面应用已运行并开放 CDP。 + 顺序:1) 已开 CDP → 直接连接;2) 协议激活;3) 直接启动可执行文件。 """ - if self._connected: + cdp_port = self._get_default_cdp_port() + if self._cdp_alive(cdp_port): + _logger.debug("[ensure_app_running] CDP 已就绪,直接连接") + self.connect() + self.bring_to_front() return + _logger.info("[ensure_app_running] CDP 未就绪,尝试协议激活") + activated = self._activate_via_protocol() + if not activated: + _logger.info("[ensure_app_running] 协议激活不可用,回退直接启动可执行文件") + self._launch_via_executable(cdp_port) + + deadline = time.time() + wait_timeout_s + while time.time() < deadline: + if self._cdp_alive(cdp_port): + _logger.debug("[ensure_app_running] CDP 就绪") + self.connect() + self.bring_to_front() + return + time.sleep(0.5) + raise LaunchError( + f"等待匠厂 CDP 就绪超时({wait_timeout_s}s)。" + "请手动启动匠厂桌面应用,或检查 jiangchang:// 协议注册与 JIANGCHANG_E2E_APP_PATH 配置。" + ) + + def bring_to_front(self) -> None: + """把主窗口聚焦到最前。""" + try: + page = self.get_page() + page.bring_to_front() + except Exception as exc: + _logger.debug("[bring_to_front] 失败:%s", exc) + + # ─── 连接 / 断开 ────────────────────────────────────────────── + + def connect(self, url: Optional[str] = None) -> None: + if self._connected: + return cdp_port = self._get_default_cdp_port() - + _logger.debug("[connect] CDP 端口=%d", cdp_port) + try: self._playwright = sync_playwright().start() except Exception as exc: raise LaunchError(f"启动 Playwright 失败: {exc}") from exc try: - # 发现 WebSocket URL - ws_url = self._discover_main_page_ws_url(cdp_port) - - # 通过 WebSocket 连接到已有浏览器 + ws_url = self._discover_browser_ws_url(cdp_port) self._browser = self._playwright.chromium.connect_over_cdp(ws_url) - - # 获取页面 - contexts = self._browser.contexts - if not contexts: - raise ConnectionError("CDP 连接成功,但没有找到浏览器上下文") - - pages = contexts[0].pages - if not pages: - raise ConnectionError("CDP 连接成功,但没有找到页面") - - self._page = pages[0] + self._page = self._find_main_page() self._connected = True self._owns_browser = False - - except ConnectionError: - self._cleanup() - raise + _logger.debug("[connect] 主窗口 URL:%s", self._page.url) except Exception as exc: - self._cleanup() - raise ConnectionError(f"连接失败: {exc}") from exc + _logger.error("[connect] 失败:%s", exc) + if self._playwright is not None: + try: + self._playwright.stop() + except Exception: + pass + self._playwright = None + self._browser = None + self._page = None + self._connected = False + raise ConnectionError(f"连接匠厂桌面应用失败: {exc}") from exc def launch_app(self, options: Optional[LaunchOptions] = None) -> None: - """ - 启动匠厂桌面应用(直接运行 Electron 可执行文件)。 - - options: 启动选项。如果 executable_path 未指定, - 从环境变量 JIANGCHANG_E2E_APP_PATH 读取。 - """ + """保留旧 API 兼容:直接启动可执行文件后连接。""" if self._connected: self.disconnect() - opts = options or LaunchOptions() executable_path = opts.executable_path or self._get_default_executable_path() - cdp_port = opts.cdp_port - startup_timeout = opts.startup_timeout - - if not os.path.exists(executable_path): - raise AppNotFoundError(f"Electron 可执行文件不存在: {executable_path}") - + if not executable_path or not os.path.exists(executable_path): + raise AppNotFoundError( + f"Electron 可执行文件不存在: {executable_path!r}。请设置 JIANGCHANG_E2E_APP_PATH。" + ) try: self._playwright = sync_playwright().start() except Exception as exc: raise LaunchError(f"启动 Playwright 失败: {exc}") from exc - try: - # 直接启动匠厂 Electron 程序,带远程调试端口 - import subprocess + flags = subprocess.CREATE_NEW_PROCESS_GROUP if os.name == "nt" else 0 # type: ignore[attr-defined] self._electron_process = subprocess.Popen( - [executable_path, f"--remote-debugging-port={cdp_port}"], - detached=True, - creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == "nt" else 0, + [executable_path, f"--remote-debugging-port={opts.cdp_port}"], + creationflags=flags, ) - - # 等应用启动 time.sleep(3) - - # 再通过 CDP 连接上去 self.connect() - self._owns_browser = False - - # 等待页面加载 - self._wait_for_page_ready(startup_timeout) - + self._wait_for_page_ready(opts.startup_timeout) except AppNotFoundError: self._cleanup() raise @@ -192,265 +305,610 @@ class JiangchangDesktopClient: raise LaunchError(f"启动应用失败: {exc}") from exc def _wait_for_page_ready(self, timeout_ms: int) -> None: - """等待页面有标题(说明主窗口已加载)""" - start = time.time() - deadline = start + timeout_ms / 1000 - + deadline = time.time() + timeout_ms / 1000 while time.time() < deadline: if self._page and self._page.title(): return time.sleep(0.5) - raise JcTimeoutError(f"页面加载超时({timeout_ms}ms)") def _cleanup(self) -> None: - """清理所有资源""" - # 如果是我们启动的 Electron 进程,先关闭它 - if self._electron_process is not None: - try: - if os.name == "nt": - # Windows:发送 CTRL_BREAK_EVENT 然后 terminate - self._electron_process.send_signal(signal.CTRL_BREAK_EVENT if hasattr(signal, 'CTRL_BREAK_EVENT') else None) - self._electron_process.wait(timeout=5) - self._electron_process.terminate() - self._electron_process.wait(timeout=5) - except Exception: - try: - self._electron_process.kill() - except Exception: - pass - self._electron_process = None + """ + 仅断开 CDP 连接,**不关闭**匠厂主窗口 / Electron 进程。 + 自动化是 attach 模式:page 归匠厂所有,我们无权关掉它 —— 关掉会触发 + window-all-closed → app.quit(),直接把匠厂杀掉。 + 只有当 self._owns_browser=True(即我们自己 spawn 的 Electron)时, + 才终止我们启动的进程。 + """ + # Attach 模式:只释放 Playwright 句柄,不碰 page / browser 的生命周期 + # 把 page 引用置空,防御后续误用 + self._page = None - if self._page: - try: - self._page.close() - except Exception: - pass - self._page = None - if self._browser: try: - if self._owns_browser: - self._browser.close() - else: - self._browser.disconnect() + # 无论是否 owns_browser,disconnect 都只断 WS 连接,不关浏览器窗口 + self._browser.disconnect() except Exception: pass self._browser = None - + if self._playwright: try: self._playwright.stop() except Exception: pass self._playwright = None - - self._connected = False + + # 仅当是我们自己 spawn 的 Electron,才允许终止它 + if self._owns_browser and self._electron_process is not None: + try: + if os.name == "nt" and hasattr(signal, "CTRL_BREAK_EVENT"): + try: + self._electron_process.send_signal(signal.CTRL_BREAK_EVENT) # type: ignore[attr-defined] + self._electron_process.wait(timeout=3) + except Exception: + pass + self._electron_process.terminate() + self._electron_process.wait(timeout=3) + except Exception: + try: + self._electron_process.kill() + except Exception: + pass self._electron_process = None + self._connected = False + + def disconnect(self) -> None: + if not self._connected: + return + self._cleanup() + + def is_connected(self) -> bool: + return self._connected + + def get_page(self) -> Page: + if not self._page: + raise ConnectionError("未连接到应用,请先调用 connect() 或 ensure_app_running()") + return self._page # ─── 辅助选择器 ─────────────────────────────────────────────── def _get_chat_input_locator(self): - """获取聊天输入框 locator,优先用 data-jcid,回退到类名/位置选择器""" page = self.get_page() - # 优先:data-jcid="chat-input" locator = page.locator('[data-jcid="chat-input"]') if locator.count() > 0: return locator - # Fallback:查找 textarea(通常在 chat 输入区) - locator = page.locator('textarea').last + locator = page.locator("textarea").last if locator.count() > 0: return locator - raise ConnectionError("找不到聊天输入框(textarea),请确认 data-jcid 属性已添加") + raise ConnectionError("找不到聊天输入框(textarea),请确认匠厂已加 data-jcid=\"chat-input\"") def _get_send_button_locator(self): - """获取发送按钮 locator""" page = self.get_page() locator = page.locator('[data-jcid="send-button"]') if locator.count() > 0: return locator - # Fallback:找 type="submit" 的 Button,或 textarea 同级的发送按钮 locator = page.locator('button[type="submit"]') if locator.count() > 0: return locator - # Fallback:找最后一个主要操作按钮(排除 icon-only 小按钮) - locator = page.locator('button:not([aria-label])').last + raise ConnectionError("找不到发送按钮,请确认匠厂已加 data-jcid=\"send-button\"") + + def _get_new_task_button_locator(self): + page = self.get_page() + locator = page.locator('[data-jcid="new-task-button"]') if locator.count() > 0: return locator - raise ConnectionError("找不到发送按钮,请确认 data-jcid 属性已添加") + # 回退:旧版本 data-testid + locator = page.locator('[data-testid="sidebar-new-chat"]') + if locator.count() > 0: + return locator + # 最后兜底:文本"新建任务" + locator = page.get_by_text("新建任务", exact=False).first + return locator def _get_message_list_locator(self): - """获取消息列表容器 locator""" page = self.get_page() locator = page.locator('[data-jcid="message-list"]') if locator.count() > 0: return locator - # Fallback:找消息容器(class 含 space-y 或 max-w 的 div) - locator = page.locator('[class*="space-y-3"]') + locator = page.locator('[class*="space-y-3"]').first if locator.count() > 0: return locator - # Fallback:找 chat 主区域 - locator = page.locator('main, [class*="chat"]').first - if locator.count() > 0: - return locator - raise ConnectionError("找不到消息列表容器") + raise ConnectionError("找不到消息列表容器 data-jcid=\"message-list\"") - # ─── 核心交互方法 ───────────────────────────────────────────── + # ─── 新建任务 ───────────────────────────────────────────────── - def _wait_for_streaming_done(self, timeout: int = 120000) -> None: - """ - 等待 AI 流式输出完成。 - - 优先检测 window.__jc_sending__ 标志(需要 UI 注入), - 回退到轮询消息列表内容变化。 - """ + def new_task(self) -> None: + """点击"新建任务"按钮,等待消息列表清空或欢迎屏出现。""" + if not self._connected: + raise ConnectionError("未连接到应用") page = self.get_page() + btn = self._get_new_task_button_locator() + try: + btn.wait_for(state="visible", timeout=5000) + btn.click() + except Exception as exc: + _logger.warning("[new_task] 点击失败:%s", exc) + return - # 策略 1:window.__jc_sending__ 标志(由 UI runtime 注入) + # 等待 chat-root.message-count=0 或 welcome-screen 出现,最多 5s + deadline = time.time() + 5 + while time.time() < deadline: + try: + count = page.locator('[data-jcid="chat-root"]').first.get_attribute("data-jcid-message-count") + if count == "0": + _logger.debug("[new_task] 已进入新任务(message-count=0)") + return + if page.locator('[data-jcid="welcome-screen"]').count() > 0: + _logger.debug("[new_task] 欢迎屏已出现") + return + except Exception: + pass + time.sleep(0.2) + _logger.debug("[new_task] 未检测到新任务指标,继续") + + # ─── 网关健康检查 ───────────────────────────────────────────── + + def _gateway_state(self) -> str: + try: + page = self.get_page() + el = page.locator('[data-jcid="gateway-status"]').first + if el.count() == 0: + return "unknown" + return el.get_attribute("data-state") or "unknown" + except Exception: + return "unknown" + + def wait_gateway_ready(self, timeout_ms: int = 30000) -> None: + page = self.get_page() try: page.wait_for_function( - "() => window.__jc_sending__ === false", - timeout=timeout, + """() => { + const el = document.querySelector('[data-jcid="gateway-status"]'); + return el && el.dataset.state === 'running'; + }""", + timeout=timeout_ms, ) - return + except Exception as exc: + _logger.warning("[wait_gateway_ready] 超时/异常:%s(继续,但后续可能报 GatewayDown)", exc) + + # ─── 核心:等待流式输出完成 ─────────────────────────────────── + + # 下面这些辅助方法围绕 **"我们刚发出的 user 消息节点"** 这个锚点来切片 + # DOM。之所以不再用"assistant 总数变化 > baseline"的老逻辑,是因为 v2.0.15 + # 在 `新建任务` 后 Gateway 可能把整段历史载回消息列表 —— 此时 assistant 总数 + # 会瞬间变大,但那些不是我们这一轮的回复。 + + def _message_node_count(self) -> int: + try: + return self.get_page().locator('[data-jcid="message"]').count() except Exception: - pass # 标志不存在,尝试策略 2 + return 0 - # 策略 2:轮询消息列表内容,直到连续 3 次无变化(稳定) - last_content = "" - stable_count = 0 - stable_threshold = 3 - poll_interval = 2.0 # 秒 - max_wait = timeout / 1000 + def _last_user_node_index(self) -> int: + """返回 `[data-jcid="message"]` 节点列表中最后一条 role=user 的 index;找不到返回 -1。""" + try: + page = self.get_page() + total = page.locator('[data-jcid="message"]').count() + for i in range(total - 1, -1, -1): + node = page.locator('[data-jcid="message"]').nth(i) + role = node.get_attribute("data-jcid-role") or "" + if role.lower() == "user": + return i + return -1 + except Exception: + return -1 - start = time.time() - while time.time() - start < max_wait: - try: - msg_list = self._get_message_list_locator() - content = msg_list.inner_text() - except Exception: - content = "" + def _user_node_matches(self, user_idx: int, question: str) -> bool: + """校验 index=user_idx 的节点文本是否与 question 基本一致(用于防止我们抓错了历史 user)。 + 采用子串 / 反向子串包含,容忍匠厂对 user 气泡的装饰字符(如时间戳)。 + """ + if user_idx < 0 or not question: + return False + try: + page = self.get_page() + node = page.locator('[data-jcid="message"]').nth(user_idx) + text = node.inner_text(timeout=2000) or "" + t = re.sub(r"\s+", "", text) + q = re.sub(r"\s+", "", question) + return q in t or t in q + except Exception: + return False - if content == last_content: - stable_count += 1 - if stable_count >= stable_threshold: - return # 连续稳定则认为完成 + def _latest_assistant_after(self, user_idx: int) -> Optional[dict]: + """在 user_idx 之后找最后一条 assistant 消息节点,返回其快照: + { + "index": int, + "has_body": bool, + "body_text": str, # 只有 has_body 才非空 + "streaming": bool, # data-jcid-streaming 属性 + } + 找不到返回 None。 + """ + if user_idx < 0: + return None + try: + page = self.get_page() + nodes = page.locator('[data-jcid="message"]') + total = nodes.count() + for i in range(total - 1, user_idx, -1): + node = nodes.nth(i) + role = (node.get_attribute("data-jcid-role") or "").lower() + if role != "assistant": + continue + body = node.locator('[data-jcid="message-body"]') + has_body = body.count() > 0 + body_text = "" + if has_body: + try: + body_text = body.first.inner_text(timeout=2000) or "" + except Exception: + body_text = "" + streaming = (node.get_attribute("data-jcid-streaming") or "false").lower() == "true" + return { + "index": i, + "has_body": has_body, + "body_text": body_text, + "streaming": streaming, + } + return None + except Exception: + return None + + def _active_execution_graph_count(self) -> int: + """未收起的 ExecutionGraphCard 数量。 + + 匠厂 v2.0.15 里: + - 活跃运行中:card.active=true → 默认展开 → `data-collapsed="false"` + - 运行完成:autoCollapsedRunKeys 自动收起 → `data-collapsed="true"` + 自动化场景下我们不会点开历史图,所以 `data-collapsed="false"` 的数量 > 0 + 就意味着"本轮仍在执行(thinking/tool 阶段)"。 + """ + try: + page = self.get_page() + return page.locator( + '[data-testid="chat-execution-graph"][data-collapsed="false"]' + ).count() + except Exception: + return 0 + + def _chat_root_sending(self) -> Optional[bool]: + """chat-root[data-jcid-sending] 的 bool 化;读不到返回 None。""" + try: + val = self.get_page().locator('[data-jcid="chat-root"]').first.get_attribute( + "data-jcid-sending" + ) + if val is None: + return None + return val.lower() == "true" + except Exception: + return None + + def _chat_root_message_count(self) -> Optional[int]: + try: + val = self.get_page().locator('[data-jcid="chat-root"]').first.get_attribute( + "data-jcid-message-count" + ) + if val is None: + return None + return int(val) + except Exception: + return None + + def _sending_flag(self) -> Optional[bool]: + """从 window.__jc_sending__ 读取当前 sending 状态;不可用返回 None。""" + try: + page = self.get_page() + val = page.evaluate("() => window.__jc_sending__") + if isinstance(val, bool): + return val + return None + except Exception: + return None + + def _any_sending_signal(self) -> bool: + """综合 3 路 sending 信号,**任意一路指示 true 就视为仍在发送中**。""" + if self._chat_root_sending() is True: + return True + if self._sending_flag() is True: + return True + if self._active_execution_graph_count() > 0: + return True + return False + + def _wait_for_streaming_done( + self, + user_msg_index: int, + timeout_ms: int = 120000, + stable_seconds: float = 3.0, + poll_interval: float = 0.5, + ) -> None: + """ + 等待 agent 整轮(可能包含多次 thinking / tool_use 往返)完成。 + + 核心判据 —— 只有 agent 最终答复才有 `message-body`(文本气泡): + 源码依据(D:\\AI\\jiangchang/src/stores/chat/helpers.ts): + - isToolOnlyMessage 显式允许 thinking 与 tool_use 共存仍视为中间轮 + - 中间工具轮的 ChatMessage 无 hasText → 不渲染 MessageBubble → + 也就不会有 `data-jcid="message-body"` + - 只有最终含 text 的助手消息才会渲染 message-body + + 因此可靠的完成条件为: + (A)最后一条 assistant message 存在 [data-jcid="message-body"]; + (B)其文本连续 `stable_seconds` 秒无变化; + (C)同时 window.__jc_sending__ 不是 true(允许 None / False,以容忍 + 早期 React 未挂载注入钩子的情况)。 + + 这样中间轮 sending 瞬间翻转 false → true 也不会被误判为结束。 + """ + _logger.debug( + "[wait_stream] 开始 timeout=%dms user_idx=%d stable=%.1fs", + timeout_ms, user_msg_index, stable_seconds, + ) + deadline = time.time() + timeout_ms / 1000 + + # 先尝试等到 sending 信号作为"本轮已启动"的确认(宽容 5s,拿不到就继续走主循环)。 + start_deadline = time.time() + 5.0 + while time.time() < start_deadline: + if self._any_sending_signal(): + _logger.debug("[wait_stream] 检测到 sending 信号,进入正式等待") + break + time.sleep(0.1) + + last_body_len = -1 + last_change_at = time.time() + last_progress_log = 0.0 + phase = "waiting-start" # waiting-start | tool-phase | body-growing + + while time.time() < deadline: + # 0) Gateway 健康 + gw = self._gateway_state() + if gw not in ("running", "unknown"): + raise GatewayDownError( + f"等待 AI 回复过程中检测到 Gateway 状态异常:{gw}。匠厂可能已崩溃或退出。" + ) + + sending_any = self._any_sending_signal() + chat_sending = self._chat_root_sending() + win_sending = self._sending_flag() + graph_active = self._active_execution_graph_count() + assistant = self._latest_assistant_after(user_msg_index) + + # 完成判据需同时满足: + # (1) 三路 sending 信号全部静默(chat-root / window / 未收起的 execution graph) + # (2) user_msg_index 之后存在 assistant 消息 + # (3) 该 assistant 存在 message-body + # (4) 该 assistant 的 data-jcid-streaming != 'true' + # (5) body 文本连续 stable_seconds 秒无变化 + can_check_stable = ( + not sending_any + and assistant is not None + and assistant["has_body"] + and not assistant["streaming"] + ) + + if not can_check_stable: + last_body_len = -1 + last_change_at = time.time() + new_phase = "waiting-start" if assistant is None else "tool-phase" + if new_phase != phase: + _logger.debug( + "[wait_stream] 阶段 → %s (chat_sending=%s win_sending=%s graph_active=%d" + " assistant_idx=%s has_body=%s streaming=%s)", + new_phase, + chat_sending, win_sending, graph_active, + None if assistant is None else assistant["index"], + None if assistant is None else assistant["has_body"], + None if assistant is None else assistant["streaming"], + ) + phase = new_phase else: - stable_count = 0 - last_content = content + cur_len = len(assistant["body_text"]) + if cur_len != last_body_len: + last_body_len = cur_len + last_change_at = time.time() + if phase != "body-growing": + _logger.debug( + "[wait_stream] 最终气泡稳定性观察开始 len=%d (user_idx=%d asst_idx=%d)", + cur_len, user_msg_index, assistant["index"], + ) + phase = "body-growing" + else: + elapsed = time.time() - last_change_at + if elapsed >= stable_seconds: + _logger.debug( + "[wait_stream] 稳定 %.1fs 判定完成 asst_idx=%d len=%d", + stable_seconds, assistant["index"], cur_len, + ) + return + + # 每 10s 打一条进度日志 + if time.time() - last_progress_log >= 10.0: + last_progress_log = time.time() + _logger.debug( + "[wait_stream] 进度 elapsed=%.1fs phase=%s chat_sending=%s win_sending=%s" + " graph_active=%d asst_idx=%s has_body=%s streaming=%s body_len=%d gw=%s", + time.time() - (deadline - timeout_ms / 1000), + phase, chat_sending, win_sending, graph_active, + None if assistant is None else assistant["index"], + None if assistant is None else assistant["has_body"], + None if assistant is None else assistant["streaming"], + max(last_body_len, 0), gw, + ) time.sleep(poll_interval) - raise JcTimeoutError(f"等待 AI 回复超时({timeout}ms)") + raise JcTimeoutError( + f"等待 AI 回复超时({timeout_ms}ms)。user_msg_index={user_msg_index} 最后阶段 phase={phase}。" + "可能原因:模型思考时间过长 / 工具链循环未结束 / UI 未挂载 message-body / Gateway 卡住。" + ) + + # ─── 核心交互:ask / read / assert ──────────────────────────── def wait_for_response(self, timeout: int = 120000) -> None: - """ - 等待当前 AI 回复完成。 - - timeout: 超时毫秒数,默认 120000(2分钟) - """ if not self._connected: raise ConnectionError("未连接到应用") - self._wait_for_streaming_done(timeout) + # 兼容旧 API:以"当前最后一条 user 节点"作为锚点 + user_idx = self._last_user_node_index() + if user_idx < 0: + raise ConnectionError("找不到任何 user 消息节点,无法建立等待锚点") + self._wait_for_streaming_done(user_msg_index=user_idx, timeout_ms=timeout) def ask(self, question: str, options: Optional[AskOptions] = None) -> str: """ - 向应用提问,等待回复完成,返回 assistant 回复文本。 - - 步骤: - 1. 清空并填写输入框 - 2. 点击发送按钮 - 3. 等待流式输出完成(__jc_sending__ 标志 或 轮询) - 4. 返回最新一条 assistant 消息内容 + 向应用提问,拟人化键入 + 回车发送,等待本轮 agent 完成,返回我们这一轮的最终 assistant 文本。 + + v2.0.15 脏 session 适配:**不再用 assistant 总数作 baseline**,而是锚定 + "我们刚发出的 user 节点在 [data-jcid=\"message\"] 列表里的 index"。后续所有 + 状态判定与答复抓取都只在这个 index 之后。 """ if not self._connected: - raise ConnectionError("未连接到应用,请先调用 connect() 或 launch_app()") + raise ConnectionError("未连接到应用,请先调用 ensure_app_running() / connect()") opts = options or AskOptions() - page = self.get_page() - # 1. 填写问题到输入框 + # 0. 新任务(可关) + if opts.new_task: + _logger.debug("[ask] 新建任务") + self.new_task() + + # 1. 记录发送前的锚点:当前 chat-root message-count 与当前最后 user 节点 index + pre_count = self._chat_root_message_count() or 0 + pre_user_idx = self._last_user_node_index() + pre_node_total = self._message_node_count() + _logger.debug( + "[ask] 锚点 pre_count=%d pre_user_idx=%d pre_node_total=%d", + pre_count, pre_user_idx, pre_node_total, + ) + + # 2. 拟人化输入 input_locator = self._get_chat_input_locator() input_locator.wait_for(state="visible", timeout=10000) input_locator.click() - input_locator.fill(question) + try: + input_locator.fill("") + except Exception: + pass - # 2. 点击发送 - send_btn = self._get_send_button_locator() - send_btn.wait_for(state="visible", timeout=5000) - send_btn.click() + if opts.typing_delay_ms > 0: + _logger.debug( + "[ask] press_sequentially 拟人输入 delay=%dms len=%d", + opts.typing_delay_ms, len(question), + ) + input_locator.press_sequentially(question, delay=opts.typing_delay_ms) + else: + input_locator.fill(question) - # 3. 等待回复完成 - self._wait_for_streaming_done(opts.timeout) + # 3. 发送:优先 Enter + if opts.use_enter_key: + _logger.debug("[ask] 按 Enter 发送") + input_locator.press("Enter") + else: + send_btn = self._get_send_button_locator() + send_btn.wait_for(state="visible", timeout=5000) + send_btn.click() + _logger.debug("[ask] 点击发送按钮") - # 4. 读取最后一条 assistant 消息 - messages = self.read() - assistant_msgs = [m for m in messages if m.role == "assistant"] - if not assistant_msgs: + # 4. 等待我们这条 user 消息落到 DOM(体现为最后一条 user 节点 index 增大 + # 或节点总数增大 + 文本匹配 question)。最多等 15s。 + my_user_idx = -1 + my_user_deadline = time.time() + 15.0 + while time.time() < my_user_deadline: + cur_user_idx = self._last_user_node_index() + cur_total = self._message_node_count() + # 条件 A:最后一条 user 节点 index 变大(未污染场景) + # 条件 B:节点总数变大且最后 user 节点文本匹配 question(脏 session 场景) + if cur_user_idx > pre_user_idx and self._user_node_matches(cur_user_idx, question): + my_user_idx = cur_user_idx + break + if cur_total > pre_node_total and self._user_node_matches(cur_user_idx, question): + my_user_idx = cur_user_idx + break + time.sleep(0.2) + + if my_user_idx < 0: + # 最后兜底:取当前最后一条 user 节点(哪怕文本没对上) + my_user_idx = self._last_user_node_index() + _logger.warning( + "[ask] 未能可靠识别我们的 user 节点,回退到 last_user_idx=%d", + my_user_idx, + ) + else: + _logger.debug("[ask] 我们的 user 节点 index=%d", my_user_idx) + + # 5. 等待本轮完成(多信号合流) + self._wait_for_streaming_done( + user_msg_index=my_user_idx, + timeout_ms=opts.timeout, + stable_seconds=opts.stable_seconds, + poll_interval=opts.poll_interval, + ) + + # 6. 选答:只在 my_user_idx 之后的 assistant 中找带 message-body 的最后一条 + picked = self._latest_assistant_after(my_user_idx) + if picked is None or not picked["has_body"]: + _logger.warning( + "[ask] user_idx=%d 之后未找到带 message-body 的 assistant(picked=%s)", + my_user_idx, picked, + ) + # 兜底:取我们 user 之后的任意最后一条 assistant inner_text + try: + page = self.get_page() + nodes = page.locator('[data-jcid="message"]') + total = nodes.count() + for i in range(total - 1, my_user_idx, -1): + node = nodes.nth(i) + if (node.get_attribute("data-jcid-role") or "").lower() == "assistant": + return node.inner_text(timeout=2000) or "" + except Exception: + pass return "" - return assistant_msgs[-1].content + + _logger.debug( + "[ask] 选定 assistant_idx=%d(user_idx=%d 之后第一条带 message-body)len=%d", + picked["index"], my_user_idx, len(picked["body_text"]), + ) + return picked["body_text"] def send_file(self, file_path: str, message: Optional[str] = None) -> None: - """ - 上传附件到当前会话。 - - 步骤: - 1. 点击附件按钮(data-jcid="attach-button" 或 fallback) - 2. 通过 input[type="file"] 设置文件路径 - 3. 如果提供了 message,填入输入框并发送 - - file_path: 要上传的文件绝对路径(如 D:/test.pdf) - message: 可选,上传后附带的消息文本。如果提供,上传后自动触发发送 - """ if not self._connected: raise ConnectionError("未连接到应用") - page = self.get_page() - # 1. 找到附件按钮 attach_btn = page.locator('[data-jcid="attach-button"]') if attach_btn.count() == 0: - # Fallback:找包含附件/paperclip 等文字的按钮 - attach_btn = page.locator('button').filter( + attach_btn = page.locator("button").filter( has_text=re.compile(r"附件|上传|paperclip|attach", re.I) ) - # 2. 尝试找到文件 input[type="file"] file_input = page.locator('input[type="file"]') if file_input.count() == 0 and attach_btn.count() > 0: - # 点击附件按钮可能会触发隐藏的 file input 显示 attach_btn.first.click() time.sleep(0.5) file_input = page.locator('input[type="file"]') if file_input.count() == 0: raise AssertError( - "找不到文件上传 input[type='file']。" - "请确认附件功能已实现,或 UI 已添加 data-jcid=\"attach-button\" 属性。" + "找不到文件上传 input[type='file']。请确认匠厂已加 data-jcid=\"attach-button\"。" ) - # 3. 设置文件 abs_path = os.path.abspath(file_path) if not os.path.exists(abs_path): raise AppNotFoundError(f"上传文件不存在: {abs_path}") file_input.set_input_files(abs_path) - time.sleep(1) # 等待附件预览渲染 + time.sleep(1) - # 4. 如果提供了 message,自动发送 if message: input_locator = self._get_chat_input_locator() input_locator.click() - input_locator.fill(message) - send_btn = self._get_send_button_locator() - send_btn.click() + input_locator.press_sequentially(message, delay=25) + input_locator.press("Enter") def read(self) -> List[JiangchangMessage]: """ - 从页面 DOM 提取当前可见的所有消息。 - - 优先通过 data-jcid 属性识别,回退到 DOM 结构推断。 - 返回按时间顺序排列的 JiangchangMessage 列表。 + 严格读取:仅接受带 data-jcid="message" + data-jcid-role 的节点。 + 无 role 的节点返回 role='unknown'(不再默认为 assistant)。 """ if not self._connected: raise ConnectionError("未连接到应用") @@ -460,80 +918,51 @@ class JiangchangDesktopClient: try: msg_list = self._get_message_list_locator() - msg_list.wait_for(state="visible", timeout=10000) - except Exception: - return messages # 消息列表还没出现,返回空 + msg_list.wait_for(state="visible", timeout=5000) + except Exception as exc: + _logger.debug("[read] 消息列表不可见:%s", exc) + return messages - # 策略 1:通过 data-jcid="message-xxx" 查找每条消息 - items = page.locator('[data-jcid^="message-"]') - if items.count() > 0: - for i in range(items.count()): - el = items.nth(i) - try: - role = el.get_attribute("data-jcid-role") or "assistant" - content = el.inner_text() - is_error = el.get_attribute("data-jcid-error") == "true" - tool_name = el.get_attribute("data-jcid-tool") - messages.append(JiangchangMessage( - id=el.get_attribute("data-jcid") or str(i), + items = page.locator('[data-jcid="message"]') + total = items.count() + _logger.debug("[read] data-jcid=\"message\" 节点数=%d", total) + + for i in range(total): + el = items.nth(i) + try: + role_attr = el.get_attribute("data-jcid-role") + streaming_attr = el.get_attribute("data-jcid-streaming") or "false" + # 优先从 message-body(正文气泡)取干净文本,避免带入 Thinking / ToolCards + body_locator = el.locator('[data-jcid="message-body"]') + if body_locator.count() > 0: + content = body_locator.first.inner_text(timeout=2000) + else: + content = el.inner_text(timeout=2000) + + role = (role_attr or "unknown").lower() + messages.append( + JiangchangMessage( + id=el.get_attribute("data-jcid-message-id") or f"msg-{i}", role=role, content=content, timestamp=time.time(), - is_error=is_error, - tool_name=tool_name, - )) - except Exception: - continue - return messages - - # 策略 2:按 DOM 结构推断(通过 class 特征区分 user/assistant) - # user 消息通常 class 含 "user" 或气泡颜色为蓝色 - # assistant 消息通常 class 含 "assistant" 或气泡颜色为灰色 - raw_items = msg_list.locator("> *").all() - for idx, el in enumerate(raw_items): - try: - classes = el.get_attribute("class") or "" - text = el.inner_text() - if not text.strip(): - continue - - # 简单启发式判断角色 - if "user" in classes.lower(): - role = "user" - elif "assistant" in classes.lower() or "bot" in classes.lower(): - role = "assistant" - elif idx % 2 == 0: # 交替:偶数=user,奇数=assistant - role = "user" - else: - role = "assistant" - - messages.append(JiangchangMessage( - id=f"msg-{idx}", - role=role, - content=text, - timestamp=time.time(), - )) - except Exception: + is_error=(el.get_attribute("data-jcid-error") == "true"), + tool_name=el.get_attribute("data-jcid-tool"), + ) + ) + if streaming_attr == "true": + messages[-1].tool_status = "streaming" + except Exception as exc: + _logger.debug("[read] 读取第%d条异常:%s", i, exc) continue - return messages def assert_contains(self, expected: str, options: Optional[AssertOptions] = None) -> None: - """ - 断言最新一条 assistant 回复中包含指定内容。 - - options.match_mode 支持: - - 'contains'(默认):子串包含 - - 'regex':正则表达式匹配 - - 'exact':完全相等 - """ if not self._connected: raise ConnectionError("未连接到应用") - opts = options or AssertOptions() messages = self.read() - # 找目标消息 assistant_msgs = [m for m in messages if m.role == "assistant"] if not assistant_msgs: raise AssertError( @@ -542,43 +971,47 @@ class JiangchangDesktopClient: actual="(无 assistant 消息)", ) - # 默认取最后一条,可通过 message_index 自定义 target_idx = opts.message_index if opts.message_index != -1 else -1 target = assistant_msgs[target_idx] actual = target.content - # 执行匹配 matched = False if opts.match_mode == "exact": - matched = (actual.strip() == expected.strip()) + matched = actual.strip() == expected.strip() elif opts.match_mode == "regex": - import re matched = bool(re.search(expected, actual)) - else: # contains(默认) + else: matched = expected in actual if not matched: raise AssertError( - f"断言失败。期望包含:{expected!r}\n实际内容:{actual[:200]!r}", + f"断言失败。期望:{expected!r}\n实际:{actual[:200]!r}", expected=expected, actual=actual[:200], ) - def disconnect(self) -> None: - """断开连接""" - if not self._connected: - return - self._cleanup() + # ─── 调试辅助 ───────────────────────────────────────────────── - def is_connected(self) -> bool: - """返回当前是否已连接""" - return self._connected - - def get_page(self) -> Page: - """获取 Playwright Page 对象,供高级用法""" - if not self._page: - raise ConnectionError("未连接到应用,请先调用 connect() 或 launch_app()") - return self._page + def snapshot(self, target_dir: str, tag: str = "snapshot") -> dict: + """把当前页面截图 + DOM HTML 落盘,用于测试失败现场留证。""" + os.makedirs(target_dir, exist_ok=True) + ts = time.strftime("%Y%m%d-%H%M%S") + png_path = os.path.join(target_dir, f"{tag}-{ts}.png") + html_path = os.path.join(target_dir, f"{tag}-{ts}.html") + out = {"png": png_path, "html": html_path} + try: + self.get_page().screenshot(path=png_path, full_page=True) + except Exception as exc: + _logger.warning("[snapshot] 截图失败:%s", exc) + out["png"] = "" + try: + html = self.get_page().content() + with open(html_path, "w", encoding="utf-8") as f: + f.write(html) + except Exception as exc: + _logger.warning("[snapshot] DOM dump 失败:%s", exc) + out["html"] = "" + return out def __enter__(self): return self diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/exceptions.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/exceptions.py index 1d1bfa9..a6026fb 100644 --- a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/exceptions.py +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/exceptions.py @@ -21,3 +21,7 @@ class AssertError(JiangchangDesktopError): class LaunchError(JiangchangDesktopError): """应用启动失败时抛出""" pass + +class GatewayDownError(JiangchangDesktopError): + """Gateway 在等待过程中被检测到已停止/退出时抛出,便于测试立刻失败而不是空等超时。""" + pass diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/__init__.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/__init__.py new file mode 100644 index 0000000..eb2cf69 --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/__init__.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +"""匠厂桌面 SDK 的测试工具子包。 + +提供 skill 桌面 E2E 测试所需的共享工具: + - ``SkillInfo`` / ``discover_skill_root`` / ``parse_skill_md`` + - ``skill_healthcheck`` + - ``HostAPIClient`` / ``HostAPIError`` + - ``clear_main_agent_history`` + +pytest plugin 本体位于子模块 ``plugin``,由 pyproject.toml 的 +``[project.entry-points.pytest11]`` 自动注册;未 pip install 场景下, +skill 的 conftest 也可通过 ``pytest_plugins`` 变量手动加载。 +""" +from .config import SkillInfo, discover_skill_root, parse_skill_md +from .healthcheck import HealthCheckError, skill_healthcheck +from .host_api import HostAPIClient, HostAPIError +from .session_cleanup import clear_main_agent_history + +__all__ = [ + "SkillInfo", + "discover_skill_root", + "parse_skill_md", + "HealthCheckError", + "skill_healthcheck", + "HostAPIClient", + "HostAPIError", + "clear_main_agent_history", +] diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/config.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/config.py new file mode 100644 index 0000000..de1cd9c --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/config.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +"""Skill 元信息读取。 + +负责: + - 定位 skill 根目录(含 SKILL.md); + - 解析 SKILL.md 的 YAML frontmatter,抽出 slug / version / name 等关键字段。 + +为避免硬性依赖 PyYAML,这里用一个**足够用**的轻量正则 parser:只抽取 +我们真正关心的字段(顶层 `name` / `version` / `author`,嵌套 `openclaw.slug` +与 `openclaw.category`)。更复杂的 YAML 结构请自行用 PyYAML 解析。 +""" +from __future__ import annotations + +import os +import re +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +SKILL_MD = "SKILL.md" +SKILL_ROOT_ENV = "JIANGCHANG_E2E_SKILL_ROOT" + + +@dataclass +class SkillInfo: + root: str + slug: str + name: str + version: str + author: str = "" + category: str = "" + + +def discover_skill_root(start: Optional[str] = None) -> str: + """定位 skill 根目录。 + + 优先级: + 1. 环境变量 ``JIANGCHANG_E2E_SKILL_ROOT``(由各 skill 的 conftest 提前注入); + 2. 从 ``start``(默认 cwd)向上回溯,直到找到含 ``SKILL.md`` 的目录。 + """ + env = (os.environ.get(SKILL_ROOT_ENV) or "").strip() + if env: + env_abs = os.path.abspath(env) + if os.path.isfile(os.path.join(env_abs, SKILL_MD)): + return env_abs + cur = Path(start or os.getcwd()).resolve() + for parent in [cur, *cur.parents]: + if (parent / SKILL_MD).exists(): + return str(parent) + raise FileNotFoundError( + f"未能在 {start or os.getcwd()} 及其父目录中找到 {SKILL_MD}。" + f"请设置环境变量 {SKILL_ROOT_ENV}=。" + ) + + +_FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL) + + +def _extract_frontmatter(text: str) -> str: + match = _FRONTMATTER_RE.match(text) + if not match: + raise ValueError("SKILL.md 缺少 YAML frontmatter(--- ... ---)。") + return match.group(1) + + +def _strip_quotes(value: str) -> str: + v = value.strip() + if len(v) >= 2 and v[0] == v[-1] and v[0] in ("'", '"'): + return v[1:-1] + return v + + +def parse_skill_md(skill_root: str) -> SkillInfo: + """解析 SKILL.md 的 frontmatter,返回 SkillInfo。""" + md_path = Path(skill_root) / SKILL_MD + if not md_path.exists(): + raise FileNotFoundError(f"SKILL.md 不存在:{md_path}") + text = md_path.read_text(encoding="utf-8") + fm = _extract_frontmatter(text) + + top: dict[str, str] = {} + openclaw: dict[str, str] = {} + + lines = fm.splitlines() + i = 0 + while i < len(lines): + line = lines[i] + if not line.strip() or line.lstrip().startswith("#"): + i += 1 + continue + # 顶层 key: value + m = re.match(r"^([A-Za-z_][\w-]*):\s*(.*)$", line) + if m: + key, rest = m.group(1), m.group(2) + if rest.strip() == "": + # 嵌套块,收集 2 空格缩进的子项 + i += 1 + sub: dict[str, str] = {} + while i < len(lines): + sub_line = lines[i] + if sub_line.strip() == "" or sub_line.lstrip().startswith("#"): + i += 1 + continue + if not sub_line.startswith(" "): + break + sm = re.match(r"^\s+([A-Za-z_][\w-]*):\s*(.*)$", sub_line) + if sm: + sub[sm.group(1)] = _strip_quotes(sm.group(2)) + i += 1 + if key == "metadata": + # openclaw 在 metadata 下再下一层: + # metadata: + # openclaw: + # slug: ... + # 上面 sub 已经拿到的只是 "openclaw" 这个 key。为稳妥起见,回头 + # 用一个更宽松的扫描:直接找所有含 `slug:` `category:` 的行。 + pass + continue + top[key] = _strip_quotes(rest) + i += 1 + + # openclaw.slug / category 用宽松扫描兜底(无论嵌套几层) + for line in lines: + stripped = line.lstrip() + if stripped.startswith("slug:") and "slug" not in openclaw: + openclaw["slug"] = _strip_quotes(stripped.split(":", 1)[1]) + elif stripped.startswith("category:") and "category" not in openclaw: + openclaw["category"] = _strip_quotes(stripped.split(":", 1)[1]) + + return SkillInfo( + root=os.path.abspath(skill_root), + slug=openclaw.get("slug", "") or Path(skill_root).name, + name=top.get("name", Path(skill_root).name), + version=top.get("version", "0.0.0"), + author=top.get("author", ""), + category=openclaw.get("category", ""), + ) diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/healthcheck.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/healthcheck.py new file mode 100644 index 0000000..f878e73 --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/healthcheck.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +"""技能健康检查。 + +检查项: + 1. SKILL.md 成功解析,含 slug / version。 + 2. ``python scripts/main.py health`` 命令退出码为 0(每个 skill 都约定实现)。 + +CLI 耗时通常 <2s,一个 pytest session 里只跑一次(session-scope autouse), +失败时 fail-fast 终止整个测试 session。开发时可设 +``JIANGCHANG_E2E_SKIP_HEALTHCHECK=1`` 临时跳过。 +""" +from __future__ import annotations + +import logging +import os +import subprocess +import sys + +from .config import SkillInfo + +logger = logging.getLogger("jiangchang-desktop-sdk.healthcheck") + + +class HealthCheckError(RuntimeError): + """健康检查失败。""" + + +def skill_healthcheck( + skill_info: SkillInfo, + *, + run_cli: bool = True, + timeout_s: float = 15.0, +) -> None: + """对一个 skill 跑本地健康检查。失败抛 ``HealthCheckError``。""" + if not skill_info.slug: + raise HealthCheckError(f"SKILL.md 未定义 openclaw.slug:{skill_info.root}") + if not skill_info.version or skill_info.version == "0.0.0": + raise HealthCheckError(f"SKILL.md 未定义 version:{skill_info.root}") + + if not run_cli: + return + + main_py = os.path.join(skill_info.root, "scripts", "main.py") + if not os.path.isfile(main_py): + logger.warning("skill_healthcheck: %s 不存在,跳过 CLI health 检查", main_py) + return + + cmd = [sys.executable, main_py, "health"] + logger.debug("skill_healthcheck: 运行 %s (cwd=%s)", cmd, skill_info.root) + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout_s, + cwd=skill_info.root, + encoding="utf-8", + errors="replace", + ) + except subprocess.TimeoutExpired as exc: + raise HealthCheckError( + f"skill_healthcheck: '{' '.join(cmd)}' 超时 {timeout_s}s" + ) from exc + + if proc.returncode != 0: + raise HealthCheckError( + f"skill_healthcheck: '{' '.join(cmd)}' 退出码 {proc.returncode}\n" + f"stdout: {proc.stdout[:600]}\n" + f"stderr: {proc.stderr[:600]}" + ) + logger.info( + "skill_healthcheck: %s v%s OK", skill_info.slug, skill_info.version + ) diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/host_api.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/host_api.py new file mode 100644 index 0000000..fc10b14 --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/host_api.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +"""匠厂 Host HTTP API 的 Python 轻封装。 + +使用 stdlib urllib,避免为 SDK 引入新运行时依赖。只暴露本模块当前 +真正需要的端点;其余接口按需添加。 + +Host API 默认端口:`13210`(见 jiangchang/electron/utils/config.ts 中 `CLAWX_HOST_API`)。 +可通过环境变量 ``CLAWX_PORT_CLAWX_HOST_API`` 或本类的 ``port`` 参数覆盖。 +""" +from __future__ import annotations + +import json +import logging +import os +import urllib.error +import urllib.request +from typing import Any, Dict, Optional + +DEFAULT_PORT = 13210 +DEFAULT_HOST = "127.0.0.1" + +logger = logging.getLogger("jiangchang-desktop-sdk.host_api") + + +class HostAPIError(RuntimeError): + """Host API 调用失败(网络错误或非 2xx 响应)。""" + + +class HostAPIClient: + def __init__( + self, + host: str = DEFAULT_HOST, + port: Optional[int] = None, + timeout: float = 10.0, + ) -> None: + self.host = host + env_port = os.environ.get("CLAWX_PORT_CLAWX_HOST_API") + self.port = port or (int(env_port) if env_port else DEFAULT_PORT) + self.timeout = timeout + + @property + def base_url(self) -> str: + return f"http://{self.host}:{self.port}" + + # ── HTTP helpers ───────────────────────────────────────── + + def _request(self, method: str, path: str, body: Any = None) -> Any: + url = f"{self.base_url}{path}" + data = None + headers = {"Accept": "application/json"} + if body is not None: + data = json.dumps(body).encode("utf-8") + headers["Content-Type"] = "application/json" + req = urllib.request.Request(url, data=data, method=method, headers=headers) + try: + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + raw = resp.read().decode("utf-8") + if not raw: + return None + try: + return json.loads(raw) + except json.JSONDecodeError: + return raw + except urllib.error.HTTPError as exc: + payload: Any + try: + payload = json.loads(exc.read().decode("utf-8")) + except Exception: # noqa: BLE001 + payload = str(exc) + raise HostAPIError(f"{method} {path} -> HTTP {exc.code}: {payload}") from exc + except urllib.error.URLError as exc: + raise HostAPIError(f"{method} {path} -> {exc}") from exc + + # ── High-level API ─────────────────────────────────────── + + def ping(self) -> bool: + try: + self.list_agents() + return True + except HostAPIError: + return False + + def list_agents(self) -> Dict[str, Any]: + return self._request("GET", "/api/agents") or {} + + def get_main_session_key(self) -> str: + """返回主 Agent 的 mainSessionKey(形如 ``agent:main:{mainKey}``)。 + + 若多种形式都取不到,退化为 ``agent:main:default``——这通常仍能被 + 后端 ``POST /api/sessions/delete`` 正确识别(会走 sessions.json 查找)。 + """ + snapshot = self.list_agents() + entries = snapshot.get("entries") or [] + for entry in entries: + if entry.get("id") == "main": + key = entry.get("mainSessionKey") or entry.get("sessionKey") + if key: + return key + key = snapshot.get("mainSessionKey") + if key: + return key + return "agent:main:default" + + def delete_session(self, session_key: str) -> None: + self._request("POST", "/api/sessions/delete", {"sessionKey": session_key}) + + def sessions_dir(self, agent_id: str = "main") -> str: + """返回指定 agent 的 sessions 目录绝对路径(供文件系统回退使用)。""" + return os.path.join( + os.path.expanduser("~"), ".openclaw", "agents", agent_id, "sessions" + ) diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/plugin.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/plugin.py new file mode 100644 index 0000000..978a6ba --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/plugin.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +"""jiangchang_desktop_sdk.testing 的 pytest plugin。 + +提供的 fixture / hook(所有 skill 通用): + +session 级: + - ``skill_info`` 当前 skill 的 SkillInfo(从 SKILL.md 解析) + - ``host_api`` 匠厂 Host API 的 Python 客户端 + - ``jc_desktop_session_setup`` (autouse) 跑一次健康检查 + 清空主 Agent 历史 + +function 级: + - ``failure_snapshot_dir`` 失败现场落盘目录(tests/desktop/artifacts/) + +hook: + - ``pytest_runtest_makereport`` 把 setup/call/teardown 结果挂到 + ``item.rep_setup`` / ``item.rep_call`` / ``item.rep_teardown`` + +环境变量: + - ``JIANGCHANG_E2E_SKILL_ROOT`` 显式指定 skill 根目录 + - ``JIANGCHANG_E2E_SKIP_HEALTHCHECK`` 置 1 可临时跳过健康检查 + - ``JIANGCHANG_E2E_SKIP_CLEAR_HISTORY`` 置 1 可临时跳过清空历史 + +加载方式(二选一): + A) ``pip install -e`` 安装本 SDK 后,通过 ``pyproject.toml`` 的 + ``[project.entry-points.pytest11]`` 自动加载; + B) 在 skill 的 ``tests/desktop/conftest.py`` 里声明 + ``pytest_plugins = ["jiangchang_desktop_sdk.testing.plugin"]``。 +""" +from __future__ import annotations + +import logging +import os + +import pytest + +from .config import SkillInfo, discover_skill_root, parse_skill_md +from .healthcheck import HealthCheckError, skill_healthcheck +from .host_api import HostAPIClient +from .session_cleanup import clear_main_agent_history + +logger = logging.getLogger("jiangchang-desktop-sdk.plugin") + + +# ── Fixtures ────────────────────────────────────────────────────────────── + +@pytest.fixture(scope="session") +def skill_info(request) -> SkillInfo: + rootpath = str(request.config.rootpath) + root = discover_skill_root(start=rootpath) + info = parse_skill_md(root) + logger.info( + "[skill_info] root=%s slug=%s version=%s", info.root, info.slug, info.version + ) + return info + + +@pytest.fixture(scope="session") +def host_api() -> HostAPIClient: + return HostAPIClient() + + +@pytest.fixture(scope="session", autouse=True) +def jc_desktop_session_setup(skill_info, host_api): + """Desktop 测试 session 级一次性准备。 + + 1. ``skill_healthcheck``:SKILL.md + ``python scripts/main.py health``, + 失败时 **fail-fast** 终止整个 session; + 2. ``clear_main_agent_history``:通过 Host API 清空主 Agent 的当前 session, + 规避跨 case 的上下文污染。API 不可达则回退文件系统,不影响测试启动。 + """ + if not os.environ.get("JIANGCHANG_E2E_SKIP_HEALTHCHECK"): + try: + skill_healthcheck(skill_info) + except HealthCheckError as exc: + pytest.exit( + f"[jc-healthcheck] 技能健康检查失败,终止测试 session:\n{exc}\n" + "临时跳过:设置 JIANGCHANG_E2E_SKIP_HEALTHCHECK=1。", + returncode=5, + ) + else: + logger.warning("[jc-healthcheck] 已通过环境变量跳过健康检查") + + if not os.environ.get("JIANGCHANG_E2E_SKIP_CLEAR_HISTORY"): + try: + cleared = clear_main_agent_history(host_api) + logger.info("[jc-cleanup] session 启动前已清空 %d 个历史 session", cleared) + except Exception as exc: # noqa: BLE001 + logger.warning("[jc-cleanup] 清空历史失败(非致命):%s", exc) + else: + logger.warning("[jc-cleanup] 已通过环境变量跳过清空历史") + + yield + + +@pytest.fixture +def failure_snapshot_dir(request) -> str: + """失败现场落盘目录:与测试文件同级的 artifacts/ 目录。""" + test_dir = os.path.dirname(str(request.node.fspath)) + artifacts = os.path.join(test_dir, "artifacts") + os.makedirs(artifacts, exist_ok=True) + return artifacts + + +# ── Hooks ───────────────────────────────────────────────────────────────── + +@pytest.hookimpl(tryfirst=True, hookwrapper=True) +def pytest_runtest_makereport(item, call): + outcome = yield + rep = outcome.get_result() + setattr(item, f"rep_{rep.when}", rep) + + +def pytest_configure(config): + config.addinivalue_line( + "markers", + "multi_turn: 多轮追问测试用例(同类用例共享同一会话上下文)", + ) diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/session_cleanup.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/session_cleanup.py new file mode 100644 index 0000000..827c1ba --- /dev/null +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/testing/session_cleanup.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +"""清空主 Agent 的 chat 会话历史。 + +主路径:`POST /api/sessions/delete`(匠厂官方 Host API,软删除,改名 `.deleted.jsonl`)。 +回退路径:直接把 `~/.openclaw/agents/main/sessions/*.jsonl` 重命名为 `.deleted.jsonl`。 +""" +from __future__ import annotations + +import logging +import os +from typing import Optional + +from .host_api import HostAPIClient, HostAPIError + +logger = logging.getLogger("jiangchang-desktop-sdk.session_cleanup") + + +def clear_main_agent_history( + api: Optional[HostAPIClient] = None, + *, + agent_id: str = "main", + strict: bool = False, +) -> int: + """清空主 Agent 的当前 session。返回被删除的 session 数量。 + + ``strict=True`` 时 API 失败立刻抛出;默认 ``False`` 下 API 失败会降级到 + 文件系统批量重命名,尽最大努力保证下一个测试看到的是干净环境。 + """ + api = api or HostAPIClient() + + # ── Path A: Host API ─────────────────────────────────── + try: + session_key = api.get_main_session_key() + logger.debug("clear_main_agent_history: 主 session key = %s", session_key) + api.delete_session(session_key) + logger.info( + "clear_main_agent_history: HTTP API 已软删除 %s", session_key + ) + return 1 + except HostAPIError as exc: + if strict: + raise + logger.warning( + "clear_main_agent_history: HTTP API 失败(%s),回退到文件系统", exc + ) + + # ── Path B: 文件系统回退 ──────────────────────────────── + sessions_dir = api.sessions_dir(agent_id) + if not os.path.isdir(sessions_dir): + logger.info( + "clear_main_agent_history: %s 不存在,无历史可清", sessions_dir + ) + return 0 + + deleted = 0 + for name in os.listdir(sessions_dir): + if not name.endswith(".jsonl") or name.endswith(".deleted.jsonl"): + continue + src = os.path.join(sessions_dir, name) + dst = src[: -len(".jsonl")] + ".deleted.jsonl" + try: + os.replace(src, dst) + deleted += 1 + except OSError as exc: + logger.warning( + "clear_main_agent_history: 重命名 %s 失败:%s", src, exc + ) + if strict: + raise + logger.info( + "clear_main_agent_history: 文件系统回退共软删除 %d 个 session", deleted + ) + return deleted diff --git a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/types.py b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/types.py index 6e902c1..a259876 100644 --- a/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/types.py +++ b/sdk/jiangchang-desktop-sdk/src/jiangchang_desktop_sdk/types.py @@ -17,9 +17,19 @@ class JiangchangMessage: @dataclass class AskOptions: """ask() 方法的选项""" - timeout: int = 120000 # 毫秒 + timeout: int = 120000 # 毫秒 wait_for_tools: bool = True agent_id: str = "main" + # 拟人化输入:每个字符间延迟(毫秒)。0 = 直接 fill()。 + typing_delay_ms: int = 25 + # 发送方式:True=按 Enter 键;False=点击发送按钮 + use_enter_key: bool = True + # 每次 ask 前是否自动新建任务(避免上下文污染) + new_task: bool = True + # 流式输出判稳阈值(秒):助手消息文本连续该秒数无增长视为完成 + stable_seconds: float = 3.0 + # 轮询间隔(秒) + poll_interval: float = 0.5 @dataclass class LaunchOptions: