Add OpenClaw skills, platform kit, and template docs

Made-with: Cursor
This commit is contained in:
2026-04-04 10:35:02 +08:00
parent e37b03c00f
commit 35f4758da2
83 changed files with 8971 additions and 0 deletions

292
llm-manager/scripts/db.py Normal file
View File

@@ -0,0 +1,292 @@
"""
llm-manager 本地数据库:
- llm_keys: API Key 记录
- llm_web_accounts: 网页模式账号关联记录(账号主数据仍由 account-manager 管理)
"""
import os
import sqlite3
import time
from typing import Optional
from providers import get_data_root, get_user_id
SKILL_SLUG = "llm-manager"
# SQLite 无独立 DATETIME时间统一存 INTEGER Unix 秒UTC
LLM_KEYS_TABLE_SQL = """
CREATE TABLE llm_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT, -- 主键(自增)
provider TEXT NOT NULL, -- 平台 slugdoubao/deepseek/qianwen/kimi/yiyan/yuanbao
label TEXT NOT NULL DEFAULT '', -- 用户自定义备注如「公司Key」
api_key TEXT NOT NULL, -- API Key 原文
default_model TEXT, -- 默认模型doubao 须填 ep-xxx
is_active INTEGER NOT NULL DEFAULT 1, -- 是否启用0 停用 1 启用
last_used_at INTEGER, -- 最近调用时间Unix 秒;从未用过为 NULL
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
"""
LLM_WEB_ACCOUNTS_TABLE_SQL = """
CREATE TABLE llm_web_accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
provider TEXT NOT NULL,
account_id INTEGER NOT NULL,
account_name TEXT NOT NULL DEFAULT '',
login_status INTEGER NOT NULL DEFAULT 1,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
UNIQUE(provider, account_id)
);
"""
def _now_unix() -> int:
return int(time.time())
def get_skill_data_dir() -> str:
path = os.path.join(get_data_root(), get_user_id(), SKILL_SLUG)
os.makedirs(path, exist_ok=True)
return path
def get_db_path() -> str:
return os.path.join(get_skill_data_dir(), f"{SKILL_SLUG}.db")
def get_conn():
return sqlite3.connect(get_db_path())
def init_db():
conn = get_conn()
try:
cur = conn.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='llm_keys'")
if not cur.fetchone():
cur.executescript(LLM_KEYS_TABLE_SQL)
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='llm_web_accounts'")
if not cur.fetchone():
cur.executescript(LLM_WEB_ACCOUNTS_TABLE_SQL)
conn.commit()
finally:
conn.close()
# ---------------------------------------------------------------------------
# CRUD
# ---------------------------------------------------------------------------
def add_key(provider: str, api_key: str, model: Optional[str] = None, label: str = "") -> int:
init_db()
now = _now_unix()
conn = get_conn()
try:
cur = conn.cursor()
cur.execute(
"""
INSERT INTO llm_keys (provider, label, api_key, default_model, is_active, created_at, updated_at)
VALUES (?, ?, ?, ?, 1, ?, ?)
""",
(provider, label or "", api_key, model, now, now),
)
new_id = cur.lastrowid
conn.commit()
return new_id
finally:
conn.close()
def upsert_web_account(provider: str, account_id: int, account_name: str = "", login_status: int = 1) -> int:
init_db()
now = _now_unix()
conn = get_conn()
try:
cur = conn.cursor()
cur.execute(
"""
INSERT INTO llm_web_accounts (provider, account_id, account_name, login_status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(provider, account_id) DO UPDATE SET
account_name = excluded.account_name,
login_status = excluded.login_status,
updated_at = excluded.updated_at
""",
(provider, int(account_id), account_name or "", int(login_status or 0), now, now),
)
conn.commit()
cur.execute(
"SELECT id FROM llm_web_accounts WHERE provider = ? AND account_id = ?",
(provider, int(account_id)),
)
row = cur.fetchone()
return int(row[0]) if row else 0
finally:
conn.close()
def list_keys(provider: Optional[str] = None, limit: int = 10) -> list:
init_db()
if not isinstance(limit, int) or limit <= 0:
limit = 10
conn = get_conn()
try:
cur = conn.cursor()
if provider:
cur.execute(
"SELECT id, provider, label, api_key, default_model, is_active, last_used_at, created_at "
"FROM llm_keys WHERE provider = ? ORDER BY created_at DESC, id DESC LIMIT ?",
(provider, limit),
)
else:
cur.execute(
"SELECT id, provider, label, api_key, default_model, is_active, last_used_at, created_at "
"FROM llm_keys ORDER BY created_at DESC, id DESC LIMIT ?",
(limit,),
)
rows = cur.fetchall()
finally:
conn.close()
result = []
for row in rows:
result.append({
"id": row[0],
"provider": row[1],
"label": row[2] or "",
"api_key": row[3],
"default_model": row[4] or "",
"is_active": row[5],
"last_used_at": row[6],
"created_at": row[7],
})
return result
def list_web_accounts(provider: Optional[str] = None, limit: int = 10) -> list:
init_db()
if not isinstance(limit, int) or limit <= 0:
limit = 10
conn = get_conn()
try:
cur = conn.cursor()
if provider:
cur.execute(
"SELECT id, provider, account_id, account_name, login_status, created_at, updated_at "
"FROM llm_web_accounts WHERE provider = ? ORDER BY created_at DESC, id DESC LIMIT ?",
(provider, limit),
)
else:
cur.execute(
"SELECT id, provider, account_id, account_name, login_status, created_at, updated_at "
"FROM llm_web_accounts ORDER BY created_at DESC, id DESC LIMIT ?",
(limit,),
)
rows = cur.fetchall()
finally:
conn.close()
result = []
for row in rows:
result.append({
"id": row[0],
"provider": row[1],
"account_id": row[2],
"account_name": row[3] or "",
"login_status": int(row[4] or 0),
"created_at": row[5],
"updated_at": row[6],
})
return result
def get_key_by_id(key_id: int) -> Optional[dict]:
init_db()
conn = get_conn()
try:
cur = conn.cursor()
cur.execute(
"SELECT id, provider, label, api_key, default_model, is_active, last_used_at, created_at "
"FROM llm_keys WHERE id = ?",
(key_id,),
)
row = cur.fetchone()
if not row:
return None
return {
"id": row[0],
"provider": row[1],
"label": row[2] or "",
"api_key": row[3],
"default_model": row[4] or "",
"is_active": row[5],
"last_used_at": row[6],
"created_at": row[7],
}
finally:
conn.close()
def delete_key(key_id: int) -> bool:
init_db()
conn = get_conn()
try:
cur = conn.cursor()
cur.execute("SELECT id FROM llm_keys WHERE id = ?", (key_id,))
if not cur.fetchone():
return False
cur.execute("DELETE FROM llm_keys WHERE id = ?", (key_id,))
conn.commit()
return True
finally:
conn.close()
def find_active_key(provider: str) -> Optional[dict]:
"""查找该平台第一个 is_active=1 的 key按 id 升序)。"""
init_db()
conn = get_conn()
try:
cur = conn.cursor()
cur.execute(
"SELECT id, provider, label, api_key, default_model, is_active, last_used_at "
"FROM llm_keys WHERE provider = ? AND is_active = 1 ORDER BY id LIMIT 1",
(provider,),
)
row = cur.fetchone()
if not row:
return None
return {
"id": row[0],
"provider": row[1],
"label": row[2] or "",
"api_key": row[3],
"default_model": row[4] or "",
"is_active": row[5],
"last_used_at": row[6],
}
finally:
conn.close()
def mark_key_used(key_id: int):
now = _now_unix()
conn = get_conn()
try:
cur = conn.cursor()
cur.execute(
"UPDATE llm_keys SET last_used_at = ?, updated_at = ? WHERE id = ?",
(now, now, key_id),
)
conn.commit()
finally:
conn.close()
def _mask_key(api_key: str) -> str:
"""展示时打码前4位 + ... + 后4位。"""
k = api_key or ""
if len(k) <= 8:
return k[:2] + "****"
return k[:4] + "..." + k[-4:]

