""" DeepSeek 网页版驱动引擎(chat.deepseek.com)。 选择器说明(如页面改版需更新): - 输入框:#chat-input(textarea) - 发送按钮:[aria-label="发送消息"] 或 div[class*="send"] > button - 停止生成按钮:[aria-label="停止生成"](生成中可见,生成结束消失) - 复制按钮:最后一条回复下的 [aria-label="复制"] """ import asyncio from .base import BaseEngine class DeepSeekEngine(BaseEngine): async def generate(self, prompt: str) -> str: await self.page.goto("https://chat.deepseek.com") await self.page.wait_for_load_state("networkidle") # 登录检测:若能找到输入框则已登录 try: editor = self.page.locator("textarea#chat-input").first await editor.wait_for(state="visible", timeout=10000) except Exception: return "ERROR:REQUIRE_LOGIN" # 输入提示词 await editor.click() await self.page.keyboard.insert_text(prompt) await asyncio.sleep(0.5) # 发送(优先点按钮,失败则按 Enter) sent = False for sel in ('[aria-label="发送消息"]', 'div[class*="send"] > button', 'button[type="submit"]'): try: btn = self.page.locator(sel).first if await btn.is_visible(timeout=1000): await btn.click() sent = True break except Exception: continue if not sent: await self.page.keyboard.press("Enter") print("💡 [llm-manager/deepseek] 已发送提示词,等待 DeepSeek 生成响应...") await asyncio.sleep(2) # 等待生成完毕:停止按钮消失即为完成,超时 150 秒 stop_sel = '[aria-label="停止生成"]' deadline = asyncio.get_event_loop().time() + 150 while asyncio.get_event_loop().time() < deadline: try: visible = await self.page.locator(stop_sel).first.is_visible(timeout=500) if not visible: break except Exception: break await asyncio.sleep(2) await asyncio.sleep(2) # 通过最后一个复制按钮取结果 try: copy_btn = self.page.locator('[aria-label="复制"]').last await copy_btn.click() await asyncio.sleep(0.5) result = await self.page.evaluate("navigator.clipboard.readText()") return result.strip() except Exception as e: return f"ERROR:抓取 DeepSeek 内容时异常:{e}"