目标:把“看起来离散的网络告警(Suricata/Zeek/防火墙/代理)”转成“可用于研判、狩猎、统计的 TTP 视角”,并用 MITRE ATT&CK 作为统一语言。

说明:本文所有示例仅用于防御与检测工程(告警归因、检测覆盖评估、威胁狩猎),不包含漏洞利用或攻击操作步骤。


1. ATT&CK 是什么?为什么适合做告警归因

MITRE ATT&CK(Adversarial Tactics, Techniques, and Common Knowledge)是一套公开的对手行为知识库,核心对象是:

  • Tactic(战术):攻击者“为什么做”(阶段目标)

  • Technique(技术):攻击者“怎么做”(可观测到的行为)

  • Sub-technique(子技术):技术的更细粒度拆分

把 NSM(Network Security Monitoring,网络安全监测)告警映射到 ATT&CK 的好处:

  1. 跨产品统一口径:不同 IDS/NDR/SIEM 告警语义可收敛到同一套 TTP。

  2. 研判更结构化:从“命中了一条规则”升级到“出现了某类战术链条的证据”。

  3. 度量检测覆盖:能统计“我们对某些战术/技术的检测覆盖率如何”。

  4. 推动联动响应:把“技术”作为 playbook 的触发条件,做自动化响应编排。


2. ATT&CK 数据长什么样:STIX 2.x

ATT&CK 的官方发布数据以 STIX 2.x(结构化威胁情报格式)为主,常见获取方式:

  • GitHub:MITRE CTI 仓库(JSON / STIX bundle)

  • TAXII Server:标准化接口拉取集合(collection)

工程实践里建议:

  • 线上:用 TAXII 定期同步并缓存到本地数据库

  • 离线/开发:直接下载 STIX bundle 文件读入


3. 用 Python 获取 ATT&CK:两种常用方式

下面给你两条路线,任选其一即可:

  • 路线 A(推荐工程化):TAXII 同步(自动、可定期更新)

  • 路线 B(更简单):从 GitHub 下载 STIX 文件(本地读)

