OpenClaw实践之钓鱼邮件识别小助手


2026年3月,全国上下掀起一股气功(OpenClaw)热,公司老板看到后产生好奇,遂发起为期一周的OpenClaw小挑战,鼓励大家基于OpenClaw自行搭建 Demo,探索在实际工作场景中的应用。因此有了这个小项目。

我挑选的方向是识别和处理钓鱼邮件,由于只是个demo且不能接入实际工作环境,很多环节做了简化,比如工作环境的O365邮箱用Gmail代替(API比较多,qq邮箱就没有给拉黑发件人的API),比如应该在邮件网关上做的封禁操作,暂且用Gmail个人配置中的拉黑代替。

研究背景和意义


在经过基本的钓鱼邮件安全意识培训后,员工具备识别恶意特征的能力,但是在企业邮件繁多的条件下,工作繁忙的财务/管理层仍然可能出错,因此有必要进行研究。

设想:

挑战:

(概念介绍)

具体实现

SKILL:

获取邮件内容、查VT等这些固定的操作其实完全可以用SOAR类似的自动化编排实现,没有必要引入AI Agent,在这里更贴切的点我认为是

    1. 分析域名仿冒(typosquatting)等。由于相关技术目的就是迷惑人眼、让人看错误以为是可信地址点进去,AI具有天然的分辨出大写i还是小写L、察觉仿冒的拉丁字母的能力。在企业环境中,大量邮件会使人眼疲劳警惕性下降,但是AI可以较好地避免此类情况;
    1. 分析邮件中的紧迫性、不合理性。在除了URL/附件/二维码的钓鱼方式外,还有发票财务欺诈等方式不易被传统的安全扫描发现,AI作为大语言模型有着天然的分析优势。

架构图和相关脚本:

gen_token.py

需要在windows上执行,调出浏览器完成谷歌的授权认证过程,本地生成token.json和credential.json文件

这里需要的权限是modify而不是read,是因为后面要添加发件人黑名单

import os.path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow

SCOPES = [
    "https://www.googleapis.com/auth/gmail.modify",
    "https://www.googleapis.com/auth/gmail.settings.basic"
]

def main():
    creds = None

    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                "credentials.json",
                SCOPES
            )
            creds = flow.run_local_server(port=0)

        with open("token.json", "w", encoding="utf-8") as token:
            token.write(creds.to_json())

    print("token.json generated successfully")

if __name__ == "__main__":
    main()

fetch_email.py太长了就不放了,让AI写个拿下来邮件的邮件头和邮件正文就行。

vt_check.py,会输出有几个malicious或suspicious的标记:

import base64
import requests
import argparse

API_KEY = "XXX"

# 解析命令行参数
parser = argparse.ArgumentParser(description="Query VirusTotal URL reputation")
parser.add_argument("--url", required=True, help="URL to check")
args = parser.parse_args()

url = args.url

# URL safe base64
url_id = base64.urlsafe_b64encode(url.encode()).decode().strip("=")

endpoint = f"https://www.virustotal.com/api/v3/urls/{url_id}"

headers = {
    "x-apikey": API_KEY
}

response = requests.get(endpoint, headers=headers)
data = response.json()

stats = data["data"]["attributes"]["last_analysis_stats"]

print("malicious:", stats["malicious"])
print("suspicious:", stats["suspicious"])

block_sender.py,添加过滤器操作

import argparse
import os

from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build

SCOPES = [
    "https://www.googleapis.com/auth/gmail.modify",
    "https://www.googleapis.com/auth/gmail.settings.basic"
]

def gmail_auth(token_path="token.json"):
    creds = None

    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)

    if not creds:
        raise RuntimeError("token.json not found")

    if not creds.valid:
        if creds.expired and creds.refresh_token:
            creds.refresh(Request())
            with open(token_path, "w") as f:
                f.write(creds.to_json())
        else:
            raise RuntimeError("token.json invalid")

    return build("gmail", "v1", credentials=creds)


def block_address(service, address):
    filter_body = {
        "criteria": {
            "from": address
        },
        "action": {
            "addLabelIds": ["TRASH"]
        }
    }

    result = service.users().settings().filters().create(
        userId="me",
        body=filter_body
    ).execute()

    return result


def main():
    parser = argparse.ArgumentParser(description="Block Gmail sender address")
    parser.add_argument("--address", required=True, help="email address to block")
    parser.add_argument("--token", default="token.json", help="token.json path")

    args = parser.parse_args()

    service = gmail_auth(args.token)

    result = block_address(service, args.address)

    print("Filter created successfully")
    print("Filter ID:", result["id"])
    print("Blocked address:", args.address)


if __name__ == "__main__":
    main()

DEMO展示

下达指令:分析我的谷歌邮箱中最新一封邮件是否为钓鱼邮件(如果是真实企业工作流中,就应该直接通过日志等方式触发预定义的分析流程,此处只是为了演示)

可以看到openclaw读文件和执行脚本的顺序:

读SKILL、执行脚本导出邮件,调用vt查询脚本

Agent综合多项指标输出分析结论报告:

SKILL中指示如果分析认为是钓鱼邮件则自动删除拉黑,因此可以看到添加了过滤器:

展望:更进一步的集成

感受

由于时间有限,本demo只是一个简单的场景,比较初步也没有多个Agent协同工作,比较多的时间反而是花在了挑选邮箱上,126/163、qq、proton、yahoo都试了个遍,最终只有gmail符合要求能完成我想展示的功能。后端大模型使用的是阿里云的qwen3-max,openclaw在执行分析时可能遇到各种问题,重现并不是很稳定。在gmail侧,凭据有可能失效需要重新生成;在openclaw调用工具时可能出现相对路径不对的问题,导致读取不到文件,所以最好把所有的参数都直接写成绝对路径,包括SKILL和脚本中的调用。很多使用者都会陷入配好龙虾后寻找使用场景的问题,我们大多也一样,不过这个钓鱼邮件分析处置的场景我认为还是确实有实践意义的。

文章目录
  1. 1. 研究背景和意义
  2. 2. 具体实现
  3. 3. DEMO展示
  4. 4. 展望:更进一步的集成
  5. 5. 感受
|