本文聚焦一个具体问题:如何把每个 ATT&CK Technique(STIX attack-pattern)变成 embedding 向量,并用向量检索从全量 Technique 中检索 Top-K 候选

典型用途:用于“NSM 告警 -> ATT&CK Technique”映射中的 候选召回层(RAG 的 Retrieval)。

关联文章:


1. 总体流程(你真正要落地的管道)

ATT&CK STIX (attack-pattern)
  -> 文本化(Technique Doc)
    -> Embedding(本地/云端)
      -> 建索引(FAISS/向量库)
        -> Query(事件摘要 embedding)
          -> Top-K candidates

关键设计点:

  • Doc 结构要稳定:Technique 的文本拼接方式要可复现,否则更新时无法增量对齐。

  • 向量空间要版本化:embedding 模型变了 -> 全量重建索引。

  • 只做候选召回:Top-K 召回(如 20)即可,后续让 LLM 或规则引擎做精排与解释。

1.1 Top-K 的原理:相似度打分 + 排序截断

向量检索里,Top-K 的含义非常直接:

  1. 把查询(Query)文本变成向量 q(比如“事件摘要”)

  2. 把库里的每条文档变成向量 d_i(比如每个 Technique doc)

  3. 计算相似度分数 score(q, d_i)

  4. 按分数从高到低排序,取前 K 个(即 Top-K candidates)

在工程实现中最常见的是 cosine 相似度

  • cos(q, d) = (q · d) / (||q|| * ||d||)

如果你在生成向量时做了单位化(normalize_embeddings=True),就有:

  • ||q|| = ||d|| = 1

  • cos(q, d) = q · d

所以很多实现(包括本文的 FAISS IndexFlatIP)会用 内积(inner product) 做检索:

  • 分数越大,表示两段文本在 embedding 空间里越“语义相近”

K 取多大?本质是召回率 vs 成本的权衡

  • K 小:候选更少,后续 LLM 精排成本低,但可能漏掉正确 Technique(召回率下降)

  • K 大:候选更全,召回率提升,但后续 LLM token 成本/延迟上升,也更容易引入干扰项

经验值(NSM 告警 -> ATT&CK 映射):

  • 初期:K=20 或 30(更稳的召回)

  • 稳定后:K=10~20(降低成本)

“Exact Top-K” vs “Approx Top-K”

  • IndexFlatIP精确检索,等价于对所有向量逐个打分再取 Top-K(规模小/中等时足够)

  • IVF/HNSW 等:近似检索,更快,但 Top-K 可能不是绝对最优(规模大时常用)


2. 数据准备:从 STIX 提取 Technique,并生成“可检索 Doc”

2.1 从 STIX store 拿到 attack-pattern

你可以沿用上一篇的 STIX 读取方式(TAXII 或本地 JSON)。这里假设你已经有 store

from stix2 import Filter

techniques = store.query([Filter("type", "=", "attack-pattern")])
print("attack-pattern:", len(techniques))