View File

@@ -0,0 +1,28 @@
"""
OpenAI 兼容 API 引擎:适用于 DeepSeek、通义千问、Kimi、文心一言、豆包火山方舟
只要平台提供 OpenAI 兼容接口,均可用此引擎,无需 Playwright。
"""
class ApiEngine:
def __init__(self, api_base: str, api_key: str, model: str):
self.api_base = api_base
self.api_key = api_key
self.model = model
def generate(self, prompt: str) -> str:
try:
from openai import OpenAI
except ImportError:
return "ERROR:缺少依赖pip install openai"
try:
client = OpenAI(base_url=self.api_base, api_key=self.api_key)
response = client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
)
content = response.choices[0].message.content
return (content or "").strip()
except Exception as e:
return f"ERROR:API调用失败{e}"

View File

@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod
from playwright.async_api import Page
class BaseEngine(ABC):
def __init__(self, page: Page):
self.page = page
@abstractmethod
async def generate(self, prompt: str) -> str:
"""
基于操作页面的具体大模型的生成逻辑。输入文本,抓取并返回文本。
返回值:正常结果字符串,或以 "ERROR:" 开头的错误描述。
"""
pass

View File

@@ -0,0 +1,70 @@
"""
DeepSeek 网页版驱动引擎chat.deepseek.com
选择器说明(如页面改版需更新):
- 输入框:#chat-inputtextarea
- 发送按钮:[aria-label="发送消息"] 或 div[class*="send"] > button
- 停止生成按钮:[aria-label="停止生成"](生成中可见,生成结束消失)
- 复制按钮:最后一条回复下的 [aria-label="复制"]
"""
import asyncio
from .base import BaseEngine
class DeepSeekEngine(BaseEngine):
async def generate(self, prompt: str) -> str:
await self.page.goto("https://chat.deepseek.com")
await self.page.wait_for_load_state("networkidle")
# 登录检测:若能找到输入框则已登录
try:
editor = self.page.locator("textarea#chat-input").first
await editor.wait_for(state="visible", timeout=10000)
except Exception:
return "ERROR:REQUIRE_LOGIN"
# 输入提示词
await editor.click()
await self.page.keyboard.insert_text(prompt)
await asyncio.sleep(0.5)
# 发送(优先点按钮,失败则按 Enter
sent = False
for sel in ('[aria-label="发送消息"]', 'div[class*="send"] > button', 'button[type="submit"]'):
try:
btn = self.page.locator(sel).first
if await btn.is_visible(timeout=1000):
await btn.click()
sent = True
break
except Exception:
continue
if not sent:
await self.page.keyboard.press("Enter")
print("💡 [llm-manager/deepseek] 已发送提示词,等待 DeepSeek 生成响应...")
await asyncio.sleep(2)
# 等待生成完毕:停止按钮消失即为完成,超时 150 秒
stop_sel = '[aria-label="停止生成"]'
deadline = asyncio.get_event_loop().time() + 150
while asyncio.get_event_loop().time() < deadline:
try:
visible = await self.page.locator(stop_sel).first.is_visible(timeout=500)
if not visible:
break
except Exception:
break
await asyncio.sleep(2)
await asyncio.sleep(2)
# 通过最后一个复制按钮取结果
try:
copy_btn = self.page.locator('[aria-label="复制"]').last
await copy_btn.click()
await asyncio.sleep(0.5)
result = await self.page.evaluate("navigator.clipboard.readText()")
return result.strip()
except Exception as e:
return f"ERROR:抓取 DeepSeek 内容时异常:{e}"

View File

