RPA + AI 落地踩坑实录:10 个真实坑点与性能优化方案
做电商订单自动化,RPA 跑网页操作,支持内网离线部署,搭配大模型做图片识别。AI 负责识别订单状态和图片信息,听起来很美好,实际踩的坑比写的代码还多。流程跑了三天,第四天页面改版,XPath 定位全部失效,凌晨 2 点收到告警,日志里全是 NoSuchElementException。
前后折腾了两个月,踩了 10 个坑。这篇文章把最常见的坑点整理出来,每个都配了当时的报错和修复方案。如果你正在做 RPA + AI 落地,应该能省掉不少时间。
一、元素定位:今天能跑,明天就崩
最开始用 XPath 定位页面元素,写法是:
selector = "//button[@id='submit']"
跑了三天没问题,第四天页面改版,id 从 submit 变成 submit_v2,整个流程直接中断。那天凌晨 2 点收到告警,爬起来一看日志,全是 NoSuchElementException。
当时试了几种办法:先用 class 定位,结果 class 也是动态生成的;后来用文本内容定位,页面一多语言就崩。最后找到个办法——用多属性组合定位,同时匹配 class 和文本:
selector = "//button[contains(@class,'btn-primary') and text()='提交订单']"
另外增加了显式等待,等元素出现再操作:
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
wait = WebDriverWait(driver, 10)
element = wait.until(EC.presence_of_element_located((By.XPATH, selector)))
更省事的方案是直接用本地智能元素路径生成,它会根据页面结构自动推荐最稳定的定位策略。后来换了个工具,支持这个功能,改版后自动适配,不用手动改代码。
二、AI 接口超时:大模型不是随时在线
流程里调用了大模型做图片 OCR 识别,代码写得简单粗暴:
result = llm_client.recognize(image_path)
遇到网络波动或模型服务繁忙,直接卡死 30 秒,然后抛出 TimeoutError。整个 RPA 流程被这一个步骤拖垮。
后来加了超时和重试:
import requests
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def call_ai_api(image_path):
response = requests.post(
"https://api.xxx.com/ocr",
files={"image": open(image_path, "rb")},
timeout=15
)
return response.json()
还准备了降级方案。如果 AI 识别失败,回退到传统 OCR 或标记为人工复核。
费用控制方面,建议自己对接各平台 API,按量付费,比买套餐灵活。有些工具支持直接接入文心一言、DeepSeek、Kimi 等模型,费用完全透明,用多少付多少,没有中间商差价。
三、浏览器内存泄漏:跑久了就卡死
RPA 自动化需要打开几十个网页获取数据,跑了一晚上,服务器内存从 2G 飙到 16G,最后进程被系统杀掉。
根本原因是浏览器实例没正确关闭。Selenium 的 driver.quit() 有时候执行不完整,残留进程继续吃内存。
后来用上下文管理器确保资源释放:
from contextlib import contextmanager
@contextmanager
def managed_browser():
driver = webdriver.Chrome(options=options)
try:
yield driver
finally:
driver.quit()
# 强制清理残留进程
import subprocess
subprocess.run(["pkill", "-f", "chromedriver"], capture_output=True)
with managed_browser() as driver:
process_data(driver)
还加了内存监控,超过阈值自动重启:
import psutil
def check_memory():
mem = psutil.Process().memory_info().rss / 1024 / 1024 # MB
if mem > 2048: # 超过 2G 重启
restart_browser()
优化前内存 16G 崩溃,优化后稳定在 4G 以内。
四、并发冲突:多个任务抢同一个账号
公司上了 5 个 RPA 机器人,都登录同一个后台系统。结果互相挤掉线,A 刚登录,B 就把 A 踢下去,数据全乱。
根本原因是目标系统有单点登录限制,同一个账号不能多处同时在线。
解决方案:准备多个账号,每个 RPA 实例分配独立账号。如果账号有限,用分布式锁控制并发:
import redis
r = redis.Redis(host='localhost', port=6379)
def acquire_lock(lock_name, timeout=300):
return r.set(lock_name, "locked", nx=True, ex=timeout)
# 任务开始前抢锁
if acquire_lock("account_001"):
try:
run_task()
finally:
r.delete("account_001")
else:
log.warning("账号被占用,任务入队等待")
更彻底的方案是对接指纹浏览器,比如紫鸟浏览器、比特浏览器、AdsPower 等。每个任务用独立的浏览器环境 + 独立 Cookie,从根本上避免账号冲突。有些平台已内置了这些指纹浏览器的对接能力,配置一下就能用。
五、数据格式不一致:AI 输出结构千变万化
让 AI 从网页提取订单信息,要求返回 JSON 格式。结果有时候返回标准 JSON,有时候加了 Markdown 代码块标记 ```json,有时候干脆用纯文本描述。下游解析逻辑直接崩溃。
后来把 prompt 写得更严格:
prompt = """
从以下订单文本中提取信息,严格按 JSON 格式返回,不要添加任何其他内容:
{
"order_id": "订单编号",
"amount": "金额数字",
"status": "订单状态"
}
订单文本:{text}
"""
还加了格式校验和重试:
import json
def safe_parse_ai_output(raw_text):
# 去掉可能的 markdown 标记
clean = raw_text.replace("```json", "").replace("```", "").strip()
try:
return json.loads(clean)
except json.JSONDecodeError:
# 第二次尝试,让 AI 重新格式化
fix_prompt = f"将以下内容转为标准 JSON:{clean}"
return safe_parse_ai_output(call_ai(fix_prompt))
如果工具支持函数调用(Function Calling)模式,让模型直接输出结构化数据,比自由文本靠谱得多。
六、异常处理缺失:一个报错毁掉整批数据
批量处理 1000 条订单,第 87 条遇到异常(页面弹窗遮挡),整个流程终止,前面 86 条的处理结果也没保存。
后来改成每条记录独立 try-catch:
results = []
for idx, order in enumerate(orders):
try:
result = process_single_order(order)
results.append({"idx": idx, "status": "success", "data": result})
except Exception as e:
results.append({"idx": idx, "status": "failed", "error": str(e)})
log.error(f"订单 {order['id']} 处理失败: {e}")
# 继续下一条,不中断
continue
# 批量保存,避免丢失
save_results(results)
还实现了断点续传。记录处理进度,中断后从上次位置继续:
import pickle
def save_checkpoint(processed_ids):
with open("checkpoint.pkl", "wb") as f:
pickle.dump(processed_ids, f)
def load_checkpoint():
try:
with open("checkpoint.pkl", "rb") as f:
return pickle.load(f)
except FileNotFoundError:
return set()
processed = load_checkpoint()
for order in orders:
if order['id'] in processed:
continue
# 处理...
processed.add(order['id'])
save_checkpoint(processed)
七、日志缺失:出了问题找不到原因
流程凌晨 3 点报错,早上来看只有一句 Exception: timeout。不知道当时页面状态、不知道 AI 返回了什么、不知道是哪一步卡住的。
后来每个关键步骤都记详细上下文:
import logging
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(f"rpa_log_{datetime.now().strftime('%Y%m%d')}.log"),
logging.StreamHandler()
]
)
def process_order(order_id):
logging.info(f"开始处理订单 {order_id}")
try:
page_data = fetch_page(order_id)
logging.info(f"页面获取成功,URL: {page_data['url']}")
ai_result = call_ai(page_data['content'])
logging.info(f"AI 识别完成,原始输出: {ai_result[:200]}...")
save_result(order_id, ai_result)
logging.info(f"订单 {order_id} 处理完成")
except Exception as e:
logging.error(f"订单 {order_id} 失败: {e}", exc_info=True)
# 截图保存现场
driver.save_screenshot(f"error_{order_id}_{int(time.time())}.png")
raise
关键节点截图,页面异常时自动截图,事后复盘能看到当时页面状态。
八、性能瓶颈:单线程跑一天都跑不完
10000 条数据,单线程顺序处理,每条平均 30 秒,跑了 4 小时才完成 500 条。算下来要 83 小时,业务方要求当天出结果。
改成 5 线程并行:
from concurrent.futures import ThreadPoolExecutor, as_completed
def process_batch(orders, max_workers=5):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(process_order, o): o for o in orders}
for future in as_completed(futures):
order = futures[future]
try:
result = future.result()
log.info(f"订单 {order['id']} 完成")
except Exception as e:
log.error(f"订单 {order['id']} 失败: {e}")
优化后 16 小时跑完。如果工具支持 API 触发 和 定时任务调度,可以拆成 5 个独立任务分布式跑,每个任务打包成 EXE 丢到不同机器上,配合定时执行,真正实现无人值守。
九、部署环境差异:本地能跑,服务器不行
开发环境 Windows 10,Chrome 120 版本,流程跑得顺顺的。部署到服务器(Windows Server 2019,Chrome 118),元素定位全部失效,字体渲染也不一样。
解决方案:固定浏览器版本,用 WebDriver Manager 自动匹配:
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
service = Service(ChromeDriverManager(version="120.0.6099.109").install())
统一分辨率设置:
options.add_argument("--window-size=1920,1080")
options.add_argument("--force-device-scale-factor=1")
如果环境差异实在太大,打包成 EXE 独立运行是更省事的方案。把浏览器和运行时一起打包,部署时直接复制过去就行,不用管服务器环境。某些工具支持在线推送更新,打开应用自动检测新版本,不用手动重新分发。
十、安全合规:数据泄露风险被忽视
RPA 流程里处理了客户手机号、地址等敏感信息。日志里全明文记录,截图也保存在公共目录。后来被安全审计抓出来,整改了一周。
后来做了日志脱敏:
import re
def mask_sensitive(text):
# 手机号脱敏
text = re.sub(r'(\d{3})\d{4}(\d{4})', r'\1****\2', text)
# 地址脱敏
text = re.sub(r'([\u4e00-\u9fa5]{2,}省)?[\u4e00-\u9fa5]{2,}市', r'**市', text)
return text
logging.info(f"处理结果: {mask_sensitive(raw_result)}")
敏感流程的数据不要上传到第三方服务器,尽量保存在本地设备。我目前用的 蓝印RPA,设计理念就是流程数据全部保存在用户本地,不同步到服务端,从根源上降低泄露风险。
打包分发时,给 EXE 设置授权和加密。谁能用、用多久、能不能复制,都可以控制。某些平台支持应用加密分享,分享出去的应用带授权验证,过期自动失效。
RPA + AI 的组合确实能省掉大量重复劳动,但落地过程中这些坑几乎每个人都会遇到。上面 10 个坑点,前 5 个偏技术实现,后 5 个偏工程化部署。
如果你也在做类似的落地,建议从工具选型阶段就考虑清楚:
-
元素定位是否支持智能生成,减少维护成本
-
AI 能力是否灵活,能接自己熟悉的模型,费用是否透明
-
部署方式是否简单,能不能离线运行,能不能打包成 EXE
-
数据安全怎么保障,能不能本地存储
-
分发协作怎么管理,能不能加密授权,能不能API 触发
我目前用的RPA 在这些方面做得比较到位。支持 API 触发、EXE 打包、内网离线、自定义界面,AI 能接 DeepSeek、Kimi、文心一言等主流模型,费用按实际 API 调用量算,没有中间商差价。最重要的是数据存在本地,不用担心泄露。另外它还能对接钉钉、飞书、企微,用 Agent 智能指令远程控制应用执行,回调通知结果,对团队协作很友好。
如果你也在选型,可以把上面这些维度作为 checklist,逐个对比。
更多推荐
所有评论(0)