2.2 提取 ATT&CK Technique ID(Txxxx / Txxxx.xxx

def get_attack_id(stix_obj):
    for ref in stix_obj.get("external_references", []):
        if ref.get("source_name") == "mitre-attack" and ref.get("external_id"):
            return ref["external_id"]
    return None

2.3 生成 Technique Doc(推荐格式)

目标:让 embedding 模型能“读懂”这个 Technique 的语义,同时保证 doc 不至于过长。

推荐包含:

  • technique_id

  • name

  • tactics(从 kill_chain_phases 推导)

  • description(截断)

def technique_to_tactics(technique_obj):
    phases = technique_obj.get("kill_chain_phases", [])
    tactics = []
    for p in phases:
        if p.get("kill_chain_name") == "mitre-attack" and p.get("phase_name"):
            tactics.append(p["phase_name"])
    return sorted(set(tactics))


def technique_doc(technique_id: str, technique_obj: dict) -> str:
    name = technique_obj.get("name", "")
    desc = (technique_obj.get("description") or "").replace("\n", " ")
    tactics = ",".join(technique_to_tactics(technique_obj))

    # 把强结构字段放前面,便于模型对齐
    text = f"{technique_id} | {name} | tactics={tactics} | {desc}"

    # 控制长度:避免极长 description
    return text[:2000]

把 Technique 列表变成 technique_idscorpus

technique_ids = []
corpus = []
for t in techniques:
    tid = get_attack_id(t)
    if not tid:
        continue
    technique_ids.append(tid)
    corpus.append(technique_doc(tid, t))

print("docs:", len(corpus))

3. 本地实现:Sentence-Transformers + FAISS(CPU 可跑)

适用:内网、离线、不希望文本发到云端。

3.1 安装依赖

pip install -U sentence-transformers faiss-cpu numpy

如果你有 GPU,可自行安装对应 CUDA 的 PyTorch,并把 faiss-cpu 换成 faiss-gpu(取决于你的环境)。

3.2 选择 embedding 模型

常用选择(示例):

  • sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2:多语言、体积适中、易部署

你的告警文本如果主要是英文签名,英文向量模型可能更强;如果混合中文分析注释,优先多语言模型。

3.3 批量生成 Technique embedding 并建索引

用 cosine 相似度检索的经典做法:

  • normalize_embeddings=True 让向量单位化

  • FAISS 用 IndexFlatIP(inner product)

import numpy as np
import faiss
from sentence_transformers import SentenceTransformer


def build_faiss_index(
    corpus: list[str],
    model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
):
    model = SentenceTransformer(model_name)

    vecs = model.encode(
        corpus,
        batch_size=64,
        show_progress_bar=True,
        normalize_embeddings=True,
    ).astype("float32")

    dim = vecs.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(vecs)

    return model, index

3.4 Query:把“事件摘要”向量化并 Top-K 检索

事件摘要要短、结构化,避免把整条 raw 日志塞进去。

from typing import Dict, Any, Tuple


def embed_event_summary(evt_summary: Dict[str, Any]) -> str:
    keys = [
        "vendor",
        "event_type",
        "alert.signature",
        "dns.query",
        "http.host",
        "http.url",
        "http.method",
        "dst_port",
        "proto",
    ]
    parts = []
    for k in keys:
        v = evt_summary.get(k)
        if v is None or v == "":
            continue
        parts.append(f"{k}={v}")
    return " | ".join(parts)


def search_topk(model, index, query_text: str, top_k: int = 20) -> Tuple[list[float], list[int]]:
    q = model.encode([query_text], normalize_embeddings=True).astype("float32")
    scores, idx = index.search(q, top_k)
    return scores[0].tolist(), idx[0].tolist()

把索引结果映射回 technique_ids

scores, idx = search_topk(model, index, query_text, top_k=20)

candidates = []
for s, i in zip(scores, idx):
    tid = technique_ids[i]
    candidates.append({"technique_id": tid, "score": float(s)})

print(candidates[:5])

3.5 索引与元数据落盘(上线必须做)

import json
import faiss

faiss.write_index(index, "attack_techniques.faiss")

with open("attack_technique_ids.json", "w", encoding="utf-8") as f:
    json.dump(
        {
            "embedding_model": "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
            "technique_ids": technique_ids,
        },
        f,
        ensure_ascii=False,
        indent=2,
    )

加载:

import json
import faiss

index = faiss.read_index("attack_techniques.faiss")
meta = json.load(open("attack_technique_ids.json", "r", encoding="utf-8"))
technique_ids = meta["technique_ids"]

4. 云端实现:Embedding API + 向量库(FAISS / Milvus / Qdrant / pgvector)

适用:你已经有云端模型调用能力,追求更强 embedding 质量与更少本地模型运维。

4.1 通用接口:OpenAI-compatible /v1/embeddings

下面是“只依赖 requests”的最小封装。

import requests


def embed_cloud(texts, base_url: str, api_key: str, model: str):
    r = requests.post(
        f"{base_url}/v1/embeddings",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"model": model, "input": texts},
        timeout=60,
    )
    r.raise_for_status()
    data = r.json()
    return [item["embedding"] for item in data["data"]]

批量构建(注意分批、注意速率限制):

def batched(xs, batch_size=128):
    for i in range(0, len(xs), batch_size):
        yield xs[i : i + batch_size]