@@ -0,0 +1,222 @@
"""
豆包网页版驱动doubao.com/chat/)。
生成后不依赖「喇叭瞬时出现/消失」这种不稳定信号。
而是轮询判断两种输出是否已进入可复制状态:
1) 右侧写作模式:出现「改用对话直接回答」入口并可点右侧复制按钮
2) 左侧对话模式:出现可点的消息复制按钮(拿最后一条新增回复)
"""
import asyncio
import time
from .base import BaseEngine
_DOUBAO_SPEAKER_PATH_SNIPPET = "M19.8628 9.29346C20.3042"
class DoubaoEngine(BaseEngine):
async def generate(self, prompt: str) -> str:
self.page.set_default_timeout(60_000)
await self.page.goto(
"https://www.doubao.com/chat/",
wait_until="domcontentloaded",
timeout=60_000,
)
editor = self.page.locator('textarea[data-testid="chat_input_input"]').first
try:
await editor.wait_for(state="visible", timeout=30_000)
except Exception:
return "ERROR:REQUIRE_LOGIN"
text = (prompt or "").strip()
if not text:
return "ERROR:PROMPT_EMPTY"
await editor.click()
await editor.fill(text)
await asyncio.sleep(0.2)
send = self.page.locator(
'#flow-end-msg-send, [data-testid="chat_input_send_button"]'
).first
try:
await send.wait_for(state="visible", timeout=15_000)
await send.click(timeout=10_000)
except Exception as e:
return f"ERROR:DOUBAO_SEND_FAILED {e}"
print("💡 [llm-manager/doubao] 已发送,等待生成完成后再复制(最长 180s...")
# 发送前记录:用于确认点的是新增复制按钮,且剪贴板确实变化了
left_copy = self.page.locator('[data-testid="message_action_copy"]')
right_copy = self.page.locator('[data-testid="container_inner_copy_btn"]')
left_prev = await left_copy.count()
right_prev = await right_copy.count()
clipboard_before = await self._safe_read_clipboard()
# 稳定检测:避免生成中“喇叭短暂出现”误判(需要连续多次可见)
speaker = self.page.locator(
f'svg path[d*="{_DOUBAO_SPEAKER_PATH_SNIPPET}"]'
).last
stable_needed = 3
stable = 0
interval_sec = 0.5
deadline = time.monotonic() + 180.0
while time.monotonic() < deadline:
try:
if await speaker.is_visible():
stable += 1
else:
stable = 0
except Exception:
stable = 0
if stable >= stable_needed:
break
await asyncio.sleep(interval_sec)
else:
return "ERROR:DOUBAO_WAIT_COMPLETE_TIMEOUT 180 秒内未检测到“喇叭稳定回显”。"
# 生成完成后:依据当前页面状态决定点右侧还是左侧复制
write_mode = await self._is_write_mode()
if write_mode:
if await self._wait_count_gt(right_copy, right_prev, timeout_sec=15.0):
copy_btn = right_copy.last
elif await self._wait_count_gt(left_copy, left_prev, timeout_sec=15.0):
copy_btn = left_copy.last
else:
return "ERROR:DOUBAO_COPY_BUTTON_NOT_READY 右侧/左侧复制按钮均未出现新增。"
else:
if await self._wait_count_gt(left_copy, left_prev, timeout_sec=15.0):
copy_btn = left_copy.last
elif await self._wait_count_gt(right_copy, right_prev, timeout_sec=5.0):
copy_btn = right_copy.last
else:
return "ERROR:DOUBAO_COPY_BUTTON_NOT_READY 左侧复制按钮未出现新增。"
try:
await copy_btn.scroll_into_view_if_needed()
await copy_btn.click(timeout=15_000, force=True)
except Exception as e:
return f"ERROR:DOUBAO_COPY_CLICK_FAILED {e}"
out = await self._wait_clipboard_changed(clipboard_before, timeout_sec=20.0)
if out is None:
return "ERROR:DOUBAO_CLIPBOARD_NOT_UPDATED 剪贴板未在规定时间内更新。"
if self._is_invalid_clipboard(out, clipboard_before=clipboard_before):
return f"ERROR:DOUBAO_CLIPBOARD_INVALID {out[:200]}"
await asyncio.sleep(5)
return out
async def _is_write_mode(self) -> bool:
marker = self.page.locator('div.bottom-entry-GSWErB:has-text("改用对话直接回答")').first
try:
return await marker.is_visible()
except Exception:
return False
async def _wait_count_gt(self, locator, prev_count: int, timeout_sec: float) -> bool:
deadline = time.monotonic() + timeout_sec
while time.monotonic() < deadline:
try:
if await locator.count() > prev_count:
return True
except Exception:
pass
await asyncio.sleep(0.3)
return False
async def _wait_clipboard_changed(
self, clipboard_before: str | None, timeout_sec: float
) -> str | None:
deadline = time.monotonic() + timeout_sec
last = clipboard_before
while time.monotonic() < deadline:
cur = await self._safe_read_clipboard()
if cur and cur != last and not self._is_invalid_clipboard(
cur, clipboard_before=clipboard_before
):
return cur.strip()
last = cur
await asyncio.sleep(0.4)
return None
async def _try_copy_right(self, right_copy) -> str | None:
# 右侧复制按钮
try:
btn = right_copy.first
await btn.wait_for(state="visible", timeout=2000)
await btn.scroll_into_view_if_needed()
await btn.click(timeout=5000, force=True)
except Exception:
return None
await asyncio.sleep(0.4)
out = await self._safe_read_clipboard()
if self._is_invalid_clipboard(out):
return None
if out == (await self._safe_read_clipboard()):
pass
return (out or "").strip()
async def _try_copy_left(self, left_copy, prev_count: int) -> str | None:
n = await left_copy.count()
if n <= prev_count:
return None
# 从新增区间末尾往前找第一个可点击的复制按钮
for idx in range(n - 1, prev_count - 1, -1):
btn = left_copy.nth(idx)
try:
if not await btn.is_visible():
continue
await btn.scroll_into_view_if_needed()
await btn.click(timeout=5000, force=True)
except Exception:
continue
await asyncio.sleep(0.4)
out = await self._safe_read_clipboard()
if self._is_invalid_clipboard(out):
continue
return (out or "").strip()
return None
def _is_invalid_clipboard(
self, text: str | None, clipboard_before: str | None = None
) -> bool:
if not text:
return True
t = str(text).strip()
if not t:
return True
if clipboard_before is not None and t == str(clipboard_before).strip():
return True
if t.startswith("ERROR:"):
return True
# 防止误点拿到 UI 文案/占位文本
banned_substrings = (
"改用对话直接回答",
"新对话",
"正在生成",
"复制",
"下载",
"快捷",
"发消息",
)
if any(s in t for s in banned_substrings):
return True
if "data-testid" in t or "<button" in t:
return True
# 经验阈值:正常正文不会太短
if len(t) < 30:
return True
return False
async def _safe_read_clipboard(self) -> str | None:
try:
return await self.page.evaluate("navigator.clipboard.readText()")
except Exception:
return None

View File

@@ -0,0 +1,48 @@
import asyncio
from .base import BaseEngine
class KimiEngine(BaseEngine):
async def generate(self, prompt: str) -> str:
await self.page.goto("https://kimi.moonshot.cn/")
await self.page.wait_for_load_state("networkidle")
# 登录检测:等待输入框出现,超时则未登录
try:
editor = self.page.locator(".chat-input-editor").first
await editor.wait_for(state="visible", timeout=10000)
except Exception:
return "ERROR:REQUIRE_LOGIN"
# 输入提示词insert_text 避免换行符被误触发)
await editor.click()
await self.page.keyboard.insert_text(prompt)
await asyncio.sleep(0.5)
# 点击发送按钮
await self.page.locator(".send-button-container").first.click()
print("💡 [llm-manager/kimi] 已发送提示词,等待 Kimi 生成响应...")
# 等待 2 秒让发送按钮进入"生成中"状态
await asyncio.sleep(2)
# 等待 Send SVG 图标重新出现(表示生成完毕)
try:
send_icon = self.page.locator('.send-button-container svg[name="Send"]')
await send_icon.wait_for(state="visible", timeout=150000)
except Exception:
return "ERROR:Kimi 生成超时150秒未见完成标志可能网络卡住或内容被拦截。"
# 稳妥起见多等 2 秒,让复制按钮完全渲染
await asyncio.sleep(2)
# 点击最后一条回复的复制按钮,通过剪贴板取完整文本
try:
copy_btn = self.page.locator('svg[name="Copy"]').last
await copy_btn.click()
await asyncio.sleep(0.5)
result = await self.page.evaluate("navigator.clipboard.readText()")
return result.strip()
except Exception as e:
return f"ERROR:抓取 Kimi 内容时异常:{e}"

View File