3.1 路线 A:从 TAXII 拉取(cti-taxii.mitre.org

依赖:

pip install taxii2-client stix2

示例代码(TAXII 2.x + STIX2 读取):

from taxii2client.v21 import Server
from stix2 import MemoryStore, Filter

TAXII_SERVER = "https://cti-taxii.mitre.org/taxii/"

# 1) 连接 TAXII Server
server = Server(TAXII_SERVER)
api_root = server.api_roots[0]

# 2) 选择 Collection(Enterprise ATT&CK 通常在 collections 里)
collections = list(api_root.collections)

# 经验做法:根据标题挑选 Enterprise ATT&CK Collection
enterprise = None
for c in collections:
    if "Enterprise ATT&CK" in (c.title or ""):
        enterprise = c
        break

if enterprise is None:
    raise RuntimeError("未找到 Enterprise ATT&CK collection,请检查 TAXII 服务是否可用或标题是否变化")

# 3) 拉取 STIX objects
objects = enterprise.get_objects()
store = MemoryStore(stix_data=objects["objects"])

# 4) 查询:按 technique id (attack-pattern)
techniques = store.query([Filter("type", "=", "attack-pattern")])
print("techniques:", len(techniques))

要点:

  • attack-pattern 对应 ATT&CK 的 Technique/Sub-technique。

  • TAXII 拉取下来的对象很多(含关系、矩阵、标注等),建议缓存到本地(见后文)。

3.2 路线 B:读取本地 STIX bundle(下载后离线用)

依赖:

pip install stix2

示例代码:

import json
from stix2 import MemoryStore, Filter

# 假设你把 ATT&CK Enterprise STIX bundle 保存为了 enterprise-attack.json
with open("enterprise-attack.json", "r", encoding="utf-8") as f:
    bundle = json.load(f)

store = MemoryStore(stix_data=bundle["objects"])
techniques = store.query([Filter("type", "=", "attack-pattern")])
print("techniques:", len(techniques))

4. Python 查询:把 Technique 变成可用的“字典服务”

NSM 侧做映射,最终通常要用到这些字段:

  • Technique 的 ATT&CK ID(如 T1046

  • Technique 名称、描述

  • 所属 Tactic(一个 Technique 可能属于多个 Tactic)

STIX 里 ATT&CK ID 一般在 external_references

from stix2 import Filter

def get_attack_id(stix_obj):
    for ref in stix_obj.get("external_references", []):
        if ref.get("source_name") == "mitre-attack" and ref.get("external_id"):
            return ref["external_id"]
    return None

# 查询:找到 T1046
def find_technique_by_attack_id(store, attack_id: str):
    techniques = store.query([Filter("type", "=", "attack-pattern")])
    for t in techniques:
        if get_attack_id(t) == attack_id:
            return t
    return None

把 tactic 也取出来,需要用到矩阵(matrix)与关系(relationship)对象。在工程上更常见的做法是:

  • 直接使用 MITRE 的 mitreattack-python 工具链(封装了常用查询)

  • 或者做一次离线“Technique -> Tactic 列表”的预计算缓存

这里给一个“够用且易理解”的预计算示例(基于 kill_chain_phases 字段):

from stix2 import Filter

def technique_to_tactics(technique_obj):
    # kill_chain_phases 示例:[{"kill_chain_name": "mitre-attack", "phase_name": "discovery"}, ...]
    phases = technique_obj.get("kill_chain_phases", [])
    tactics = []
    for p in phases:
        if p.get("kill_chain_name") == "mitre-attack" and p.get("phase_name"):
            tactics.append(p["phase_name"])
    return sorted(set(tactics))

# demo
# t = find_technique_by_attack_id(store, "T1046")
# print(get_attack_id(t), t["name"], technique_to_tactics(t))

5. NSM 告警怎么和 ATT&CK 关联:设计一个“映射管道”

把 NSM 告警关联到 ATT&CK,本质是一个“把事件语义归一化”的问题。推荐拆成 4 层:

NSM 原始事件(Zeek/Suricata/Firewall/Proxy)
  -> 事件标准化(字段统一、提取五元组/域名/JA3/SNI/URL)
    -> 语义分类(扫描/暴力破解/C2/下载/横向/数据外传...)
      -> ATT&CK 映射(Technique ID + 置信度 + 证据)

5.1 事件标准化:为不同数据源定义统一 schema

以常见 NSM 数据源举例:

  • Suricata EVE JSONalert.signature, flow, http, dns, tls

  • Zeek logsconn.log, dns.log, http.log, ssl.log, notice.log

建议落地一个统一结构(简化示例):

from dataclasses import dataclass
from typing import Optional, Dict, Any

@dataclass
class NsmEvent:
    vendor: str                 # suricata/zeek/firewall/proxy
    event_type: str             # alert/conn/dns/http/tls/notice
    ts: str
    src_ip: str
    src_port: int
    dst_ip: str
    dst_port: int
    proto: str                  # tcp/udp/icmp

    # 可选:应用层语义
    url: Optional[str] = None
    http_host: Optional[str] = None
    http_method: Optional[str] = None
    dns_query: Optional[str] = None
    tls_sni: Optional[str] = None
    ja3: Optional[str] = None

    # 原始载荷
    raw: Optional[Dict[str, Any]] = None

这一步的目标不是“全量字段”,而是为了后续做规则映射/特征抽取

5.2 映射输出:要能解释“为什么映射到这个 Technique”

推荐输出结构:

from dataclasses import dataclass
from typing import List

@dataclass
class AttackMapping:
    technique_id: str           # e.g. T1046
    technique_name: str         # e.g. Network Service Scanning
    tactics: List[str]          # e.g. ["discovery"]
    confidence: float           # 0~1
    evidence: List[str]         # 可解释证据:字段、规则命中、上下文

6. 案例:用 Python 把 NSM 网络威胁告警映射到 ATT&CK

下面用一个“现实里很常见”的链条做例子:

  1. 出现扫描类告警(可能是资产探测/内网横向前置)

  2. 随后出现可疑 DNS / HTTP / TLS 外联(可能是 C2 通信)

  3. 又出现下载行为(可能是载荷拉取)

对应的 ATT&CK 技术(示例映射):

  • 扫描:T1046(Network Service Scanning)

  • C2 over HTTP:T1071.001(Application Layer Protocol: Web Protocols)

  • C2 over DNS:T1071.004(Application Layer Protocol: DNS)

  • 下载/载荷传输:T1105(Ingress Tool Transfer)

注意:这类映射是“检测语义层的归类”,不是“百分百归因”。输出时请保留 confidenceevidence,并允许多 Technique 并存。

6.1 准备:ATT&CK 本地缓存 + Technique 字典

下面示例假设你已经用第 3 节把 ATT&CK STIX objects 放到了 store

from stix2 import Filter

def build_technique_dictionary(store):
    """构建 attack_id -> {name, tactics} 的快速字典。"""
    techniques = store.query([Filter("type", "=", "attack-pattern")])

    d = {}
    for t in techniques:
        attack_id = None
        for ref in t.get("external_references", []):
            if ref.get("source_name") == "mitre-attack" and ref.get("external_id"):
                attack_id = ref["external_id"]
                break
        if not attack_id:
            continue

        d[attack_id] = {
            "name": t.get("name", ""),
            "tactics": technique_to_tactics(t),
        }

    return d

6.2 示例输入:三类 NSM 告警(简化 JSON)

你可以把它理解为从 NSM 平台(如 Security Onion、Elastic、Splunk、OpenSearch、Kafka)消费到的事件。

sample_events = [
    {
        "vendor": "suricata",
        "event_type": "alert",
        "ts": "2026-02-21T09:10:00+08:00",
        "src_ip": "10.0.1.23",
        "src_port": 49812,
        "dst_ip": "10.0.2.10",
        "dst_port": 445,
        "proto": "tcp",
        "alert": {"signature": "SCAN suspicious SMB sweep"},
    },
    {
        "vendor": "zeek",
        "event_type": "dns",
        "ts": "2026-02-21T09:12:10+08:00",
        "src_ip": "10.0.1.23",
        "src_port": 5353,
        "dst_ip": "8.8.8.8",
        "dst_port": 53,
        "proto": "udp",
        "dns": {"query": "example-c2.bad"},
    },
    {
        "vendor": "suricata",
        "event_type": "http",
        "ts": "2026-02-21T09:13:40+08:00",
        "src_ip": "10.0.1.23",
        "src_port": 49888,
        "dst_ip": "203.0.113.10",
        "dst_port": 80,
        "proto": "tcp",
        "http": {"host": "example-c2.bad", "url": "/payload.bin", "method": "GET"},
    },
]

6.3 规则映射:从“事件语义”到“Technique 候选集合”

工程上常见的做法是“三层组合”:

  • 规则层:基于告警名/分类/端口/协议的快速映射(低成本、易解释)

  • 特征层:基于 DNS/HTTP/TLS 的字段特征映射(更细粒度)

  • 上下文层:同一实体(host/user)在时间窗内的事件链(提升置信度)

本文先给一个可直接复用的“规则 + 特征”版本。

import re
from typing import Dict, Any, List

# 1) 简单规则:通过签名关键字映射(可扩展为你自己的规则库)
SIGNATURE_RULES = [
    # 扫描
    {
        "pattern": re.compile(r"scan|sweep|port\s*scan|nmap", re.I),
        "technique_id": "T1046",
        "confidence": 0.7,
        "evidence": "alert.signature indicates scanning",
    },
]

# 2) 特征规则:通过字段存在与特征映射
def infer_techniques_from_event(evt: Dict[str, Any]) -> List[Dict[str, Any]]:
    hits = []

    vendor = evt.get("vendor")
    event_type = evt.get("event_type")

    # A. 告警签名关键字
    signature = None
    if vendor == "suricata":
        signature = (evt.get("alert") or {}).get("signature")

    if signature:
        for rule in SIGNATURE_RULES:
            if rule["pattern"].search(signature):
                hits.append({
                    "technique_id": rule["technique_id"],
                    "confidence": rule["confidence"],
                    "evidence": [f"signature={signature}", rule["evidence"]],
                })

    # B. DNS 外联
    if event_type == "dns":
        q = ((evt.get("dns") or {}).get("query") or "").lower()
        if q:
            hits.append({
                "technique_id": "T1071.004",
                "confidence": 0.55,
                "evidence": [f"dns.query={q}", "DNS traffic may indicate application-layer C2"],
            })

    # C. HTTP 外联与下载
    if event_type == "http":
        host = ((evt.get("http") or {}).get("host") or "").lower()
        url = ((evt.get("http") or {}).get("url") or "")
        method = ((evt.get("http") or {}).get("method") or "").upper()

        if host:
            hits.append({
                "technique_id": "T1071.001",
                "confidence": 0.6,
                "evidence": [f"http.host={host}", f"http.method={method}", "Web protocol traffic may indicate C2"],
            })

        # 简化:以“可疑二进制路径/扩展名”作为下载线索
        if method == "GET" and re.search(r"\.(bin|exe|dll|dat)$", url, re.I):
            hits.append({
                "technique_id": "T1105",
                "confidence": 0.65,
                "evidence": [f"http.url={url}", "Possible ingress of tool/payload over network"],
            })

    return hits

6.4 把候选 Technique 变成“带名称/战术/证据”的映射结果

from typing import Tuple

def map_event_to_attack(evt: Dict[str, Any], technique_dict: Dict[str, Dict[str, Any]]):
    candidates = infer_techniques_from_event(evt)

    mappings = []
    for c in candidates:
        tid = c["technique_id"]
        meta = technique_dict.get(tid, {"name": "(unknown)", "tactics": []})

        mappings.append({
            "technique_id": tid,
            "technique_name": meta["name"],
            "tactics": meta["tactics"],
            "confidence": float(c["confidence"]),
            "evidence": c["evidence"],
        })

    # 可选:按置信度降序
    mappings.sort(key=lambda x: x["confidence"], reverse=True)
    return mappings

6.5 运行示例:得到可解释的 ATT&CK 关联

# 假设你已构建 store 和 technique_dict
# technique_dict = build_technique_dictionary(store)

for evt in sample_events:
    mappings = map_event_to_attack(evt, technique_dict)
    print("=" * 60)
    print(evt["vendor"], evt["event_type"], evt["src_ip"], "->", evt["dst_ip"], evt.get("dst_port"))
    for m in mappings:
        print(f"- {m['technique_id']} {m['technique_name']} tactics={m['tactics']} conf={m['confidence']}")
        for e in m["evidence"]:
            print("  *", e)

输出(示意):

suricata alert 10.0.1.23 -> 10.0.2.10 445
- T1046 Network Service Scanning tactics=['discovery'] conf=0.7
  * signature=SCAN suspicious SMB sweep
  * alert.signature indicates scanning

zeek dns 10.0.1.23 -> 8.8.8.8 53
- T1071.004 Application Layer Protocol: DNS tactics=['command-and-control'] conf=0.55
  * dns.query=example-c2.bad
  * DNS traffic may indicate application-layer C2

suricata http 10.0.1.23 -> 203.0.113.10 80
- T1105 Ingress Tool Transfer tactics=['command-and-control'] conf=0.65
  * http.url=/payload.bin
  * Possible ingress of tool/payload over network
- T1071.001 Application Layer Protocol: Web Protocols tactics=['command-and-control'] conf=0.6
  * http.host=example-c2.bad
  * http.method=GET
  * Web protocol traffic may indicate C2

提示:T1105 的 tactic 在 ATT&CK 里可能出现在多个阶段(取决于版本与矩阵);以你同步的 ATT&CK 版本为准。


7. 让映射更“像工程”:时间窗聚合与置信度提升

单条告警映射通常置信度有限,真正有价值的是:

  • 同一 src_ip / asset_id / user 在时间窗内的多事件串联

  • 从“单点命中”升级为“链条证据”

简单的时间窗聚合思路:

from collections import defaultdict

def group_by_entity(events, key="src_ip"):
    g = defaultdict(list)
    for e in events:
        g[e.get(key)].append(e)
    return g

def boost_confidence_if_chain(mappings):
    """示例:如果同一实体同时出现扫描 + C2 + 下载,则提升整体置信度。"""
    tids = {m["technique_id"] for m in mappings}
    bonus = 0.0
    if "T1046" in tids and ("T1071.001" in tids or "T1071.004" in tids):
        bonus += 0.1
    if ("T1071.001" in tids or "T1071.004" in tids) and "T1105" in tids:
        bonus += 0.1

    out = []
    for m in mappings:
        m2 = dict(m)
        m2["confidence"] = min(1.0, m2["confidence"] + bonus)
        out.append(m2)
    return out

实践建议:

  • 资产 ID 为主键(src_ip 会变化、NAT/代理会扰动)

  • 引入 CMDB / DHCP / 终端管理系统做实体解析

  • 维护“证据列表”,而不是只保留 Technique ID


8. 用大语言模型(LLM)完成告警与 ATT&CK 关联(可落地方案)

规则映射的优点是稳定、可解释,但在这些场景会比较吃力:

  • 告警文本差异大:不同厂商/不同规则集的 signature 语义不一致

  • 事件上下文复杂:单条告警信息不足,需要综合 DNS/HTTP/TLS/资产信息

  • 需要“研判式表达”:输出不仅是 Technique ID,还要给出归因理由与证据摘要

LLM 适合补齐“语义理解与归纳”的那一段,但要以工程方式使用:LLM 只做“候选选择 + 解释”,不能直接在全量 ATT&CK 上盲猜

8.1 推荐架构:RAG(检索增强)+ 结构化输出 + 规则兜底

NSM Event
  -> 标准化(结构化字段)
    -> 候选检索(Top-K Technique)
      -> LLM 选择/排序(Top-N + 解释)
        -> 校验(Technique ID 存在性/置信度阈值)
          -> 写回事件(attack.* 字段)

同时:规则引擎作为兜底与强信号来源(例如明确的扫描规则命中)。

关键点:

  • 候选检索:用关键词/向量相似度从 ATT&CK 技术库中先找 Top-K(如 20)

  • 结构化输出:强制 LLM 输出 JSON(便于落库与自动化消费)

  • 校验:对 LLM 输出的 technique_id 做白名单校验(必须在本地 technique_dict 里)

8.2 候选检索怎么做(两种简单实现)

实现 A:关键词检索(轻量、可解释)

  • 把每个 Technique 的 name/description/keywords 拼成一段文本

  • 对事件侧提取关键词(如 signature、端口、协议、DNS/HTTP 字段)

  • 用 BM25/TF-IDF 做 Top-K 检索

实现 B:向量检索(更鲁棒)

  • 对每个 Technique 文本做 embedding(本地或云端)

  • 对事件摘要做 embedding

  • 用余弦相似度取 Top-K

注意:向量检索的收益通常来自“告警文本很碎/口径不统一”的环境。

8.2.1 实际实现(本地):Sentence-Transformers 生成 embedding + FAISS 检索

适用:内网/离线场景、希望 embedding 不出网。

安装依赖(CPU 版也可用;有 GPU 可自行换 CUDA 版本 PyTorch):

pip install -U sentence-transformers faiss-cpu numpy

步骤 1:把每个 Technique 变成一条“可检索文本”

from typing import Dict, Any, List

def technique_doc(technique_id: str, technique_obj: Dict[str, Any]) -> str:
    # technique_obj 可来自 STIX 的 attack-pattern
    name = technique_obj.get("name", "")
    desc = (technique_obj.get("description") or "").replace("\n", " ")
    tactics = ",".join(technique_to_tactics(technique_obj))

    # 经验:把 technique_id / name / tactics 放在前面,有助于召回
    text = f"{technique_id} | {name} | tactics={tactics} | {desc}"

    # 控制长度:避免极长 description 影响 embedding 与成本
    return text[:2000]

步骤 2:批量生成 Technique embedding 并建立向量索引

import numpy as np
import faiss
from sentence_transformers import SentenceTransformer


def build_faiss_index(
    technique_corpus: List[str],
    model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
):
    model = SentenceTransformer(model_name)

    # normalize_embeddings=True 让向量单位化,后续可直接用 inner product 近似 cosine
    vecs = model.encode(
        technique_corpus,
        batch_size=64,
        show_progress_bar=True,
        normalize_embeddings=True,
    ).astype("float32")

    dim = vecs.shape[1]
    index = faiss.IndexFlatIP(dim)  # 内积相似度(配合 normalize -> cosine)
    index.add(vecs)

    return model, index, vecs

步骤 3:对 NSM 事件摘要做 embedding,检索 Top-K 候选 Technique

from typing import Tuple


def embed_event_summary(evt_summary: Dict[str, Any]) -> str:
    # 把关键字段串起来即可;不要塞 raw 全量日志
    parts = []
    for k in [
        "vendor",
        "event_type",
        "alert.signature",
        "dns.query",
        "http.host",
        "http.url",
        "http.method",
        "dst_port",
        "proto",
    ]:
        v = evt_summary.get(k)
        if v is None or v == "":
            continue
        parts.append(f"{k}={v}")
    return " | ".join(parts)


def search_topk(
    model: SentenceTransformer,
    index: faiss.Index,
    query_text: str,
    top_k: int = 20,
) -> Tuple[np.ndarray, np.ndarray]:
    q = model.encode([query_text], normalize_embeddings=True).astype("float32")
    scores, idx = index.search(q, top_k)
    return scores[0], idx[0]

把索引与映射保存到磁盘(便于上线服务化):

import json
import faiss

# 保存索引
faiss.write_index(index, "attack_techniques.faiss")

# 保存 technique_id 列表(与向量顺序一一对应)
with open("attack_technique_ids.json", "w", encoding="utf-8") as f:
    json.dump(technique_ids, f, ensure_ascii=False, indent=2)

模型选择建议:中文/多语言环境优先选择 multilingual embedding 模型;如果你的告警文本主要是英文签名,英文模型通常更强。

8.2.2 实际实现(云端):调用 Embedding API 生成向量 + 向量库检索

适用:你已有云端模型调用能力(如 OpenAI-compatible),希望更强 embedding 质量/更低本地运维。

通用流程:

  1. 为每个 Technique doc 调用 embedding 接口得到向量

  2. 将向量写入向量库(FAISS / Milvus / Qdrant / pgvector 等)

  3. 对事件摘要向量化后做 Top-K 检索

以 OpenAI-compatible 接口为例(伪代码级别,字段以你的 provider 文档为准):

import requests


def embed_cloud(texts, base_url: str, api_key: str, model: str):
    r = requests.post(
        f"{base_url}/v1/embeddings",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"model": model, "input": texts},
        timeout=60,
    )
    r.raise_for_status()
    data = r.json()
    # 典型返回:data=[{"embedding": [...], "index": 0}, ...]
    return [item["embedding"] for item in data["data"]]

上线建议:

  • 先做一遍全量 Technique embedding 的离线构建(每天/每周更新一次)

  • 线上只对事件摘要做 embedding + 检索(低延迟)

  • 严格记录 embedding 模型版本(更换模型会导致向量空间不一致,需要重建索引)

8.3 给 LLM 的输入要“短、结构化、可追溯”

建议把 LLM 输入分成两部分:

  1. 事件摘要(Event Summary):只保留与映射相关的字段(五元组 + DNS/HTTP/TLS + 告警名)

  2. 候选 Technique 列表(Candidates):Top-K,每个候选只给:

  • technique_id

  • name

  • tactics

  • short_description(可截断到 200~400 字)

这样 LLM 的任务会变成:

  • 在“有限候选”里做分类/排序

  • 输出 technique_id + confidence + evidence

8.4 Prompt 模板(强制 JSON 输出)

核心原则:让模型做“从候选集中挑选/排序”,不要让模型在全量 ATT&CK 上自由发挥

你可以把 system prompt 固化为“安全分析助手 + 严格输出 JSON”,例如:

你是 SOC 的威胁分析助手。
你的任务是:根据输入的 NSM 事件摘要,在给定的 ATT&CK technique 候选列表中,选择最匹配的 0~3 个 technique。

严格要求:
- 只能从 candidates 里选择 technique_id,不得输出 candidates 之外的 ID。
- 必须输出合法 JSON,且仅输出 JSON(不要解释性文字)。
- evidence 必须引用 event_summary 中的字段,使用 "field=value" 形式(例如 "dns.query=example.com")。
- 如果无法判断,请返回空数组,并在 reason 里说明缺失信息。

输出 JSON schema:
{
  "mappings": [
    {
      "technique_id": "Txxxx"|"Txxxx.xxx",
      "confidence": 0.0~1.0,
      "evidence": ["..."],
      "why": "一句话原因"
    }
  ],
  "reason": "可选,无法判断时填写"
}

user prompt 建议只包含两块:

  • event_summary:结构化、短文本/JSON

  • candidates:Top-K techniques(每条含 id/name/tactics/short_description)

8.5 Python 落地示例:RAG + LLM + 校验(以 Ollama 为例)

下面给出一个“可直接粘贴改造”的最小可用实现:

  • 输入:标准化 NSM 事件 evt + 候选 techniques candidates

  • 输出:经过白名单校验后的 attack mappings

依赖:

pip install requests

代码示例:

import json
import requests
from typing import Any, Dict, List

SYSTEM_PROMPT = """
你是 SOC 的威胁分析助手。
你的任务是:根据输入的 NSM 事件摘要,在给定的 ATT&CK technique 候选列表中,选择最匹配的 0~3 个 technique。

严格要求:
- 只能从 candidates 里选择 technique_id,不得输出 candidates 之外的 ID。
- 必须输出合法 JSON,且仅输出 JSON(不要解释性文字)。
- evidence 必须引用 event_summary 中的字段,使用 \"field=value\" 形式。
- 如果无法判断,请返回空数组,并在 reason 里说明缺失信息。

输出 JSON schema:
{\n  \"mappings\": [{\"technique_id\": \"Txxxx\", \"confidence\": 0.0, \"evidence\": [], \"why\": \"...\"}],\n  \"reason\": \"...\"\n}
""".strip()


def build_event_summary(evt: Dict[str, Any]) -> Dict[str, Any]:
    """只保留映射需要的字段,避免把整条 raw 事件塞给模型。"""
    out = {
        "vendor": evt.get("vendor"),
        "event_type": evt.get("event_type"),
        "src_ip": evt.get("src_ip"),
        "src_port": evt.get("src_port"),
        "dst_ip": evt.get("dst_ip"),
        "dst_port": evt.get("dst_port"),
        "proto": evt.get("proto"),
    }

    # 可选字段:根据你的数据源填充
    if (evt.get("alert") or {}).get("signature"):
        out["alert.signature"] = evt["alert"]["signature"]

    if (evt.get("dns") or {}).get("query"):
        out["dns.query"] = evt["dns"]["query"]

    if evt.get("http"):
        http = evt["http"]
        if http.get("host"):
            out["http.host"] = http["host"]
        if http.get("url"):
            out["http.url"] = http["url"]
        if http.get("method"):
            out["http.method"] = http["method"]

    return out


def call_ollama_json(model: str, system_prompt: str, user_payload: Dict[str, Any]) -> Dict[str, Any]:
    """调用本地 Ollama /api/chat 并解析返回 JSON。"""
    url = "http://127.0.0.1:11434/api/chat"
    r = requests.post(
        url,
        json={
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(user_payload, ensure_ascii=False)},
            ],
            "stream": False,
            # 可选:降低随机性,稳定输出
            "options": {"temperature": 0.1},
        },
        timeout=120,
    )
    r.raise_for_status()

    content = r.json()["message"]["content"]
    return json.loads(content)