all_vecs = []
for batch in batched(corpus, batch_size=128):
    vecs = embed_cloud(
        batch,
        base_url="https://your-provider.example.com",
        api_key="YOUR_KEY",
        model="your-embedding-model",
    )
    all_vecs.extend(vecs)

print(len(all_vecs), len(all_vecs[0]))

得到向量后,你可以:

  • 继续用 FAISS(本地索引,embedding 由云端生成)

  • 或写入 向量数据库(Milvus/Qdrant/pgvector)做在线检索

4.2 上线注意事项(云端 embedding 更需要)

  • 模型版本固定:embedding 模型一旦换版本,向量空间变化 -> 全量重建索引

  • 文本脱敏:事件摘要里不要带敏感字段(账号、明文 URL 参数、内部域名等),或做脱敏后再发云端

  • 批量与重试:embedding API 调用需要做分批、超时、重试、断点续跑


5. 候选候选:如何把 Top-K 候选转成 LLM 输入

向量检索只是返回 Top-K technique_id,你通常还要把这些候选补齐成:

  • technique_id

  • name

  • tactics

  • short_description

再喂给 LLM 做“选择/排序 + 解释”。候选越少越好(一般 10~30),否则会显著增加 LLM token 成本。


6. 常见坑(很容易踩)

  1. doc 太长:description 极长会稀释语义,建议截断到 1k~2k 字符。

  2. 把 raw 日志直接 embedding:噪声巨大、成本高、效果不稳定;用 event summary。

  3. 不做版本管理:embedding 模型、ATT&CK 数据版本没记录,索引不可追溯。

  4. 用向量检索做最终判定:向量召回是“找近似候选”,不是最终归因。


7. 完整实现源码(可直接运行)

下面给出一个完整的 Python 脚本,覆盖从“读取 ATT&CK STIX bundle(本地 JSON)-> 生成 Technique Doc -> 本地 embedding -> FAISS 建索引/落盘 -> Query 检索 Top-K”的全流程。

说明:示例使用本地文件 enterprise-attack.json(ATT&CK Enterprise STIX bundle)。你可以替换为自己从 TAXII 同步得到的 bundle 文件。

依赖:

pip install -U stix2 sentence-transformers faiss-cpu numpy

脚本源码(保存为 attack_embedding_faiss.py):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""ATT&CK Technique Embedding + FAISS Top-K 检索示例。

功能:
- 从本地 ATT&CK Enterprise STIX bundle(enterprise-attack.json)读取 attack-pattern
- 生成 Technique Doc(technique_id + name + tactics + description 截断)
- 本地 embedding(sentence-transformers)
- FAISS 建索引(IndexFlatIP + normalize -> cosine)
- 索引与元数据落盘
- 对查询文本(事件摘要)做 Top-K 检索输出

运行示例:
  python attack_embedding_faiss.py \
    --bundle ./enterprise-attack.json \
    --out ./out \
    --topk 20 \
    --query 'vendor=suricata | event_type=alert | alert.signature=SCAN suspicious SMB sweep | dst_port=445 | proto=tcp'