@@ -0,0 +1,91 @@
"""
通义千问网页版驱动引擎tongyi.aliyun.com/qianwen
选择器说明(如页面改版需更新):
- 输入框:#search-input 或 textarea[class*="input"]textarea
- 发送按钮button[class*="send"] 或 [aria-label="发送"]
- 停止生成button[class*="stop"] 或 [aria-label="停止"]
- 复制按钮:最后一条回复下的 [class*="copy"] 或 [aria-label="复制"]
"""
import asyncio
from .base import BaseEngine
class QianwenEngine(BaseEngine):
async def generate(self, prompt: str) -> str:
await self.page.goto("https://tongyi.aliyun.com/qianwen/")
await self.page.wait_for_load_state("networkidle")
# 登录检测
input_selectors = [
"#search-input",
"textarea[class*='input']",
"div[class*='input'][contenteditable='true']",
]
editor = None
for sel in input_selectors:
try:
loc = self.page.locator(sel).first
await loc.wait_for(state="visible", timeout=4000)
editor = loc
break
except Exception:
continue
if editor is None:
return "ERROR:REQUIRE_LOGIN"
# 输入提示词
await editor.click()
await self.page.keyboard.insert_text(prompt)
await asyncio.sleep(0.5)
# 发送
sent = False
for sel in (
'button[class*="send"]',
'[aria-label="发送"]',
'button[type="submit"]',
):
try:
btn = self.page.locator(sel).first
if await btn.is_visible(timeout=1000):
await btn.click()
sent = True
break
except Exception:
continue
if not sent:
await self.page.keyboard.press("Enter")
print("💡 [llm-manager/qianwen] 已发送提示词,等待通义千问生成响应...")
await asyncio.sleep(2)
# 等待生成完毕
stop_selectors = ['button[class*="stop"]', '[aria-label="停止"]', '[aria-label="停止生成"]']
deadline = asyncio.get_event_loop().time() + 150
while asyncio.get_event_loop().time() < deadline:
stop_visible = False
for sel in stop_selectors:
try:
if await self.page.locator(sel).first.is_visible(timeout=300):
stop_visible = True
break
except Exception:
pass
if not stop_visible:
break
await asyncio.sleep(2)
await asyncio.sleep(2)
# 取结果
try:
copy_btn = self.page.locator(
'[aria-label="复制"], [class*="copy-btn"], button:has(svg[class*="copy"])'
).last
await copy_btn.click()
await asyncio.sleep(0.5)
result = await self.page.evaluate("navigator.clipboard.readText()")
return result.strip()
except Exception as e:
return f"ERROR:抓取通义千问内容时异常:{e}"

View File

@@ -0,0 +1,91 @@
"""
文心一言网页版驱动引擎yiyan.baidu.com
选择器说明(如页面改版需更新):
- 输入框:[class*="editor"] 或 div[contenteditable="true"](富文本编辑器)
- 发送按钮:[class*="send-btn"] 或 [aria-label="发送"]
- 停止生成:[class*="stop"] 相关按钮
- 复制按钮:最后一条回复下的复制按钮
"""
import asyncio
from .base import BaseEngine
class YiyanEngine(BaseEngine):
async def generate(self, prompt: str) -> str:
await self.page.goto("https://yiyan.baidu.com")
await self.page.wait_for_load_state("networkidle")
# 登录检测
input_selectors = [
"div[class*='editor'][contenteditable='true']",
"textarea[class*='input']",
"[contenteditable='true']",
]
editor = None
for sel in input_selectors:
try:
loc = self.page.locator(sel).first
await loc.wait_for(state="visible", timeout=4000)
editor = loc
break
except Exception:
continue
if editor is None:
return "ERROR:REQUIRE_LOGIN"
# 输入提示词
await editor.click()
await self.page.keyboard.insert_text(prompt)
await asyncio.sleep(0.5)
# 发送
sent = False
for sel in (
'[class*="send-btn"]',
'[aria-label="发送"]',
'button[class*="send"]',
):
try:
btn = self.page.locator(sel).first
if await btn.is_visible(timeout=1000):
await btn.click()
sent = True
break
except Exception:
continue
if not sent:
await self.page.keyboard.press("Enter")
print("💡 [llm-manager/yiyan] 已发送提示词,等待文心一言生成响应...")
await asyncio.sleep(2)
# 等待生成完毕
stop_selectors = ['[class*="stop"]', '[aria-label="停止"]', '[aria-label="停止生成"]']
deadline = asyncio.get_event_loop().time() + 150
while asyncio.get_event_loop().time() < deadline:
stop_visible = False
for sel in stop_selectors:
try:
if await self.page.locator(sel).first.is_visible(timeout=300):
stop_visible = True
break
except Exception:
pass
if not stop_visible:
break
await asyncio.sleep(2)
await asyncio.sleep(2)
# 取结果
try:
copy_btn = self.page.locator(
'[aria-label="复制"], [class*="copy"], button:has(svg[class*="copy"])'
).last
await copy_btn.click()
await asyncio.sleep(0.5)
result = await self.page.evaluate("navigator.clipboard.readText()")
return result.strip()
except Exception as e:
return f"ERROR:抓取文心一言内容时异常:{e}"

View File

@@ -0,0 +1,94 @@
"""
腾讯元宝网页版驱动引擎yuanbao.tencent.com
元宝暂无公开 API仅支持网页模式。
选择器说明(如页面改版需更新):
- 输入框:[class*="input-area"] textarea 或 [contenteditable="true"]
- 发送按钮:[class*="send-btn"] 或 [aria-label="发送"]
- 停止生成:[class*="stop"] 相关按钮
- 复制按钮:最后一条回复下的复制按钮
"""
import asyncio
from .base import BaseEngine
class YuanbaoEngine(BaseEngine):
async def generate(self, prompt: str) -> str:
await self.page.goto("https://yuanbao.tencent.com/chat")
await self.page.wait_for_load_state("networkidle")
# 登录检测
input_selectors = [
"textarea[class*='input']",
"div[class*='input-area'] textarea",
"[contenteditable='true']",
"textarea",
]
editor = None
for sel in input_selectors:
try:
loc = self.page.locator(sel).first
await loc.wait_for(state="visible", timeout=4000)
editor = loc
break
except Exception:
continue
if editor is None:
return "ERROR:REQUIRE_LOGIN"
# 输入提示词
await editor.click()
await self.page.keyboard.insert_text(prompt)
await asyncio.sleep(0.5)
# 发送
sent = False
for sel in (
'[class*="send-btn"]',
'[aria-label="发送"]',
'button[class*="send"]',
'button[type="submit"]',
):
try:
btn = self.page.locator(sel).first
if await btn.is_visible(timeout=1000):
await btn.click()
sent = True
break
except Exception:
continue
if not sent:
await self.page.keyboard.press("Enter")
print("💡 [llm-manager/yuanbao] 已发送提示词,等待腾讯元宝生成响应...")
await asyncio.sleep(2)
# 等待生成完毕
stop_selectors = ['[class*="stop"]', '[aria-label="停止"]', '[aria-label="停止生成"]']
deadline = asyncio.get_event_loop().time() + 150
while asyncio.get_event_loop().time() < deadline:
stop_visible = False
for sel in stop_selectors:
try:
if await self.page.locator(sel).first.is_visible(timeout=300):
stop_visible = True
break
except Exception:
pass
if not stop_visible:
break
await asyncio.sleep(2)
await asyncio.sleep(2)
# 取结果
try:
copy_btn = self.page.locator(
'[aria-label="复制"], [class*="copy"], button:has(svg[class*="copy"])'
).last
await copy_btn.click()
await asyncio.sleep(0.5)
result = await self.page.evaluate("navigator.clipboard.readText()")
return result.strip()
except Exception as e:
return f"ERROR:抓取腾讯元宝内容时异常:{e}"