def validate_llm_result(
    llm_result: Dict[str, Any],
    technique_dict: Dict[str, Dict[str, Any]],
    max_items: int = 3,
    min_confidence: float = 0.3,
) -> List[Dict[str, Any]]:
    """把 LLM 输出变成可写回事件的 mappings,并做最基本的安全校验。"""
    mappings = llm_result.get("mappings") or []
    if not isinstance(mappings, list):
        return []

    out = []
    for m in mappings[:max_items]:
        if not isinstance(m, dict):
            continue

        tid = m.get("technique_id")
        conf = float(m.get("confidence") or 0.0)
        evidence = m.get("evidence") or []
        why = m.get("why") or ""

        # 1) 白名单:必须是 ATT&CK 字典中存在的 Technique
        if tid not in technique_dict:
            continue

        # 2) 置信度阈值
        if conf < min_confidence:
            continue

        # 3) 证据必须是列表
        if not isinstance(evidence, list):
            evidence = [str(evidence)]

        meta = technique_dict[tid]
        out.append(
            {
                "technique_id": tid,
                "technique_name": meta.get("name", ""),
                "tactics": meta.get("tactics", []),
                "confidence": conf,
                "evidence": [str(x) for x in evidence],
                "why": str(why),
                "source": "llm",
            }
        )

    # 按置信度排序
    out.sort(key=lambda x: x["confidence"], reverse=True)
    return out


