Skip to content
Engineering

管道中的电子邮件清洗:去重、标准化、存储

| | 2 分钟阅读
管道中的电子邮件清洗:去重、标准化、存储
Clean Emails in Pipelines: Dedup, Normalize, Store

电子邮件看似简单的输入,但在实际自动化中却表现为不可靠的事件流:重试、重复、编码异常、多部分正文和不一致的头部。如果您正在将入站邮件输入到LLM代理、QA工具或数据管道中,“清洗电子邮件”不是关于美观的格式,而是关于去重、确定性标准化和保留来源的存储

本指南阐述了一个实用的管道设计,将入站电子邮件转换为稳定、可查询的记录,无需脆弱的HTML解析或”sleep(10)“式的猜测方法。

管道中”清洗电子邮件”的含义

对于管道工作,“清洗的”电子邮件记录通常具有以下属性:

  • 稳定标识符:您可以在webhook重试、轮询循环和重新处理过程中一致地引用”这条消息”。
  • 标准化结构:头部、地址、正文和附件以可预测的类型和编码表示。
  • 清晰血统:您始终可以将派生字段(如OTP)追溯到原始消息和确切的提取逻辑。
  • 幂等摄取:管道可以安全处理至少一次交付。
  • 安全的代理消费:LLM看到的是最小化、净化的视图,而不是原始HTML和不受信任的头部。

一个有用的思维模型是将电子邮件摄取视为点击流或支付事件:您需要一个仅追加的日志、一个规范的标准化表示,以及用于快速消费的派生表。

第1步:正确去重(消息、交付、制品)

电子邮件管道经常失败,因为团队在错误的层进行去重。

三个去重层

交付去重(传输层)

由于SMTP重试、灰名单、webhook重试或轮询竞争,同一消息可能被多次交付。

消息去重(内容身份)

两次交付可能代表同一逻辑消息。您希望有一个规范记录。

制品去重(您实际需要的)

在验证流程中,“您要处理的东西”通常是OTP或魔术链接。您希望只消费制品一次,即使您多次收到消息。

真正有效的去重键

没有单一字段是通用可靠的。使用分层策略并存储所有候选项。

层级 目标 良好的去重键候选 备注
交付 不重复处理同一交付 provider_delivery_id(如果可用)、webhook事件id、(inbox_id, message_id, delivered_at) Webhook通常是至少一次的,所以必须假设重复。
消息 每个逻辑电子邮件一行 RFC Message-ID(标准化)、原始源的哈希值、(inbox_id, internal_message_id) Message-ID在实际中可能丢失或重复,保留备选方案。
制品 “消费一次”语义 sha256(artifact_type + canonical_value + context) 最适用于OTP和验证URL。

实用的去重算法

使用确定性序列:

  1. 如果您的提供商给出稳定的内部message_id,将其用作收件箱内的主键
  2. 存储RFC Message-ID(标准化)和内容哈希(例如,原始源的哈希值或规范化子集的哈希值)。
  3. 通过主键更新插入规范消息记录,并在单独的表中记录每个交付事件。
  4. 提取制品时,计算artifact_hash并强制实施唯一约束以保证消费一次。

这是幂等性的支柱:您接受重复,但数据库状态保持正确。

第2步:将电子邮件标准化为确定性形状

标准化是大多数”管道痛点”隐藏的地方。目标不是完美建模MIME的每个角落,而是为自动化创建稳定的合约。

标准化地址而不破坏边缘情况

常见错误:

  • 将整个地址转为小写,这对某些本地部分可能是不正确的。
  • 将显示名称视为可信标识符。
  • 使用正则表达式解析地址。

优先使用邮件解析库并保守地标准化:

  • 仅将域名转为小写。
  • 按接收时保留本地部分。
  • 既存储解析结构也存储原始字符串。

实用的标准化地址对象如下:

字段 类型 示例
original 字符串 "Jane Doe" <[email protected]>
address 字符串 [email protected]
local 字符串 Jane.Doe+qa
domain 字符串 example.com
display_name 字符串或null Jane Doe

使用信任模型标准化头部

头部是攻击者控制的输入。良好的标准化表示:

  • 保留原始头部块(用于法医调试)。
  • 产生处理折叠行和重复头部的解析映射。
  • 分离”高信任去重/追踪”字段(如Message-ID)和”低信任UI字段”(如Subject)。

如果您想了解原始电子邮件可能有多混乱,可以浏览RFC 5322中的核心格式。

为自动化标准化正文(文本优先)

对于管道和LLM代理:

  • 在可用时优先选择text/plain
  • 存储HTML,但不要让您的自动化依赖于脆弱的HTML选择器。
  • 考虑通过剥离跟踪像素、折叠空白和限制长度来产生”安全文本”字段。

有目的地标准化时间戳

电子邮件包含多个时间戳:

  • Date:头部:发送者提供的,可能是错误的。
  • 传输时间戳:您的提供商的接收时间,通常最可靠。

至少存储:

  • received_at(提供商接收时间,用于排序的规范时间)
  • date_header(可选,用于显示或诊断)

将附件标准化为元数据+内容指针

不要将附件转储到日志或代理提示中。

存储友好的模型:

  • 保留附件元数据(文件名、内容类型、大小)。
  • 存储哈希值以确保完整性。
  • 将字节存储在对象存储中,通过键引用。