575
llm-manager/scripts/main.py Normal file
View File

@@ -0,0 +1,575 @@
"""
llm-manager 主入口 CLI。
子命令:
health 快速离线健康检查
version 输出版本 JSON
add <platform> [api_key] 添加 API Key不传 api_key 则走网页账号关联)
key list [platform] 列出 Key打码
key del <key_id> 删除 Key
generate <platform_or_account_id> 生成内容(优先网页模式,备用 API Key 模式)
"<prompt>"
generate 调度规则:
1. 若 target 为纯数字 → 视为 account-manager 账号 ID → 强制网页模式
2. 若 target 为平台名/别名:
a. 先查 account-manager 有无该平台已登录账号 → 网页模式(免费)
b. 再查 llm_keys 有无可用 Key → API Key 模式(付费)
c. 两者均无 → 报错并给出操作指引
"""
import sys
import json
import os
import asyncio
import subprocess
# Windows GBK 编码兼容修复
if sys.platform == "win32":
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
OPENCLAW_DIR = os.path.dirname(BASE_DIR)
# 确保 scripts 目录在 sys.path使 providers/db 可直接 import
_scripts_dir = os.path.dirname(os.path.abspath(__file__))
if _scripts_dir not in sys.path:
sys.path.insert(0, _scripts_dir)
from providers import (
LLM_PROVIDERS,
resolve_provider_key,
provider_list_cn,
find_logged_in_account,
resolve_chromium_channel,
)
from db import (
add_key,
upsert_web_account,
list_keys,
list_web_accounts,
get_key_by_id,
delete_key,
find_active_key,
mark_key_used,
_mask_key,
)
from engines.api_engine import ApiEngine
from engines.kimi import KimiEngine
from engines.doubao import DoubaoEngine
from engines.deepseek import DeepSeekEngine
from engines.qianwen import QianwenEngine
from engines.yiyan import YiyanEngine
from engines.yuanbao import YuanbaoEngine
# 平台 slug → 网页引擎类(全部 6 个平台)
WEB_ENGINES = {
"doubao": DoubaoEngine,
"deepseek": DeepSeekEngine,
"qianwen": QianwenEngine,
"kimi": KimiEngine,
"yiyan": YiyanEngine,
"yuanbao": YuanbaoEngine,
}
SKILL_VERSION = "1.0.3"
def _engine_result_is_error(text: str) -> bool:
"""网页/API 引擎约定:失败时返回以 ERROR: 开头的字符串,不得当作正文包进 LLM 标记块。"""
return (text or "").lstrip().startswith("ERROR:")
def _unix_to_iso(ts):
if ts is None:
return ""
try:
import datetime as _dt
return _dt.datetime.fromtimestamp(int(ts)).isoformat(timespec="seconds")
except Exception:
return str(ts)
# ---------------------------------------------------------------------------
# account-manager 跨 Skill 调用
# ---------------------------------------------------------------------------
def _get_account_from_manager(account_id) -> dict | None:
"""通过 account-manager CLI 按 ID 取账号 JSON。"""
script = os.path.join(OPENCLAW_DIR, "account-manager", "scripts", "main.py")
try:
result = subprocess.run(
[sys.executable, script, "get", str(account_id)],
capture_output=True, text=True, encoding="utf-8", errors="replace",
)
raw = result.stdout.strip()
if raw.startswith("ERROR"):
return None
return json.loads(raw)
except Exception:
return None
# ---------------------------------------------------------------------------
# 网页模式
# ---------------------------------------------------------------------------
async def _run_web_generate(account: dict, prompt: str) -> bool:
from playwright.async_api import async_playwright
platform = account.get("platform", "")
profile_dir = account.get("profile_dir", "").strip()
engine_cls = WEB_ENGINES.get(platform)
if not engine_cls:
print(f"ERROR:ENGINE_NOT_FOUND 平台 {platform} 暂无网页引擎实现。")
return False
if not profile_dir or not os.path.isdir(profile_dir):
print(f"ERROR:PROFILE_DIR_MISSING 账号 {account.get('id')} 的浏览器数据目录不存在:{profile_dir}")
print("请先通过 account-manager 执行登录python account-manager/scripts/main.py login <id>")
return False
channel = resolve_chromium_channel()
if not channel:
print("ERROR:浏览器未找到。请安装 Google Chrome 或 Microsoft Edge 后重试。")
return False
print(f"[llm-manager] 使用网页模式 | 平台: {LLM_PROVIDERS.get(platform, {}).get('label', platform)} | 账号: {account.get('name', account.get('id'))}")
async with async_playwright() as p:
browser = await p.chromium.launch_persistent_context(
user_data_dir=profile_dir,
headless=False,
channel=channel,
no_viewport=True,
permissions=["clipboard-read", "clipboard-write"],
args=["--start-maximized"],
)
try:
page = browser.pages[0] if browser.pages else await browser.new_page()
engine = engine_cls(page)
try:
result = await engine.generate(prompt)
except Exception as e:
print(f"ERROR:WEB_GENERATE_FAILED 网页模式生成失败:{e}")
return False
result = (result or "").strip()
if _engine_result_is_error(result):
print(result)
return False
print("===LLM_START===")
print(result)
print("===LLM_END===")
return True
finally:
await asyncio.sleep(3)
await browser.close()
# ---------------------------------------------------------------------------
# API Key 模式
# ---------------------------------------------------------------------------
def _run_api_generate(provider_key: str, key_record: dict, prompt: str) -> bool:
provider = LLM_PROVIDERS[provider_key]
api_base = provider.get("api_base")
model = (key_record.get("default_model") or "").strip() or (provider.get("api_model") or "").strip()
if not api_base:
print(f"ERROR:NO_API_BASE {provider['label']} 暂无 API 地址(不支持 API 模式)。")
return False
if not model:
print(
f"ERROR:API_MODEL_MISSING {provider['label']} 需要指定模型名称。\n"
f"提示:{provider.get('api_note', '')}\n"
f"请删除该 Key 并重新添加时带上 --model 参数python main.py add {provider_key} \"Key\" --model \"模型名\""
)
return False
print(f"[llm-manager] 使用 API Key 模式 | 平台: {provider['label']} | 模型: {model}")
try:
engine = ApiEngine(api_base=api_base, api_key=key_record["api_key"], model=model)
result = engine.generate(prompt)
except Exception as e:
print(f"ERROR:API_GENERATE_FAILED API 模式调用失败:{e}")
return False
result = (result or "").strip()
if _engine_result_is_error(result):
print(result)
return False
mark_key_used(key_record["id"])
print("===LLM_START===")
print(result)
print("===LLM_END===")
return True
# ---------------------------------------------------------------------------
# generate 命令
# ---------------------------------------------------------------------------
def cmd_generate(target: str, prompt: str) -> bool:
"""
target 可以是:
- 纯数字 → account-manager 账号 ID强制网页模式
- 平台名/别名 → 自动选择模式网页优先API Key 备用)
成功返回 True任一步失败返回 False调用方应设进程退出码非 0
"""
target = (target or "").strip()
prompt = (prompt or "").strip()
if not prompt:
print("ERROR:PROMPT_EMPTY 提示词不能为空。")
return False
# ---- 情况 1显式 account_id纯数字→ 强制网页模式 ----
if target.isdigit():
account = _get_account_from_manager(target)
if not account:
print(f"ERROR:ACCOUNT_NOT_FOUND 未在「模型管理」对应的账号列表中找到 id={target}")
print("请先在模型管理中添加该平台账号,或检查 id 是否抄错。")
return False
if account.get("login_status") != 1:
print(
f"ERROR:REQUIRE_LOGIN 账号「{account.get('name', target)}」尚未完成网页登录。\n"
"请先在「模型管理」里确认已添加该账号,再执行下面命令完成登录:\n"
f" python account-manager/scripts/main.py login {target}\n"
"若使用网页模式:请先在对应平台官网注册好账号,再回到本工具添加并登录。"
)
return False
return asyncio.run(_run_web_generate(account, prompt))
# ---- 情况 2平台名 → 自动选模式 ----
provider_key = resolve_provider_key(target)
if not provider_key:
print(f"ERROR:INVALID_PLATFORM 无法识别的平台「{target}」。")
print("支持的平台:" + provider_list_cn())
return False
provider = LLM_PROVIDERS[provider_key]
label = provider["label"]
web_url = (provider.get("web_url") or "").strip()
# 优先级 1查 account-manager 已登录账号(网页模式,免费)
account = find_logged_in_account(provider_key)
if account:
return asyncio.run(_run_web_generate(account, prompt))
# 优先级 2查本地 API Key付费
key_record = find_active_key(provider_key)
if key_record:
return _run_api_generate(provider_key, key_record, prompt)
# 两种凭据均无 → 明确说明「模型管理 + 网页模式需官网账号」
print(f"ERROR:NO_CREDENTIAL 当前没有可用的「{label}」调用凭据(既没有已登录的网页账号,也没有可用的 API Key")
print()
print("【推荐 · 网页模式(免费)】按顺序做:")
print(f" ① 打开「模型管理」(与 OpenClaw 里 account-manager 账号管理是同一套数据),先把「{label}」账号添加进去。")
print(" 命令行示例(在 OpenClaw 根目录执行):")
print(f" python account-manager/scripts/main.py add \"{label}\" \"你的手机号或登录名\"")
print(" python account-manager/scripts/main.py login <上一步返回的账号 id>")
print(" ② 若走网页模式:请先在对应平台官方网站注册并创建好账号(否则浏览器里登录会失败)。")
if web_url:
print(f"{label}」网页入口:{web_url}")
print()
if provider.get("has_api"):
print("【备选 · API Key付费】若已在厂商控制台开通 API可本地登记 Key 后走接口调用:")
print(f" python llm-manager/scripts/main.py add {provider_key} \"你的API Key\"")
if provider.get("api_note"):
print(f" 说明:{provider['api_note']}")
else:
print(f"说明:「{label}」暂无公开 API只能使用上面的网页模式。")
return False
# ---------------------------------------------------------------------------
# key 子命令
# ---------------------------------------------------------------------------
def cmd_key_add(provider_input: str, api_key: str = "", model: str = None, label: str = ""):
provider_key = resolve_provider_key(provider_input)
if not provider_key:
print(f"ERROR:INVALID_PLATFORM 无法识别的平台「{provider_input}」。")
print("支持:" + provider_list_cn())
return
provider = LLM_PROVIDERS[provider_key]
api_key = (api_key or "").strip()
# 不传 api_key默认走网页版本检查 account-manager 是否已有该平台已登录账号
if not api_key:
account = find_logged_in_account(provider_key)
if account:
web_row_id = upsert_web_account(
provider=provider_key,
account_id=int(account.get("id")),
account_name=(account.get("name") or account.get("account_name") or ""),
login_status=int(account.get("login_status") or 0),
)
print(
f"OK:WEB_ACCOUNT_READY 已关联网页模式账号 | 平台: {provider['label']} "
f"| account_id: {account.get('id')} | 账号: {account.get('name') or account.get('account_name') or ''}"
)
print(f"已写入 llm-manager 记录WEB_ID {web_row_id}")
print(f"后续可直接调用python main.py generate {provider_key} \"<提示词>\"")
return
print(f"ERROR:WEB_ACCOUNT_NOT_FOUND 未找到已登录的「{provider['label']}」账号,暂时无法走网页模式。")
print("请先在 account-manager 添加并登录该平台账号:")
print(f" python account-manager/scripts/main.py add \"{provider['label']}\" \"你的登录名\"")
print(" python account-manager/scripts/main.py login <账号id>")
if provider.get("has_api"):
print("或直接提供 API Key")
print(f" python main.py add {provider_key} \"<API_Key>\"")
return
if not provider.get("has_api"):
print(f"ERROR:{provider['label']} 暂无公开 API不支持 API Key 模式。")
print("请改用网页模式并先在 account-manager 完成登录。")
return
new_id = add_key(provider=provider_key, api_key=api_key, model=model, label=label)
print(f"✅ 已保存 API KeyID {new_id} | {provider['label']} | 模型: {model or provider.get('api_model') or '(未指定)'} | {_mask_key(api_key)}")
if not model and not provider.get("api_model"):
print(f"⚠️ 注意:该平台需要指定模型名称,否则调用时会报错。")
print(f" {provider.get('api_note', '')}")
print(f" 可删除后重新添加python main.py key del {new_id}")
def cmd_key_list(provider_input: str = None, limit: int = 10):
provider_key = None
if provider_input:
provider_key = resolve_provider_key(provider_input)
if not provider_key:
print(f"ERROR:INVALID_PLATFORM 无法识别的平台「{provider_input}」。")
return
keys = list_keys(provider_key, limit=limit)
web_accounts = list_web_accounts(provider_key, limit=limit)
if not keys and not web_accounts:
print("暂无记录API Key / 网页账号关联 都为空)。")
print("添加 API Keypython main.py add <platform> \"API Key\" [--model 模型名]")
print("添加网页账号关联python main.py add <platform>")
return
sep_line = "_" * 39
rows = []
for k in keys:
rows.append({
"type": "api_key",
"created_at": int(k.get("created_at") or 0),
"payload": k,
})
for w in web_accounts:
rows.append({
"type": "web_account",
"created_at": int(w.get("created_at") or 0),
"payload": w,
})
rows.sort(key=lambda x: (x["created_at"], int(x["payload"].get("id") or 0)), reverse=True)
for idx, row in enumerate(rows):
if row["type"] == "api_key":
k = row["payload"]
print("record_typeapi_key")
print(f"id{k['id']}")
print(f"platform{k['provider']}")
print(f"platform_cn{LLM_PROVIDERS.get(k['provider'], {}).get('label', k['provider'])}")
print(f"label{k.get('label') or ''}")
print(f"api_key{_mask_key(k.get('api_key') or '')}")
print(f"default_model{k.get('default_model') or ''}")
print(f"is_active{int(k.get('is_active') or 0)}")
print(f"last_used_at{_unix_to_iso(k.get('last_used_at'))}")
print(f"created_at{_unix_to_iso(k.get('created_at'))}")
else:
w = row["payload"]
print("record_typeweb_account")
print(f"id{w['id']}")
print(f"platform{w['provider']}")
print(f"platform_cn{LLM_PROVIDERS.get(w['provider'], {}).get('label', w['provider'])}")
print(f"account_id{w.get('account_id')}")
print(f"account_name{w.get('account_name') or ''}")
print(f"login_status{int(w.get('login_status') or 0)}")
print(f"created_at{_unix_to_iso(w.get('created_at'))}")
print(f"updated_at{_unix_to_iso(w.get('updated_at'))}")
if idx != len(rows) - 1:
print(sep_line)
print()
def cmd_key_del(key_id_str: str):
if not key_id_str.isdigit():
print(f"ERROR:INVALID_ID key_id 须为正整数,收到「{key_id_str}」。")
return
key_id = int(key_id_str)
key = get_key_by_id(key_id)
if not key:
print(f"ERROR:KEY_NOT_FOUND 未找到 ID={key_id} 的 API Key。")
return
delete_key(key_id)
label_str = f"{key['label']}" if key.get("label") else ""
print(f"✅ 已删除ID {key_id} | {LLM_PROVIDERS.get(key['provider'], {}).get('label', key['provider'])}{label_str} | {_mask_key(key['api_key'])}")
# ---------------------------------------------------------------------------
# health / version
# ---------------------------------------------------------------------------
def cmd_health():
ok = sys.version_info >= (3, 10)
sys.exit(0 if ok else 1)
def cmd_version():
print(json.dumps({"version": SKILL_VERSION, "skill": "llm-manager"}, ensure_ascii=False))
# ---------------------------------------------------------------------------
# CLI 解析
# ---------------------------------------------------------------------------
def _print_usage():
print("用法:")
print(" python main.py health")
print(" python main.py version")
print(" python main.py generate <平台名或account_id> \"<提示词>\"")
print(" python main.py add <平台> [API_Key] [--model 模型名] [--label 备注]")
print(" python main.py list [平台] [--limit 条数]")
print(" python main.py del <key_id>")
print()
print("支持的平台:" + provider_list_cn())
print()
print("generate 说明:")
print(" · 传入 account_id数字→ 指定账号网页模式(需先用 account-manager 登录)")
print(" · 传入平台名 → 自动选择:优先网页模式(免费),无可用账号时才用 API Key付费")
print()
print("兼容说明key add/list/del 旧写法仍可用。")
def main(argv=None) -> int:
args = argv if argv is not None else sys.argv[1:]
if not args:
_print_usage()
return 1
if args[0] in ("-h", "--help"):
_print_usage()
return 0
def _parse_list_args(rest_args):
platform_filter = None
limit = 10
i = 0
while i < len(rest_args):
if rest_args[i] == "--limit" and i + 1 < len(rest_args):
try:
limit = int(rest_args[i + 1])
except ValueError:
print(f"ERROR:CLI_KEY_LIST_BAD_LIMIT limit 必须是整数,收到「{rest_args[i + 1]}」。")
return None, None, 1
i += 2
else:
if platform_filter is None:
platform_filter = rest_args[i]
i += 1
return platform_filter, limit, 0
def _parse_add_args(rest_args):
if len(rest_args) < 1:
print("ERROR:CLI_ADD_MISSING_ARGS")
print("用法python main.py add <平台> [API_Key] [--model 模型] [--label 备注]")
return None, None, None, None, 1
platform_arg = rest_args[0]
api_key_arg = ""
model_arg = None
label_arg = ""
i = 1
if i < len(rest_args) and not rest_args[i].startswith("--"):
api_key_arg = rest_args[i]
i += 1
while i < len(rest_args):
if rest_args[i] == "--model" and i + 1 < len(rest_args):
model_arg = rest_args[i + 1]
i += 2
elif rest_args[i] == "--label" and i + 1 < len(rest_args):
label_arg = rest_args[i + 1]
i += 2
else:
i += 1
return platform_arg, api_key_arg, model_arg, label_arg, 0
cmd = args[0]
if cmd == "health":
cmd_health()
elif cmd == "version":
cmd_version()
elif cmd == "generate":
if len(args) < 3:
print("ERROR:CLI_GENERATE_MISSING_ARGS")
print("用法python main.py generate <平台名或account_id> \"<提示词>\"")
return 1
if not cmd_generate(args[1], args[2]):
return 1
elif cmd == "list":
platform_filter, limit, err = _parse_list_args(args[1:])
if err:
return err
cmd_key_list(platform_filter, limit=limit)
elif cmd == "add":
platform_arg, api_key_arg, model_arg, label_arg, err = _parse_add_args(args[1:])
if err:
return err
cmd_key_add(platform_arg, api_key_arg, model=model_arg, label=label_arg)
elif cmd == "del":
if len(args) < 2:
print("ERROR:CLI_DEL_MISSING_ARGS 用法python main.py del <key_id>")
return 1
cmd_key_del(args[1])
elif cmd == "key":
if len(args) < 2:
print("ERROR:CLI_KEY_MISSING_SUBCOMMAND 请指定子命令add / list / del")
return 1
sub = args[1]
if sub == "add":
platform_arg, api_key_arg, model_arg, label_arg, err = _parse_add_args(args[2:])
if err:
return err
cmd_key_add(platform_arg, api_key_arg, model=model_arg, label=label_arg)
elif sub == "list":
platform_filter, limit, err = _parse_list_args(args[2:])
if err:
return err
cmd_key_list(platform_filter, limit=limit)
elif sub == "del":
if len(args) < 3:
print("ERROR:CLI_KEY_DEL_MISSING_ARGS 用法python main.py key del <key_id>")
return 1
cmd_key_del(args[2])
else:
print(f"ERROR:CLI_UNKNOWN_KEY_SUB 未知 key 子命令「{sub}支持add / list / del")
return 1
else:
print(f"ERROR:CLI_UNKNOWN_COMMAND 未知命令「{cmd}」。")
_print_usage()
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,199 @@
"""
平台配置中心7 个 LLM 平台的静态配置、别名解析、以及通过 account-manager 暴露的 CLI 查询已登录网页账号。
"""
import json
import os
import subprocess
import sys
# ---------------------------------------------------------------------------
# 平台静态配置
# ---------------------------------------------------------------------------
LLM_PROVIDERS = {
"doubao": {
"label": "豆包",
"aliases": ["豆包"],
"web_url": "https://www.doubao.com/chat/",
"api_base": "https://ark.volces.com/api/v3",
"api_model": None, # 豆包需要用户在火山引擎控制台建推理接入点model=ep-xxx
"has_api": True,
"api_note": "需先在火山引擎控制台创建推理接入点,--model 传 endpoint_id格式 ep-xxx",
},
"deepseek": {
"label": "DeepSeek",
"aliases": ["深度求索"],
"web_url": "https://chat.deepseek.com",
"api_base": "https://api.deepseek.com/v1",
"api_model": "deepseek-chat",
"has_api": True,
"api_note": "模型可选deepseek-chat / deepseek-reasoner",
},
"qianwen": {
"label": "通义千问",
"aliases": ["通义", "千问", "qwen", "tongyi"],
"web_url": "https://tongyi.aliyun.com/qianwen/",
"api_base": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"api_model": "qwen-plus",
"has_api": True,
"api_note": "模型可选qwen-turbo / qwen-plus / qwen-max",
},
"kimi": {
"label": "Kimi",
"aliases": ["月之暗面", "moonshot"],
"web_url": "https://kimi.moonshot.cn",
"api_base": "https://api.moonshot.cn/v1",
"api_model": "moonshot-v1-8k",
"has_api": True,
"api_note": "模型可选moonshot-v1-8k / moonshot-v1-32k / moonshot-v1-128k",
},
"yiyan": {
"label": "文心一言",
"aliases": ["文心", "一言", "ernie", "wenxin"],
"web_url": "https://yiyan.baidu.com",
"api_base": "https://qianfan.baidubce.com/v2",
"api_model": "ernie-4.0-8k",
"has_api": True,
"api_note": "模型可选ernie-4.0-8k / ernie-3.5-8k",
},
"yuanbao": {
"label": "腾讯元宝",
"aliases": ["元宝"],
"web_url": "https://yuanbao.tencent.com/chat",
"api_base": None,
"api_model": None,
"has_api": False,
"api_note": "暂无公开 API仅支持网页模式",
},
"minimax": {
"label": "MiniMax",
"aliases": ["minimax", "MiniMax", "海螺", "海螺AI"],
"web_url": "https://chat.minimax.io/",
"api_base": "https://api.minimax.chat/v1",
"api_model": "MiniMax-Text-01",
"has_api": True,
"api_note": "模型可按 MiniMax 控制台可用模型调整,建议通过 --model 显式指定。",
},
}
# 构建别名查找表含中文、英文键、aliases
_ALIAS_TO_KEY: dict = {}
for _k, _spec in LLM_PROVIDERS.items():
_ALIAS_TO_KEY[_k] = _k
_ALIAS_TO_KEY[_k.lower()] = _k
_ALIAS_TO_KEY[_spec["label"]] = _k
for _a in (_spec.get("aliases") or []):
_ALIAS_TO_KEY[_a] = _k
_ALIAS_TO_KEY[_a.lower()] = _k
def resolve_provider_key(name: str):
"""将用户输入的平台名称/别名解析为内部 slug无法识别返回 None。"""
if not name:
return None
s = str(name).strip()
return _ALIAS_TO_KEY.get(s) or _ALIAS_TO_KEY.get(s.lower())
def provider_list_cn() -> str:
return "".join(s["label"] for s in LLM_PROVIDERS.values())
# ---------------------------------------------------------------------------
# 路径帮助(与 account-manager/scripts/main.py 完全一致:仅 JIANGCHANG_*
# ---------------------------------------------------------------------------
_OPENCLAW_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def get_data_root() -> str:
env = (os.getenv("JIANGCHANG_DATA_ROOT") or "").strip()
if env:
return env
if sys.platform == "win32":
return r"D:\jiangchang-data"
return os.path.join(os.path.expanduser("~"), ".jiangchang-data")
def get_user_id() -> str:
uid = (os.getenv("JIANGCHANG_USER_ID") or "").strip()
return uid or "_anon"
# ---------------------------------------------------------------------------
# 跨技能:仅通过 account-manager 提供的 CLI 读取账号(不直接打开其数据库文件)
# ---------------------------------------------------------------------------
def _account_manager_script_path() -> str:
return os.path.join(_OPENCLAW_DIR, "account-manager", "scripts", "main.py")
def find_logged_in_account(provider_key: str) -> dict | None:
"""
调用 account-managerpick-logged-in <platform_key>取该平台已登录login_status=1的优先账号。
成功返回与 account.py get 一致的 dict失败或不可用时返回 None。
"""
script = _account_manager_script_path()
if not os.path.isfile(script):
return None
try:
proc = subprocess.run(
[sys.executable, script, "pick-logged-in", provider_key],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
)
except OSError:
return None
raw = (proc.stdout or "").strip()
if not raw or raw.startswith("ERROR:"):
return None
try:
data = json.loads(raw.splitlines()[0])
except (json.JSONDecodeError, IndexError):
return None
if not isinstance(data, dict) or data.get("id") is None:
return None
plat = data.get("platform") or provider_key
if not (data.get("url") or "").strip():
data["url"] = LLM_PROVIDERS.get(provider_key, {}).get("web_url", "")
data["platform"] = plat
return data
# ---------------------------------------------------------------------------
# Chrome/Edge 检测(与 account-manager 逻辑保持一致)
# ---------------------------------------------------------------------------
def _win_find_exe(candidates):
for p in candidates:
if p and os.path.isfile(p):
return p
return None
def resolve_chromium_channel() -> str | None:
"""返回 'chrome' | 'msedge' | None。"""
if sys.platform != "win32":
return "chrome"
pf = os.environ.get("ProgramFiles", r"C:\Program Files")
pfx86 = os.environ.get("ProgramFiles(x86)", r"C:\Program Files (x86)")
local = os.environ.get("LocalAppData", "")
chrome = _win_find_exe([
os.path.join(pf, "Google", "Chrome", "Application", "chrome.exe"),
os.path.join(pfx86, "Google", "Chrome", "Application", "chrome.exe"),
os.path.join(local, "Google", "Chrome", "Application", "chrome.exe") if local else "",
])
if chrome:
return "chrome"
edge = _win_find_exe([
os.path.join(pfx86, "Microsoft", "Edge", "Application", "msedge.exe"),
os.path.join(pf, "Microsoft", "Edge", "Application", "msedge.exe"),
os.path.join(local, "Microsoft", "Edge", "Application", "msedge.exe") if local else "",
])
if edge:
return "msedge"
return None