MITRE ATT&CK 教程:用 Python 调用 ATT&CK 数据,并把 NSM 网络告警关联到技术(Technique)
目标:把“看起来离散的网络告警(Suricata/Zeek/防火墙/代理)”转成“可用于研判、狩猎、统计的 TTP 视角”,并用 MITRE ATT&CK 作为统一语言。
说明:本文所有示例仅用于防御与检测工程(告警归因、检测覆盖评估、威胁狩猎),不包含漏洞利用或攻击操作步骤。
1. ATT&CK 是什么?为什么适合做告警归因
MITRE ATT&CK(Adversarial Tactics, Techniques, and Common Knowledge)是一套公开的对手行为知识库,核心对象是:
-
Tactic(战术):攻击者“为什么做”(阶段目标)
-
Technique(技术):攻击者“怎么做”(可观测到的行为)
-
Sub-technique(子技术):技术的更细粒度拆分
把 NSM(Network Security Monitoring,网络安全监测)告警映射到 ATT&CK 的好处:
-
跨产品统一口径:不同 IDS/NDR/SIEM 告警语义可收敛到同一套 TTP。
-
研判更结构化:从“命中了一条规则”升级到“出现了某类战术链条的证据”。
-
度量检测覆盖:能统计“我们对某些战术/技术的检测覆盖率如何”。
-
推动联动响应:把“技术”作为 playbook 的触发条件,做自动化响应编排。
2. ATT&CK 数据长什么样:STIX 2.x
ATT&CK 的官方发布数据以 STIX 2.x(结构化威胁情报格式)为主,常见获取方式:
-
GitHub:MITRE CTI 仓库(JSON / STIX bundle)
-
TAXII Server:标准化接口拉取集合(collection)
工程实践里建议:
-
线上:用 TAXII 定期同步并缓存到本地数据库
-
离线/开发:直接下载 STIX bundle 文件读入
3. 用 Python 获取 ATT&CK:两种常用方式
下面给你两条路线,任选其一即可:
-
路线 A(推荐工程化):TAXII 同步(自动、可定期更新)
-
路线 B(更简单):从 GitHub 下载 STIX 文件(本地读)
3.1 路线 A:从 TAXII 拉取(cti-taxii.mitre.org)
依赖:
pip install taxii2-client stix2
示例代码(TAXII 2.x + STIX2 读取):
from taxii2client.v21 import Server
from stix2 import MemoryStore, Filter
TAXII_SERVER = "https://cti-taxii.mitre.org/taxii/"
# 1) 连接 TAXII Server
server = Server(TAXII_SERVER)
api_root = server.api_roots[0]
# 2) 选择 Collection(Enterprise ATT&CK 通常在 collections 里)
collections = list(api_root.collections)
# 经验做法:根据标题挑选 Enterprise ATT&CK Collection
enterprise = None
for c in collections:
if "Enterprise ATT&CK" in (c.title or ""):
enterprise = c
break
if enterprise is None:
raise RuntimeError("未找到 Enterprise ATT&CK collection,请检查 TAXII 服务是否可用或标题是否变化")
# 3) 拉取 STIX objects
objects = enterprise.get_objects()
store = MemoryStore(stix_data=objects["objects"])
# 4) 查询:按 technique id (attack-pattern)
techniques = store.query([Filter("type", "=", "attack-pattern")])
print("techniques:", len(techniques))
要点:
-
attack-pattern对应 ATT&CK 的 Technique/Sub-technique。 -
TAXII 拉取下来的对象很多(含关系、矩阵、标注等),建议缓存到本地(见后文)。
3.2 路线 B:读取本地 STIX bundle(下载后离线用)
依赖:
pip install stix2
示例代码:
import json
from stix2 import MemoryStore, Filter
# 假设你把 ATT&CK Enterprise STIX bundle 保存为了 enterprise-attack.json
with open("enterprise-attack.json", "r", encoding="utf-8") as f:
bundle = json.load(f)
store = MemoryStore(stix_data=bundle["objects"])
techniques = store.query([Filter("type", "=", "attack-pattern")])
print("techniques:", len(techniques))
4. Python 查询:把 Technique 变成可用的“字典服务”
NSM 侧做映射,最终通常要用到这些字段:
-
Technique 的 ATT&CK ID(如
T1046) -
Technique 名称、描述
-
所属 Tactic(一个 Technique 可能属于多个 Tactic)
STIX 里 ATT&CK ID 一般在 external_references:
from stix2 import Filter
def get_attack_id(stix_obj):
for ref in stix_obj.get("external_references", []):
if ref.get("source_name") == "mitre-attack" and ref.get("external_id"):
return ref["external_id"]
return None
# 查询:找到 T1046
def find_technique_by_attack_id(store, attack_id: str):
techniques = store.query([Filter("type", "=", "attack-pattern")])
for t in techniques:
if get_attack_id(t) == attack_id:
return t
return None
把 tactic 也取出来,需要用到矩阵(matrix)与关系(relationship)对象。在工程上更常见的做法是:
-
直接使用 MITRE 的
mitreattack-python工具链(封装了常用查询) -
或者做一次离线“Technique -> Tactic 列表”的预计算缓存
这里给一个“够用且易理解”的预计算示例(基于 kill_chain_phases 字段):
from stix2 import Filter
def technique_to_tactics(technique_obj):
# kill_chain_phases 示例:[{"kill_chain_name": "mitre-attack", "phase_name": "discovery"}, ...]
phases = technique_obj.get("kill_chain_phases", [])
tactics = []
for p in phases:
if p.get("kill_chain_name") == "mitre-attack" and p.get("phase_name"):
tactics.append(p["phase_name"])
return sorted(set(tactics))
# demo
# t = find_technique_by_attack_id(store, "T1046")
# print(get_attack_id(t), t["name"], technique_to_tactics(t))
5. NSM 告警怎么和 ATT&CK 关联:设计一个“映射管道”
把 NSM 告警关联到 ATT&CK,本质是一个“把事件语义归一化”的问题。推荐拆成 4 层:
NSM 原始事件(Zeek/Suricata/Firewall/Proxy)
-> 事件标准化(字段统一、提取五元组/域名/JA3/SNI/URL)
-> 语义分类(扫描/暴力破解/C2/下载/横向/数据外传...)
-> ATT&CK 映射(Technique ID + 置信度 + 证据)
5.1 事件标准化:为不同数据源定义统一 schema
以常见 NSM 数据源举例:
-
Suricata EVE JSON:
alert.signature,flow,http,dns,tls -
Zeek logs:
conn.log,dns.log,http.log,ssl.log,notice.log
建议落地一个统一结构(简化示例):
from dataclasses import dataclass
from typing import Optional, Dict, Any
@dataclass
class NsmEvent:
vendor: str # suricata/zeek/firewall/proxy
event_type: str # alert/conn/dns/http/tls/notice
ts: str
src_ip: str
src_port: int
dst_ip: str
dst_port: int
proto: str # tcp/udp/icmp
# 可选:应用层语义
url: Optional[str] = None
http_host: Optional[str] = None
http_method: Optional[str] = None
dns_query: Optional[str] = None
tls_sni: Optional[str] = None
ja3: Optional[str] = None
# 原始载荷
raw: Optional[Dict[str, Any]] = None
这一步的目标不是“全量字段”,而是为了后续做规则映射/特征抽取。
5.2 映射输出:要能解释“为什么映射到这个 Technique”
推荐输出结构:
from dataclasses import dataclass
from typing import List
@dataclass
class AttackMapping:
technique_id: str # e.g. T1046
technique_name: str # e.g. Network Service Scanning
tactics: List[str] # e.g. ["discovery"]
confidence: float # 0~1
evidence: List[str] # 可解释证据:字段、规则命中、上下文
6. 案例:用 Python 把 NSM 网络威胁告警映射到 ATT&CK
下面用一个“现实里很常见”的链条做例子:
-
出现扫描类告警(可能是资产探测/内网横向前置)
-
随后出现可疑 DNS / HTTP / TLS 外联(可能是 C2 通信)
-
又出现下载行为(可能是载荷拉取)
对应的 ATT&CK 技术(示例映射):
-
扫描:
T1046(Network Service Scanning) -
C2 over HTTP:
T1071.001(Application Layer Protocol: Web Protocols) -
C2 over DNS:
T1071.004(Application Layer Protocol: DNS) -
下载/载荷传输:
T1105(Ingress Tool Transfer)
注意:这类映射是“检测语义层的归类”,不是“百分百归因”。输出时请保留
confidence与evidence,并允许多 Technique 并存。
6.1 准备:ATT&CK 本地缓存 + Technique 字典
下面示例假设你已经用第 3 节把 ATT&CK STIX objects 放到了 store。
from stix2 import Filter
def build_technique_dictionary(store):
"""构建 attack_id -> {name, tactics} 的快速字典。"""
techniques = store.query([Filter("type", "=", "attack-pattern")])
d = {}
for t in techniques:
attack_id = None
for ref in t.get("external_references", []):
if ref.get("source_name") == "mitre-attack" and ref.get("external_id"):
attack_id = ref["external_id"]
break
if not attack_id:
continue
d[attack_id] = {
"name": t.get("name", ""),
"tactics": technique_to_tactics(t),
}
return d
6.2 示例输入:三类 NSM 告警(简化 JSON)
你可以把它理解为从 NSM 平台(如 Security Onion、Elastic、Splunk、OpenSearch、Kafka)消费到的事件。
sample_events = [
{
"vendor": "suricata",
"event_type": "alert",
"ts": "2026-02-21T09:10:00+08:00",
"src_ip": "10.0.1.23",
"src_port": 49812,
"dst_ip": "10.0.2.10",
"dst_port": 445,
"proto": "tcp",
"alert": {"signature": "SCAN suspicious SMB sweep"},
},
{
"vendor": "zeek",
"event_type": "dns",
"ts": "2026-02-21T09:12:10+08:00",
"src_ip": "10.0.1.23",
"src_port": 5353,
"dst_ip": "8.8.8.8",
"dst_port": 53,
"proto": "udp",
"dns": {"query": "example-c2.bad"},
},
{
"vendor": "suricata",
"event_type": "http",
"ts": "2026-02-21T09:13:40+08:00",
"src_ip": "10.0.1.23",
"src_port": 49888,
"dst_ip": "203.0.113.10",
"dst_port": 80,
"proto": "tcp",
"http": {"host": "example-c2.bad", "url": "/payload.bin", "method": "GET"},
},
]
6.3 规则映射:从“事件语义”到“Technique 候选集合”
工程上常见的做法是“三层组合”:
-
规则层:基于告警名/分类/端口/协议的快速映射(低成本、易解释)
-
特征层:基于 DNS/HTTP/TLS 的字段特征映射(更细粒度)
-
上下文层:同一实体(host/user)在时间窗内的事件链(提升置信度)
本文先给一个可直接复用的“规则 + 特征”版本。
import re
from typing import Dict, Any, List
# 1) 简单规则:通过签名关键字映射(可扩展为你自己的规则库)
SIGNATURE_RULES = [
# 扫描
{
"pattern": re.compile(r"scan|sweep|port\s*scan|nmap", re.I),
"technique_id": "T1046",
"confidence": 0.7,
"evidence": "alert.signature indicates scanning",
},
]
# 2) 特征规则:通过字段存在与特征映射
def infer_techniques_from_event(evt: Dict[str, Any]) -> List[Dict[str, Any]]:
hits = []
vendor = evt.get("vendor")
event_type = evt.get("event_type")
# A. 告警签名关键字
signature = None
if vendor == "suricata":
signature = (evt.get("alert") or {}).get("signature")
if signature:
for rule in SIGNATURE_RULES:
if rule["pattern"].search(signature):
hits.append({
"technique_id": rule["technique_id"],
"confidence": rule["confidence"],
"evidence": [f"signature={signature}", rule["evidence"]],
})
# B. DNS 外联
if event_type == "dns":
q = ((evt.get("dns") or {}).get("query") or "").lower()
if q:
hits.append({
"technique_id": "T1071.004",
"confidence": 0.55,
"evidence": [f"dns.query={q}", "DNS traffic may indicate application-layer C2"],
})
# C. HTTP 外联与下载
if event_type == "http":
host = ((evt.get("http") or {}).get("host") or "").lower()
url = ((evt.get("http") or {}).get("url") or "")
method = ((evt.get("http") or {}).get("method") or "").upper()
if host:
hits.append({
"technique_id": "T1071.001",
"confidence": 0.6,
"evidence": [f"http.host={host}", f"http.method={method}", "Web protocol traffic may indicate C2"],
})
# 简化:以“可疑二进制路径/扩展名”作为下载线索
if method == "GET" and re.search(r"\.(bin|exe|dll|dat)$", url, re.I):
hits.append({
"technique_id": "T1105",
"confidence": 0.65,
"evidence": [f"http.url={url}", "Possible ingress of tool/payload over network"],
})
return hits
6.4 把候选 Technique 变成“带名称/战术/证据”的映射结果
from typing import Tuple
def map_event_to_attack(evt: Dict[str, Any], technique_dict: Dict[str, Dict[str, Any]]):
candidates = infer_techniques_from_event(evt)
mappings = []
for c in candidates:
tid = c["technique_id"]
meta = technique_dict.get(tid, {"name": "(unknown)", "tactics": []})
mappings.append({
"technique_id": tid,
"technique_name": meta["name"],
"tactics": meta["tactics"],
"confidence": float(c["confidence"]),
"evidence": c["evidence"],
})
# 可选:按置信度降序
mappings.sort(key=lambda x: x["confidence"], reverse=True)
return mappings
6.5 运行示例:得到可解释的 ATT&CK 关联
# 假设你已构建 store 和 technique_dict
# technique_dict = build_technique_dictionary(store)
for evt in sample_events:
mappings = map_event_to_attack(evt, technique_dict)
print("=" * 60)
print(evt["vendor"], evt["event_type"], evt["src_ip"], "->", evt["dst_ip"], evt.get("dst_port"))
for m in mappings:
print(f"- {m['technique_id']} {m['technique_name']} tactics={m['tactics']} conf={m['confidence']}")
for e in m["evidence"]:
print(" *", e)
输出(示意):
suricata alert 10.0.1.23 -> 10.0.2.10 445
- T1046 Network Service Scanning tactics=['discovery'] conf=0.7
* signature=SCAN suspicious SMB sweep
* alert.signature indicates scanning
zeek dns 10.0.1.23 -> 8.8.8.8 53
- T1071.004 Application Layer Protocol: DNS tactics=['command-and-control'] conf=0.55
* dns.query=example-c2.bad
* DNS traffic may indicate application-layer C2
suricata http 10.0.1.23 -> 203.0.113.10 80
- T1105 Ingress Tool Transfer tactics=['command-and-control'] conf=0.65
* http.url=/payload.bin
* Possible ingress of tool/payload over network
- T1071.001 Application Layer Protocol: Web Protocols tactics=['command-and-control'] conf=0.6
* http.host=example-c2.bad
* http.method=GET
* Web protocol traffic may indicate C2
提示:
T1105的 tactic 在 ATT&CK 里可能出现在多个阶段(取决于版本与矩阵);以你同步的 ATT&CK 版本为准。
7. 让映射更“像工程”:时间窗聚合与置信度提升
单条告警映射通常置信度有限,真正有价值的是:
-
同一 src_ip / asset_id / user 在时间窗内的多事件串联
-
从“单点命中”升级为“链条证据”
简单的时间窗聚合思路:
from collections import defaultdict
def group_by_entity(events, key="src_ip"):
g = defaultdict(list)
for e in events:
g[e.get(key)].append(e)
return g
def boost_confidence_if_chain(mappings):
"""示例:如果同一实体同时出现扫描 + C2 + 下载,则提升整体置信度。"""
tids = {m["technique_id"] for m in mappings}
bonus = 0.0
if "T1046" in tids and ("T1071.001" in tids or "T1071.004" in tids):
bonus += 0.1
if ("T1071.001" in tids or "T1071.004" in tids) and "T1105" in tids:
bonus += 0.1
out = []
for m in mappings:
m2 = dict(m)
m2["confidence"] = min(1.0, m2["confidence"] + bonus)
out.append(m2)
return out
实践建议:
-
以 资产 ID 为主键(src_ip 会变化、NAT/代理会扰动)
-
引入 CMDB / DHCP / 终端管理系统做实体解析
-
维护“证据列表”,而不是只保留 Technique ID
8. 用大语言模型(LLM)完成告警与 ATT&CK 关联(可落地方案)
规则映射的优点是稳定、可解释,但在这些场景会比较吃力:
-
告警文本差异大:不同厂商/不同规则集的
signature语义不一致 -
事件上下文复杂:单条告警信息不足,需要综合 DNS/HTTP/TLS/资产信息
-
需要“研判式表达”:输出不仅是 Technique ID,还要给出归因理由与证据摘要
LLM 适合补齐“语义理解与归纳”的那一段,但要以工程方式使用:LLM 只做“候选选择 + 解释”,不能直接在全量 ATT&CK 上盲猜。
8.1 推荐架构:RAG(检索增强)+ 结构化输出 + 规则兜底
NSM Event
-> 标准化(结构化字段)
-> 候选检索(Top-K Technique)
-> LLM 选择/排序(Top-N + 解释)
-> 校验(Technique ID 存在性/置信度阈值)
-> 写回事件(attack.* 字段)
同时:规则引擎作为兜底与强信号来源(例如明确的扫描规则命中)。
关键点:
-
候选检索:用关键词/向量相似度从 ATT&CK 技术库中先找 Top-K(如 20)
-
结构化输出:强制 LLM 输出 JSON(便于落库与自动化消费)
-
校验:对 LLM 输出的
technique_id做白名单校验(必须在本地 technique_dict 里)
8.2 候选检索怎么做(两种简单实现)
实现 A:关键词检索(轻量、可解释)
-
把每个 Technique 的
name/description/keywords拼成一段文本 -
对事件侧提取关键词(如 signature、端口、协议、DNS/HTTP 字段)
-
用 BM25/TF-IDF 做 Top-K 检索
实现 B:向量检索(更鲁棒)
-
对每个 Technique 文本做 embedding(本地或云端)
-
对事件摘要做 embedding
-
用余弦相似度取 Top-K
注意:向量检索的收益通常来自“告警文本很碎/口径不统一”的环境。
8.2.1 实际实现(本地):Sentence-Transformers 生成 embedding + FAISS 检索
适用:内网/离线场景、希望 embedding 不出网。
安装依赖(CPU 版也可用;有 GPU 可自行换 CUDA 版本 PyTorch):
pip install -U sentence-transformers faiss-cpu numpy
步骤 1:把每个 Technique 变成一条“可检索文本”
from typing import Dict, Any, List
def technique_doc(technique_id: str, technique_obj: Dict[str, Any]) -> str:
# technique_obj 可来自 STIX 的 attack-pattern
name = technique_obj.get("name", "")
desc = (technique_obj.get("description") or "").replace("\n", " ")
tactics = ",".join(technique_to_tactics(technique_obj))
# 经验:把 technique_id / name / tactics 放在前面,有助于召回
text = f"{technique_id} | {name} | tactics={tactics} | {desc}"
# 控制长度:避免极长 description 影响 embedding 与成本
return text[:2000]
步骤 2:批量生成 Technique embedding 并建立向量索引
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
def build_faiss_index(
technique_corpus: List[str],
model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
):
model = SentenceTransformer(model_name)
# normalize_embeddings=True 让向量单位化,后续可直接用 inner product 近似 cosine
vecs = model.encode(
technique_corpus,
batch_size=64,
show_progress_bar=True,
normalize_embeddings=True,
).astype("float32")
dim = vecs.shape[1]
index = faiss.IndexFlatIP(dim) # 内积相似度(配合 normalize -> cosine)
index.add(vecs)
return model, index, vecs
步骤 3:对 NSM 事件摘要做 embedding,检索 Top-K 候选 Technique
from typing import Tuple
def embed_event_summary(evt_summary: Dict[str, Any]) -> str:
# 把关键字段串起来即可;不要塞 raw 全量日志
parts = []
for k in [
"vendor",
"event_type",
"alert.signature",
"dns.query",
"http.host",
"http.url",
"http.method",
"dst_port",
"proto",
]:
v = evt_summary.get(k)
if v is None or v == "":
continue
parts.append(f"{k}={v}")
return " | ".join(parts)
def search_topk(
model: SentenceTransformer,
index: faiss.Index,
query_text: str,
top_k: int = 20,
) -> Tuple[np.ndarray, np.ndarray]:
q = model.encode([query_text], normalize_embeddings=True).astype("float32")
scores, idx = index.search(q, top_k)
return scores[0], idx[0]
把索引与映射保存到磁盘(便于上线服务化):
import json
import faiss
# 保存索引
faiss.write_index(index, "attack_techniques.faiss")
# 保存 technique_id 列表(与向量顺序一一对应)
with open("attack_technique_ids.json", "w", encoding="utf-8") as f:
json.dump(technique_ids, f, ensure_ascii=False, indent=2)
模型选择建议:中文/多语言环境优先选择 multilingual embedding 模型;如果你的告警文本主要是英文签名,英文模型通常更强。
8.2.2 实际实现(云端):调用 Embedding API 生成向量 + 向量库检索
适用:你已有云端模型调用能力(如 OpenAI-compatible),希望更强 embedding 质量/更低本地运维。
通用流程:
-
为每个 Technique doc 调用 embedding 接口得到向量
-
将向量写入向量库(FAISS / Milvus / Qdrant / pgvector 等)
-
对事件摘要向量化后做 Top-K 检索
以 OpenAI-compatible 接口为例(伪代码级别,字段以你的 provider 文档为准):
import requests
def embed_cloud(texts, base_url: str, api_key: str, model: str):
r = requests.post(
f"{base_url}/v1/embeddings",
headers={"Authorization": f"Bearer {api_key}"},
json={"model": model, "input": texts},
timeout=60,
)
r.raise_for_status()
data = r.json()
# 典型返回:data=[{"embedding": [...], "index": 0}, ...]
return [item["embedding"] for item in data["data"]]
上线建议:
-
先做一遍全量 Technique embedding 的离线构建(每天/每周更新一次)
-
线上只对事件摘要做 embedding + 检索(低延迟)
-
严格记录 embedding 模型版本(更换模型会导致向量空间不一致,需要重建索引)
8.3 给 LLM 的输入要“短、结构化、可追溯”
建议把 LLM 输入分成两部分:
-
事件摘要(Event Summary):只保留与映射相关的字段(五元组 + DNS/HTTP/TLS + 告警名)
-
候选 Technique 列表(Candidates):Top-K,每个候选只给:
-
technique_id -
name -
tactics -
short_description(可截断到 200~400 字)
这样 LLM 的任务会变成:
-
在“有限候选”里做分类/排序
-
输出
technique_id + confidence + evidence
8.4 Prompt 模板(强制 JSON 输出)
核心原则:让模型做“从候选集中挑选/排序”,不要让模型在全量 ATT&CK 上自由发挥。
你可以把 system prompt 固化为“安全分析助手 + 严格输出 JSON”,例如:
你是 SOC 的威胁分析助手。
你的任务是:根据输入的 NSM 事件摘要,在给定的 ATT&CK technique 候选列表中,选择最匹配的 0~3 个 technique。
严格要求:
- 只能从 candidates 里选择 technique_id,不得输出 candidates 之外的 ID。
- 必须输出合法 JSON,且仅输出 JSON(不要解释性文字)。
- evidence 必须引用 event_summary 中的字段,使用 "field=value" 形式(例如 "dns.query=example.com")。
- 如果无法判断,请返回空数组,并在 reason 里说明缺失信息。
输出 JSON schema:
{
"mappings": [
{
"technique_id": "Txxxx"|"Txxxx.xxx",
"confidence": 0.0~1.0,
"evidence": ["..."],
"why": "一句话原因"
}
],
"reason": "可选,无法判断时填写"
}
user prompt 建议只包含两块:
-
event_summary:结构化、短文本/JSON -
candidates:Top-K techniques(每条含 id/name/tactics/short_description)
8.5 Python 落地示例:RAG + LLM + 校验(以 Ollama 为例)
下面给出一个“可直接粘贴改造”的最小可用实现:
-
输入:标准化 NSM 事件
evt+ 候选 techniquescandidates -
输出:经过白名单校验后的
attack mappings
依赖:
pip install requests
代码示例:
import json
import requests
from typing import Any, Dict, List
SYSTEM_PROMPT = """
你是 SOC 的威胁分析助手。
你的任务是:根据输入的 NSM 事件摘要,在给定的 ATT&CK technique 候选列表中,选择最匹配的 0~3 个 technique。
严格要求:
- 只能从 candidates 里选择 technique_id,不得输出 candidates 之外的 ID。
- 必须输出合法 JSON,且仅输出 JSON(不要解释性文字)。
- evidence 必须引用 event_summary 中的字段,使用 \"field=value\" 形式。
- 如果无法判断,请返回空数组,并在 reason 里说明缺失信息。
输出 JSON schema:
{\n \"mappings\": [{\"technique_id\": \"Txxxx\", \"confidence\": 0.0, \"evidence\": [], \"why\": \"...\"}],\n \"reason\": \"...\"\n}
""".strip()
def build_event_summary(evt: Dict[str, Any]) -> Dict[str, Any]:
"""只保留映射需要的字段,避免把整条 raw 事件塞给模型。"""
out = {
"vendor": evt.get("vendor"),
"event_type": evt.get("event_type"),
"src_ip": evt.get("src_ip"),
"src_port": evt.get("src_port"),
"dst_ip": evt.get("dst_ip"),
"dst_port": evt.get("dst_port"),
"proto": evt.get("proto"),
}
# 可选字段:根据你的数据源填充
if (evt.get("alert") or {}).get("signature"):
out["alert.signature"] = evt["alert"]["signature"]
if (evt.get("dns") or {}).get("query"):
out["dns.query"] = evt["dns"]["query"]
if evt.get("http"):
http = evt["http"]
if http.get("host"):
out["http.host"] = http["host"]
if http.get("url"):
out["http.url"] = http["url"]
if http.get("method"):
out["http.method"] = http["method"]
return out
def call_ollama_json(model: str, system_prompt: str, user_payload: Dict[str, Any]) -> Dict[str, Any]:
"""调用本地 Ollama /api/chat 并解析返回 JSON。"""
url = "http://127.0.0.1:11434/api/chat"
r = requests.post(
url,
json={
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": json.dumps(user_payload, ensure_ascii=False)},
],
"stream": False,
# 可选:降低随机性,稳定输出
"options": {"temperature": 0.1},
},
timeout=120,
)
r.raise_for_status()
content = r.json()["message"]["content"]
return json.loads(content)
def validate_llm_result(
llm_result: Dict[str, Any],
technique_dict: Dict[str, Dict[str, Any]],
max_items: int = 3,
min_confidence: float = 0.3,
) -> List[Dict[str, Any]]:
"""把 LLM 输出变成可写回事件的 mappings,并做最基本的安全校验。"""
mappings = llm_result.get("mappings") or []
if not isinstance(mappings, list):
return []
out = []
for m in mappings[:max_items]:
if not isinstance(m, dict):
continue
tid = m.get("technique_id")
conf = float(m.get("confidence") or 0.0)
evidence = m.get("evidence") or []
why = m.get("why") or ""
# 1) 白名单:必须是 ATT&CK 字典中存在的 Technique
if tid not in technique_dict:
continue
# 2) 置信度阈值
if conf < min_confidence:
continue
# 3) 证据必须是列表
if not isinstance(evidence, list):
evidence = [str(evidence)]
meta = technique_dict[tid]
out.append(
{
"technique_id": tid,
"technique_name": meta.get("name", ""),
"tactics": meta.get("tactics", []),
"confidence": conf,
"evidence": [str(x) for x in evidence],
"why": str(why),
"source": "llm",
}
)
# 按置信度排序
out.sort(key=lambda x: x["confidence"], reverse=True)
return out
def llm_map_event_to_attack(
evt: Dict[str, Any],
candidates: List[Dict[str, Any]],
technique_dict: Dict[str, Dict[str, Any]],
model: str = "qwen2.5",
) -> List[Dict[str, Any]]:
payload = {
"event_summary": build_event_summary(evt),
"candidates": candidates,
}
llm_result = call_ollama_json(model=model, system_prompt=SYSTEM_PROMPT, user_payload=payload)
return validate_llm_result(llm_result, technique_dict)
你需要补齐的两块:
-
candidates:来自你的检索层(BM25/向量库),建议 Top-20 -
technique_dict:来自第 6.1 节的本地 ATT&CK 字典(用于白名单校验与补全名称/战术)
如果你不使用 Ollama,也可以把
call_ollama_json()替换为任何 OpenAI-compatible Chat API 调用;关键不在供应商,而在“候选检索 + JSON 输出 + 校验”。
落地时建议加 4 个防护:
-
Technique 白名单校验:必须存在于
technique_dict -
置信度阈值:低于阈值不写回(或标注为
low_confidence) -
证据必须可追溯:
evidence必须引用事件字段(如dns.query=...) -
与规则融合:规则命中时提升置信度或直接锁定 Technique
8.6 “规则 + LLM”融合的推荐策略
-
规则引擎产出一组
hard_hits(强信号,conf 可设为 0.8~0.95) -
检索产出
candidates -
LLM 在
candidates + hard_hits上做排序与解释 -
如果 LLM 输出和
hard_hits冲突,以hard_hits为准并要求 LLM 给出“冲突说明”(用于规则优化)
9. 常见坑与最佳实践
-
不要把 ATT&CK 当成 IOC 库:ATT&CK 侧重行为知识(TTP),不是“某域名/某 IP 是否恶意”。
-
映射是概率而非真相:同一网络行为可能对应多个 Technique,建议支持“多标签 + 置信度”。
-
版本管理:ATT&CK 会更新(Technique 名称、归类、子技术拆分都会变化),建议缓存时记录
attack_version与同步时间。 -
映射规则要可审计:让分析师能回溯“为什么映射成这个 Technique”。
-
把映射结果写回你的数据平台:例如在事件里追加字段:
attack.technique_id/attack.technique_name/attack.tactics/attack.confidence
-
LLM 一定要“先检索后判断”:避免模型在全量技术上凭印象输出错误 ID。
-
LLM 输出必须可验证:JSON schema + 白名单 + 证据引用,是上线的底线。
10. 小结
你现在有了一条可落地的路径:
-
用 Python 从 TAXII 或 STIX 文件读取 ATT&CK
-
预计算
Technique ID -> 名称/战术字典 -
把 NSM 告警标准化为统一事件结构
-
用“规则 + 特征 + 上下文”的方式输出可解释的 ATT&CK 映射结果
-
在规则覆盖不足时,引入 RAG + LLM 做“候选选择/排序 + 可解释摘要”,并通过校验机制保证可控性
这套方法的关键不是“映射是否完美”,而是:结果可解释、可迭代、可度量。