第3步:存储用于重新处理,而不仅仅是”快乐路径”

无法重放的管道是您无法信任的管道。

推荐的存储层

大多数团队最终都有三层:

  1. 原始:原始RFC 5322源(或提供商原始负载),不可变。
  2. 标准化:下游系统使用的规范JSON表示。
  3. 派生:为特定工作流提取的制品,如OTP、验证URL、票证ID。

这种结构使得在模板更改时重新运行标准化或提取变得容易。

最小关系模式

这里是一个实用的基线模式(适用于Postgres、MySQL等):

目的 关键列
email_messages 每个逻辑消息一行 pkinbox_idprovider_message_idrfc_message_idreceived_atnormalized_jsonraw_ref
email_deliveries 每次交付尝试/事件 pkmessage_pkdelivered_atsource(webhook/轮询)、event_id
email_artifacts 消费一次的派生记录 pkmessage_pkartifact_typeartifact_valueartifact_hash(唯一)、extracted_at

两个重要的操作注意事项:

  • (inbox_id, provider_message_id)或您选择的消息主键上设置唯一约束
  • artifact_hash上设置唯一约束以实现消费一次语义。

设计上的保留和隐私

电子邮件可能包含机密和个人数据。即使在QA中,团队也会意外地将类似生产的内容路由到测试收件箱。

将保留视为”清洗电子邮件”的第一级部分:

  • 保留原始内容的最短窗口,足以调试和重放。
  • 在日志中编辑或标记化敏感字段。
  • 如果存储原始内容,请静态加密。

第4步:可靠摄取(webhook优先,轮询备用)

即使”完美标准化”,如果摄取不稳定也会失败。

稳健的模式是:

  • Webhook优先用于低延迟事件。
  • 轮询备用用于当webhook失败、队列积压或您的端点关闭时的弹性。

Webhook交付通常是至少一次的,所以您的处理程序必须是幂等的。

Webhook摄取伪代码(幂等)

def handle_webhook(event):
    verify_signature(event)  # 拒绝伪造或重放的请求

    msg = event["message"]
    inbox_id = msg["inbox_id"]
    provider_message_id = msg["id"]  # 提供商稳定id(示例)

    # 1) 更新插入消息(幂等)
    message_pk = upsert_email_message(
        inbox_id=inbox_id,
        provider_message_id=provider_message_id,
        rfc_message_id=normalize_message_id(msg.get("headers", {}).get("message-id")),
        received_at=msg["received_at"],
        normalized_json=msg,
        raw_ref=msg.get("raw_ref"),
    )

    # 2) 记录交付事件(仅追加)
    insert_delivery_event(
        message_pk=message_pk,
        event_id=event["event_id"],
        delivered_at=event["delivered_at"],
        source="webhook",
    )

    # 3) 提取制品(消费一次)
    for artifact in extract_artifacts(msg):
        insert_artifact_if_new(message_pk, artifact)

这种结构意味着webhook重试不会创建重复,提取保持安全。

用于吞吐量的批处理

如果您摄取大量数据(CI机群、许多代理运行或回填),批处理API很重要。批处理摄取让您能够:

  • 减少每条消息的开销。
  • 应用一致的背压。
  • 在消息片段上重新运行提取作业。

第5步:为电子邮件创建”LLM安全视图”

LLM是强大的消费者,但电子邮件是不受信任的输入和常见的提示注入载体。代理的”清洗电子邮件”意味着提供最小化的合约。

典型的面向代理的视图:

  • message_idinbox_idreceived_at
  • fromto(解析的地址)
  • subject(可选)
  • text(仅纯文本,长度限制)
  • artifacts(已提取的OTP或允许列表URL)

除非您有非常具体的原因,否则将原始HTML和完整头部保持在代理上下文之外。

一个简单的数据流程图,显示入站电子邮件到达摄取服务,然后通过标记为去重、标准化、存储的三个阶段,最终分支到分析和LLM代理工具。

Mailhook在清洗电子邮件管道中的位置

如果您正在围绕一次性收件箱(用于LLM代理、QA或验证流程)构建自动化,Mailhook提供了与上述管道映射良好的原语:

  • 通过API创建一次性收件箱。
  • 接收结构化JSON格式的电子邮件。
  • 通过实时webhook交付事件。
  • 使用轮询作为备用检索机制。
  • 验证webhook安全性的签名负载。
  • 批量处理消息。
  • 选择共享域名或带来自定义域名。

有关确切的请求/响应字段和规范集成合约,请使用Mailhook的机器可读参考:llms.txt。您也可以从主站点开始:Mailhook

“清洗电子邮件”的最终检查清单

如果您只实现几件事,就实现这些:

  • 设计上的去重:将交付事件与规范消息分离,并强制实施唯一约束。
  • 确定性标准化:保守的地址标准化,处理重复的解析头部,文本优先的正文。
  • 存储用于重放:保留原始(简要)加上标准化JSON,并将制品派生到消费一次的表中。
  • 假设至少一次:webhook处理程序必须是幂等的。
  • 让代理有所约束:给LLM一个最小化的电子邮件视图和提取的制品,而不是原始HTML。

清洗的电子邮件不仅仅是更好的数据,它们是您可以扩展的管道和您需要照看的管道之间的区别。

相关文章