ATT&CK Technique 向量化实战:本地/云端生成 Embedding、构建向量索引、Top-K 检索候选(FAISS / Embedding API)
本文聚焦一个具体问题:如何把每个 ATT&CK Technique(STIX attack-pattern)变成 embedding 向量,并用向量检索从全量 Technique 中检索 Top-K 候选。
典型用途:用于“NSM 告警 -> ATT&CK Technique”映射中的 候选召回层(RAG 的 Retrieval)。
关联文章:
1. 总体流程(你真正要落地的管道)
ATT&CK STIX (attack-pattern)
-> 文本化(Technique Doc)
-> Embedding(本地/云端)
-> 建索引(FAISS/向量库)
-> Query(事件摘要 embedding)
-> Top-K candidates
关键设计点:
-
Doc 结构要稳定:Technique 的文本拼接方式要可复现,否则更新时无法增量对齐。
-
向量空间要版本化:embedding 模型变了 -> 全量重建索引。
-
只做候选召回:Top-K 召回(如 20)即可,后续让 LLM 或规则引擎做精排与解释。
1.1 Top-K 的原理:相似度打分 + 排序截断
向量检索里,Top-K 的含义非常直接:
-
把查询(Query)文本变成向量
q(比如“事件摘要”) -
把库里的每条文档变成向量
d_i(比如每个 Technique doc) -
计算相似度分数
score(q, d_i) -
按分数从高到低排序,取前 K 个(即 Top-K candidates)
在工程实现中最常见的是 cosine 相似度:
-
cos(q, d) = (q · d) / (||q|| * ||d||)
如果你在生成向量时做了单位化(normalize_embeddings=True),就有:
-
||q|| = ||d|| = 1 -
cos(q, d) = q · d
所以很多实现(包括本文的 FAISS IndexFlatIP)会用 内积(inner product) 做检索:
-
分数越大,表示两段文本在 embedding 空间里越“语义相近”
K 取多大?本质是召回率 vs 成本的权衡
-
K 小:候选更少,后续 LLM 精排成本低,但可能漏掉正确 Technique(召回率下降)
-
K 大:候选更全,召回率提升,但后续 LLM token 成本/延迟上升,也更容易引入干扰项
经验值(NSM 告警 -> ATT&CK 映射):
-
初期:K=20 或 30(更稳的召回)
-
稳定后:K=10~20(降低成本)
“Exact Top-K” vs “Approx Top-K”
-
IndexFlatIP:精确检索,等价于对所有向量逐个打分再取 Top-K(规模小/中等时足够) -
IVF/HNSW 等:近似检索,更快,但 Top-K 可能不是绝对最优(规模大时常用)
2. 数据准备:从 STIX 提取 Technique,并生成“可检索 Doc”
2.1 从 STIX store 拿到 attack-pattern
你可以沿用上一篇的 STIX 读取方式(TAXII 或本地 JSON)。这里假设你已经有 store。
from stix2 import Filter
techniques = store.query([Filter("type", "=", "attack-pattern")])
print("attack-pattern:", len(techniques))
2.2 提取 ATT&CK Technique ID(Txxxx / Txxxx.xxx)
def get_attack_id(stix_obj):
for ref in stix_obj.get("external_references", []):
if ref.get("source_name") == "mitre-attack" and ref.get("external_id"):
return ref["external_id"]
return None
2.3 生成 Technique Doc(推荐格式)
目标:让 embedding 模型能“读懂”这个 Technique 的语义,同时保证 doc 不至于过长。
推荐包含:
-
technique_id -
name -
tactics(从kill_chain_phases推导) -
description(截断)
def technique_to_tactics(technique_obj):
phases = technique_obj.get("kill_chain_phases", [])
tactics = []
for p in phases:
if p.get("kill_chain_name") == "mitre-attack" and p.get("phase_name"):
tactics.append(p["phase_name"])
return sorted(set(tactics))
def technique_doc(technique_id: str, technique_obj: dict) -> str:
name = technique_obj.get("name", "")
desc = (technique_obj.get("description") or "").replace("\n", " ")
tactics = ",".join(technique_to_tactics(technique_obj))
# 把强结构字段放前面,便于模型对齐
text = f"{technique_id} | {name} | tactics={tactics} | {desc}"
# 控制长度:避免极长 description
return text[:2000]
把 Technique 列表变成 technique_ids 与 corpus:
technique_ids = []
corpus = []
for t in techniques:
tid = get_attack_id(t)
if not tid:
continue
technique_ids.append(tid)
corpus.append(technique_doc(tid, t))
print("docs:", len(corpus))
3. 本地实现:Sentence-Transformers + FAISS(CPU 可跑)
适用:内网、离线、不希望文本发到云端。
3.1 安装依赖
pip install -U sentence-transformers faiss-cpu numpy
如果你有 GPU,可自行安装对应 CUDA 的 PyTorch,并把
faiss-cpu换成faiss-gpu(取决于你的环境)。
3.2 选择 embedding 模型
常用选择(示例):
-
sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2:多语言、体积适中、易部署
你的告警文本如果主要是英文签名,英文向量模型可能更强;如果混合中文分析注释,优先多语言模型。
3.3 批量生成 Technique embedding 并建索引
用 cosine 相似度检索的经典做法:
-
normalize_embeddings=True让向量单位化 -
FAISS 用
IndexFlatIP(inner product)
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
def build_faiss_index(
corpus: list[str],
model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
):
model = SentenceTransformer(model_name)
vecs = model.encode(
corpus,
batch_size=64,
show_progress_bar=True,
normalize_embeddings=True,
).astype("float32")
dim = vecs.shape[1]
index = faiss.IndexFlatIP(dim)
index.add(vecs)
return model, index
3.4 Query:把“事件摘要”向量化并 Top-K 检索
事件摘要要短、结构化,避免把整条 raw 日志塞进去。
from typing import Dict, Any, Tuple
def embed_event_summary(evt_summary: Dict[str, Any]) -> str:
keys = [
"vendor",
"event_type",
"alert.signature",
"dns.query",
"http.host",
"http.url",
"http.method",
"dst_port",
"proto",
]
parts = []
for k in keys:
v = evt_summary.get(k)
if v is None or v == "":
continue
parts.append(f"{k}={v}")
return " | ".join(parts)
def search_topk(model, index, query_text: str, top_k: int = 20) -> Tuple[list[float], list[int]]:
q = model.encode([query_text], normalize_embeddings=True).astype("float32")
scores, idx = index.search(q, top_k)
return scores[0].tolist(), idx[0].tolist()
把索引结果映射回 technique_ids:
scores, idx = search_topk(model, index, query_text, top_k=20)
candidates = []
for s, i in zip(scores, idx):
tid = technique_ids[i]
candidates.append({"technique_id": tid, "score": float(s)})
print(candidates[:5])
3.5 索引与元数据落盘(上线必须做)
import json
import faiss
faiss.write_index(index, "attack_techniques.faiss")
with open("attack_technique_ids.json", "w", encoding="utf-8") as f:
json.dump(
{
"embedding_model": "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
"technique_ids": technique_ids,
},
f,
ensure_ascii=False,
indent=2,
)
加载:
import json
import faiss
index = faiss.read_index("attack_techniques.faiss")
meta = json.load(open("attack_technique_ids.json", "r", encoding="utf-8"))
technique_ids = meta["technique_ids"]
4. 云端实现:Embedding API + 向量库(FAISS / Milvus / Qdrant / pgvector)
适用:你已经有云端模型调用能力,追求更强 embedding 质量与更少本地模型运维。
4.1 通用接口:OpenAI-compatible /v1/embeddings
下面是“只依赖 requests”的最小封装。
import requests
def embed_cloud(texts, base_url: str, api_key: str, model: str):
r = requests.post(
f"{base_url}/v1/embeddings",
headers={"Authorization": f"Bearer {api_key}"},
json={"model": model, "input": texts},
timeout=60,
)
r.raise_for_status()
data = r.json()
return [item["embedding"] for item in data["data"]]
批量构建(注意分批、注意速率限制):
def batched(xs, batch_size=128):
for i in range(0, len(xs), batch_size):
yield xs[i : i + batch_size]
all_vecs = []
for batch in batched(corpus, batch_size=128):
vecs = embed_cloud(
batch,
base_url="https://your-provider.example.com",
api_key="YOUR_KEY",
model="your-embedding-model",
)
all_vecs.extend(vecs)
print(len(all_vecs), len(all_vecs[0]))
得到向量后,你可以:
-
继续用 FAISS(本地索引,embedding 由云端生成)
-
或写入 向量数据库(Milvus/Qdrant/pgvector)做在线检索
4.2 上线注意事项(云端 embedding 更需要)
-
模型版本固定:embedding 模型一旦换版本,向量空间变化 -> 全量重建索引
-
文本脱敏:事件摘要里不要带敏感字段(账号、明文 URL 参数、内部域名等),或做脱敏后再发云端
-
批量与重试:embedding API 调用需要做分批、超时、重试、断点续跑
5. 候选候选:如何把 Top-K 候选转成 LLM 输入
向量检索只是返回 Top-K technique_id,你通常还要把这些候选补齐成:
-
technique_id -
name -
tactics -
short_description
再喂给 LLM 做“选择/排序 + 解释”。候选越少越好(一般 10~30),否则会显著增加 LLM token 成本。
6. 常见坑(很容易踩)
-
doc 太长:description 极长会稀释语义,建议截断到 1k~2k 字符。
-
把 raw 日志直接 embedding:噪声巨大、成本高、效果不稳定;用 event summary。
-
不做版本管理:embedding 模型、ATT&CK 数据版本没记录,索引不可追溯。
-
用向量检索做最终判定:向量召回是“找近似候选”,不是最终归因。
7. 完整实现源码(可直接运行)
下面给出一个完整的 Python 脚本,覆盖从“读取 ATT&CK STIX bundle(本地 JSON)-> 生成 Technique Doc -> 本地 embedding -> FAISS 建索引/落盘 -> Query 检索 Top-K”的全流程。
说明:示例使用本地文件
enterprise-attack.json(ATT&CK Enterprise STIX bundle)。你可以替换为自己从 TAXII 同步得到的 bundle 文件。
依赖:
pip install -U stix2 sentence-transformers faiss-cpu numpy
脚本源码(保存为 attack_embedding_faiss.py):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""ATT&CK Technique Embedding + FAISS Top-K 检索示例。
功能:
- 从本地 ATT&CK Enterprise STIX bundle(enterprise-attack.json)读取 attack-pattern
- 生成 Technique Doc(technique_id + name + tactics + description 截断)
- 本地 embedding(sentence-transformers)
- FAISS 建索引(IndexFlatIP + normalize -> cosine)
- 索引与元数据落盘
- 对查询文本(事件摘要)做 Top-K 检索输出
运行示例:
python attack_embedding_faiss.py \
--bundle ./enterprise-attack.json \
--out ./out \
--topk 20 \
--query 'vendor=suricata | event_type=alert | alert.signature=SCAN suspicious SMB sweep | dst_port=445 | proto=tcp'
"""
import argparse
import json
import os
from typing import Any, Dict, List, Tuple
import numpy as np
import faiss
from stix2 import MemoryStore, Filter
from sentence_transformers import SentenceTransformer
def get_attack_id(stix_obj: Dict[str, Any]) -> str | None:
for ref in stix_obj.get("external_references", []):
if ref.get("source_name") == "mitre-attack" and ref.get("external_id"):
return ref["external_id"]
return None
def technique_to_tactics(technique_obj: Dict[str, Any]) -> List[str]:
phases = technique_obj.get("kill_chain_phases", [])
tactics: List[str] = []
for p in phases:
if p.get("kill_chain_name") == "mitre-attack" and p.get("phase_name"):
tactics.append(p["phase_name"])
return sorted(set(tactics))
def technique_doc(technique_id: str, technique_obj: Dict[str, Any], max_chars: int = 2000) -> str:
name = technique_obj.get("name", "")
desc = (technique_obj.get("description") or "").replace("\n", " ")
tactics = ",".join(technique_to_tactics(technique_obj))
# 强结构字段放前面
text = f"{technique_id} | {name} | tactics={tactics} | {desc}"
return text[:max_chars]
def load_bundle_store(bundle_path: str) -> MemoryStore:
with open(bundle_path, "r", encoding="utf-8") as f:
bundle = json.load(f)
return MemoryStore(stix_data=bundle["objects"])
def build_corpus(store: MemoryStore) -> Tuple[List[str], List[str]]:
techniques = store.query([Filter("type", "=", "attack-pattern")])
technique_ids: List[str] = []
corpus: List[str] = []
for t in techniques:
tid = get_attack_id(t)
if not tid:
continue
technique_ids.append(tid)
corpus.append(technique_doc(tid, t))
if not corpus:
raise RuntimeError("未从 bundle 中提取到任何 technique doc,请检查 bundle 是否为 Enterprise ATT&CK STIX")
return technique_ids, corpus
def build_faiss_index(
corpus: List[str],
model_name: str,
batch_size: int = 64,
) -> Tuple[SentenceTransformer, faiss.Index]:
model = SentenceTransformer(model_name)
vecs = model.encode(
corpus,
batch_size=batch_size,
show_progress_bar=True,
normalize_embeddings=True,
).astype("float32")
dim = vecs.shape[1]
index = faiss.IndexFlatIP(dim)
index.add(vecs)
return model, index
def search_topk(
model: SentenceTransformer,
index: faiss.Index,
query_text: str,
top_k: int,
) -> Tuple[List[float], List[int]]:
q = model.encode([query_text], normalize_embeddings=True).astype("float32")
scores, idx = index.search(q, top_k)
return scores[0].tolist(), idx[0].tolist()
def ensure_dir(path: str):
os.makedirs(path, exist_ok=True)
def save_index(out_dir: str, index: faiss.Index, meta: Dict[str, Any]):
ensure_dir(out_dir)
faiss.write_index(index, os.path.join(out_dir, "attack_techniques.faiss"))
with open(os.path.join(out_dir, "attack_technique_meta.json"), "w", encoding="utf-8") as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
def main():
ap = argparse.ArgumentParser(description="ATT&CK Technique embedding + FAISS Top-K 检索")
ap.add_argument("--bundle", required=True, help="ATT&CK Enterprise STIX bundle JSON,例如 enterprise-attack.json")
ap.add_argument(
"--model",
default="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
help="embedding 模型名称(Sentence-Transformers)",
)
ap.add_argument("--out", default="./out", help="索引输出目录")
ap.add_argument("--topk", type=int, default=20, help="Top-K 候选数量")
ap.add_argument("--query", default="", help="查询文本(建议用事件摘要字段拼接)")
args = ap.parse_args()
store = load_bundle_store(args.bundle)
technique_ids, corpus = build_corpus(store)
model, index = build_faiss_index(corpus=corpus, model_name=args.model)
meta = {
"embedding_model": args.model,
"technique_ids": technique_ids,
"doc_max_chars": 2000,
"index_type": "IndexFlatIP",
"similarity": "cosine (via normalized inner product)",
}
save_index(args.out, index, meta)
print(f"[OK] built index: {len(technique_ids)} docs -> {os.path.join(args.out, 'attack_techniques.faiss')}")
if args.query:
scores, idx = search_topk(model, index, args.query, top_k=args.topk)
print("\nTop-K results:")
for rank, (s, i) in enumerate(zip(scores, idx), start=1):
tid = technique_ids[i]
print(f"{rank:02d}. {tid} score={s:.4f}")
if __name__ == "__main__":
main()
你可以基于该脚本扩展两件事:
-
用真实的 NSM 事件摘要替换
--query文本(保持结构化字段拼接) -
把 Top-K
technique_id补全为{id,name,tactics,short_description}作为 LLM 的 candidates 输入
8. 小结
本文给出了两条可直接落地的 embedding 实现:
-
本地:
sentence-transformers生成向量 +FAISS建索引/检索 -
云端:Embedding API 批量生成向量 + 向量库/FAISS 做检索
把它接到“NSM 告警 -> ATT&CK”的管道里,你就能稳定产出 Top-K Technique 候选,再交给 LLM/规则引擎做精排与解释。