"""

import argparse
import json
import os
from typing import Any, Dict, List, Tuple

import numpy as np
import faiss
from stix2 import MemoryStore, Filter
from sentence_transformers import SentenceTransformer


def get_attack_id(stix_obj: Dict[str, Any]) -> str | None:
    for ref in stix_obj.get("external_references", []):
        if ref.get("source_name") == "mitre-attack" and ref.get("external_id"):
            return ref["external_id"]
    return None


def technique_to_tactics(technique_obj: Dict[str, Any]) -> List[str]:
    phases = technique_obj.get("kill_chain_phases", [])
    tactics: List[str] = []
    for p in phases:
        if p.get("kill_chain_name") == "mitre-attack" and p.get("phase_name"):
            tactics.append(p["phase_name"])
    return sorted(set(tactics))


def technique_doc(technique_id: str, technique_obj: Dict[str, Any], max_chars: int = 2000) -> str:
    name = technique_obj.get("name", "")
    desc = (technique_obj.get("description") or "").replace("\n", " ")
    tactics = ",".join(technique_to_tactics(technique_obj))

    # 强结构字段放前面
    text = f"{technique_id} | {name} | tactics={tactics} | {desc}"
    return text[:max_chars]


def load_bundle_store(bundle_path: str) -> MemoryStore:
    with open(bundle_path, "r", encoding="utf-8") as f:
        bundle = json.load(f)
    return MemoryStore(stix_data=bundle["objects"])


def build_corpus(store: MemoryStore) -> Tuple[List[str], List[str]]:
    techniques = store.query([Filter("type", "=", "attack-pattern")])

    technique_ids: List[str] = []
    corpus: List[str] = []

    for t in techniques:
        tid = get_attack_id(t)
        if not tid:
            continue
        technique_ids.append(tid)
        corpus.append(technique_doc(tid, t))

    if not corpus:
        raise RuntimeError("未从 bundle 中提取到任何 technique doc,请检查 bundle 是否为 Enterprise ATT&CK STIX")

    return technique_ids, corpus


def build_faiss_index(
    corpus: List[str],
    model_name: str,
    batch_size: int = 64,
) -> Tuple[SentenceTransformer, faiss.Index]:
    model = SentenceTransformer(model_name)

    vecs = model.encode(
        corpus,
        batch_size=batch_size,
        show_progress_bar=True,
        normalize_embeddings=True,
    ).astype("float32")

    dim = vecs.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(vecs)

    return model, index


def search_topk(
    model: SentenceTransformer,
    index: faiss.Index,
    query_text: str,
    top_k: int,
) -> Tuple[List[float], List[int]]:
    q = model.encode([query_text], normalize_embeddings=True).astype("float32")
    scores, idx = index.search(q, top_k)
    return scores[0].tolist(), idx[0].tolist()


def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)


def save_index(out_dir: str, index: faiss.Index, meta: Dict[str, Any]):
    ensure_dir(out_dir)
    faiss.write_index(index, os.path.join(out_dir, "attack_techniques.faiss"))
    with open(os.path.join(out_dir, "attack_technique_meta.json"), "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)


def main():
    ap = argparse.ArgumentParser(description="ATT&CK Technique embedding + FAISS Top-K 检索")
    ap.add_argument("--bundle", required=True, help="ATT&CK Enterprise STIX bundle JSON,例如 enterprise-attack.json")
    ap.add_argument(
        "--model",
        default="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        help="embedding 模型名称(Sentence-Transformers)",
    )
    ap.add_argument("--out", default="./out", help="索引输出目录")
    ap.add_argument("--topk", type=int, default=20, help="Top-K 候选数量")
    ap.add_argument("--query", default="", help="查询文本(建议用事件摘要字段拼接)")
    args = ap.parse_args()

    store = load_bundle_store(args.bundle)
    technique_ids, corpus = build_corpus(store)

    model, index = build_faiss_index(corpus=corpus, model_name=args.model)

    meta = {
        "embedding_model": args.model,
        "technique_ids": technique_ids,
        "doc_max_chars": 2000,
        "index_type": "IndexFlatIP",
        "similarity": "cosine (via normalized inner product)",
    }
    save_index(args.out, index, meta)

    print(f"[OK] built index: {len(technique_ids)} docs -> {os.path.join(args.out, 'attack_techniques.faiss')}")

    if args.query:
        scores, idx = search_topk(model, index, args.query, top_k=args.topk)
        print("\nTop-K results:")
        for rank, (s, i) in enumerate(zip(scores, idx), start=1):
            tid = technique_ids[i]
            print(f"{rank:02d}. {tid} score={s:.4f}")


if __name__ == "__main__":
    main()

你可以基于该脚本扩展两件事:

  • 用真实的 NSM 事件摘要替换 --query 文本(保持结构化字段拼接)

  • 把 Top-K technique_id 补全为 {id,name,tactics,short_description} 作为 LLM 的 candidates 输入


8. 小结

本文给出了两条可直接落地的 embedding 实现:

  • 本地:sentence-transformers 生成向量 + FAISS 建索引/检索

  • 云端:Embedding API 批量生成向量 + 向量库/FAISS 做检索

把它接到“NSM 告警 -> ATT&CK”的管道里,你就能稳定产出 Top-K Technique 候选,再交给 LLM/规则引擎做精排与解释。