def llm_map_event_to_attack(
    evt: Dict[str, Any],
    candidates: List[Dict[str, Any]],
    technique_dict: Dict[str, Dict[str, Any]],
    model: str = "qwen2.5",
) -> List[Dict[str, Any]]:
    payload = {
        "event_summary": build_event_summary(evt),
        "candidates": candidates,
    }

    llm_result = call_ollama_json(model=model, system_prompt=SYSTEM_PROMPT, user_payload=payload)
    return validate_llm_result(llm_result, technique_dict)

你需要补齐的两块:

  • candidates:来自你的检索层(BM25/向量库),建议 Top-20

  • technique_dict:来自第 6.1 节的本地 ATT&CK 字典(用于白名单校验与补全名称/战术)

如果你不使用 Ollama,也可以把 call_ollama_json() 替换为任何 OpenAI-compatible Chat API 调用;关键不在供应商,而在“候选检索 + JSON 输出 + 校验”。

落地时建议加 4 个防护:

  1. Technique 白名单校验:必须存在于 technique_dict

  2. 置信度阈值:低于阈值不写回(或标注为 low_confidence

  3. 证据必须可追溯evidence 必须引用事件字段(如 dns.query=...

  4. 与规则融合:规则命中时提升置信度或直接锁定 Technique

8.6 “规则 + LLM”融合的推荐策略

  • 规则引擎产出一组 hard_hits(强信号,conf 可设为 0.8~0.95)

  • 检索产出 candidates

  • LLM 在 candidates + hard_hits 上做排序与解释

  • 如果 LLM 输出和 hard_hits 冲突,以 hard_hits 为准并要求 LLM 给出“冲突说明”(用于规则优化)


9. 常见坑与最佳实践

  1. 不要把 ATT&CK 当成 IOC 库:ATT&CK 侧重行为知识(TTP),不是“某域名/某 IP 是否恶意”。

  2. 映射是概率而非真相:同一网络行为可能对应多个 Technique,建议支持“多标签 + 置信度”。

  3. 版本管理:ATT&CK 会更新(Technique 名称、归类、子技术拆分都会变化),建议缓存时记录 attack_version 与同步时间。

  4. 映射规则要可审计:让分析师能回溯“为什么映射成这个 Technique”。

  5. 把映射结果写回你的数据平台:例如在事件里追加字段:

    • attack.technique_id / attack.technique_name / attack.tactics / attack.confidence
  6. LLM 一定要“先检索后判断”:避免模型在全量技术上凭印象输出错误 ID。

  7. LLM 输出必须可验证:JSON schema + 白名单 + 证据引用,是上线的底线。


10. 小结

你现在有了一条可落地的路径:

  • 用 Python 从 TAXII 或 STIX 文件读取 ATT&CK

  • 预计算 Technique ID -> 名称/战术 字典

  • 把 NSM 告警标准化为统一事件结构

  • 用“规则 + 特征 + 上下文”的方式输出可解释的 ATT&CK 映射结果

  • 在规则覆盖不足时,引入 RAG + LLM 做“候选选择/排序 + 可解释摘要”,并通过校验机制保证可控性

这套方法的关键不是“映射是否完美”,而是:结果可解释、可迭代、可度量