威胁情报与分析
概述
威胁情报是网络安全运营的重要能力,通过收集、分析和应用威胁情报,可以提前识别潜在威胁,提高安全防护能力。本教程将详细介绍威胁情报的收集、分析和应用方法。
威胁情报基础
威胁情报定义
┌─────────────────────────────────────────────┐
│ 威胁情报层次模型 │
├─────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────┐ │
│ │ 战略情报 (Strategic) │ │
│ │ - 威胁趋势分析 │ │
│ │ - 攻击组织分析 │ │
│ │ - 长期威胁预测 │ │
│ └─────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ 战术情报 (Tactical) │ │
│ │ - TTP 分析 │ │
│ │ - 攻击工具分析 │ │
│ │ - 漏洞信息 │ │
│ └─────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ 操作情报 (Operational) │ │
│ │ - IOC (入侵指标) │ │
│ │ - 恶意域名/IP │ │
│ │ - 文件哈希 │ │
│ └─────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ 技术情报 (Technical) │ │
│ │ - 恶意软件样本 │ │
│ │ - 攻击代码 │ │
│ │ - 漏洞利用代码 │ │
│ └─────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────┘
威胁情报价值
威胁情报价值:
预防价值:
- 提前识别威胁
- 预防攻击发生
- 减少安全事件
检测价值:
- 提高检测准确率
- 减少误报率
- 加速威胁识别
响应价值:
- 加速事件响应
- 提供上下文信息
- 指导响应策略
决策价值:
- 支持安全决策
- 优化资源配置
- 制定安全策略
威胁情报收集
数据源分类
# 威胁情报数据源
class ThreatIntelSources:
    """Catalogue of threat-intelligence source categories.

    Every ``get_*_intel`` method returns a freshly built dict with four keys:
    ``sources`` (list of feed/organisation names), ``access``,
    ``update_frequency`` and ``quality``.  ``self.sources`` maps a category
    key to the bound method that describes it, so callers can iterate over
    all categories without knowing the method names.
    """

    def __init__(self):
        # Category key -> method returning that category's description.
        self.sources = {
            'open_source': self.get_open_source_intel,
            'commercial': self.get_commercial_intel,
            'government': self.get_government_intel,
            'industry': self.get_industry_intel,
            'internal': self.get_internal_intel,
        }

    def get_open_source_intel(self):
        """Return the description of open-source threat-intelligence feeds."""
        return {
            'sources': [
                'CVE 数据库',
                'MITRE ATT&CK',
                'AlienVault OTX',
                'VirusTotal',
                'Abuse.ch',
                'PhishTank',
            ],
            'access': '免费',
            'update_frequency': '实时/每日',
            'quality': '中等',
        }

    def get_commercial_intel(self):
        """Return the description of commercial threat-intelligence vendors."""
        return {
            'sources': [
                'FireEye iSIGHT',
                'CrowdStrike Falcon',
                'Palo Alto Unit 42',
                'Recorded Future',
                'ThreatConnect',
            ],
            'access': '付费',
            'update_frequency': '实时',
            'quality': '高',
        }

    def get_government_intel(self):
        """Return the description of government / national CERT sources."""
        return {
            'sources': [
                'CISA (美国)',
                'NCSC (英国)',
                'ANSSI (法国)',
                'CNCERT (中国)',
                'CERT-EU (欧盟)',
            ],
            'access': '免费/受限',
            'update_frequency': '定期',
            'quality': '高',
        }

    def get_industry_intel(self):
        """Return the description of industry sharing communities (ISACs)."""
        return {
            'sources': [
                # The financial-sector sharing body is FS-ISAC; the original
                # entry was misspelled "FS-ISOC".
                'FS-ISAC (金融)',
                'NH-ISAC (医疗)',
                'Auto-ISAC (汽车)',
                'E-ISAC (能源)',
                'IT-ISAC (信息技术)',
            ],
            'access': '会员制',
            'update_frequency': '定期',
            'quality': '高',
        }

    def get_internal_intel(self):
        """Return the description of intelligence produced inside the organisation."""
        return {
            'sources': [
                '内部日志',
                '事件记录',
                '蜜罐数据',
                '沙箱分析',
                '用户报告',
            ],
            'access': '内部',
            'update_frequency': '实时',
            'quality': '高',
        }
# Usage example: build the catalogue and print the open-source category.
sources = ThreatIntelSources()
print("开源威胁情报:", sources.get_open_source_intel())
IOC 收集
# IOC 收集器
import requests
import hashlib
from datetime import datetime
class IOCCollector:
    """Collect indicators of compromise (IOCs) from JSON feeds and local files.

    Collected IOCs are plain dicts appended to ``self.iocs``.  Every IOC has
    ``type``, ``value`` and ``collected_at``; feed IOCs additionally carry
    ``source``, ``confidence``, ``first_seen``, ``last_seen`` and ``tags``,
    hash IOCs carry ``algorithm`` and ``file_path``.
    """

    # STIX patterning names for hash algorithms; names containing a hyphen
    # must be single-quoted inside an object path.
    _STIX_HASH_NAMES = {
        'md5': 'MD5',
        'sha1': "'SHA-1'",
        'sha256': "'SHA-256'",
        'sha512': "'SHA-512'",
    }

    def __init__(self, timeout=10):
        """Create an empty collector.

        Args:
            timeout: Seconds to wait for a feed before giving up.  Without a
                timeout ``requests.get`` can block indefinitely.
        """
        self.iocs = []
        self.timeout = timeout

    def collect_ip_iocs(self, source_url):
        """Fetch ``source_url`` and store its IP indicators.

        Returns:
            The number of IOCs added by this call (0 on any failure).
        """
        return self._collect_feed_iocs(source_url, 'ip', "收集 IP IOC 失败")

    def collect_domain_iocs(self, source_url):
        """Fetch ``source_url`` and store its domain indicators.

        Returns:
            The number of IOCs added by this call (0 on any failure).
        """
        return self._collect_feed_iocs(source_url, 'domain', "收集域名 IOC 失败")

    def _collect_feed_iocs(self, source_url, ioc_type, failure_label):
        """Shared implementation of the feed collectors.

        The feed is expected to be a JSON object with an ``indicators`` list
        of objects carrying at least ``type`` and ``value``.  Items of another
        type, or without a value, are skipped instead of aborting the batch.
        """
        try:
            response = requests.get(source_url, timeout=self.timeout)
            if response.status_code != 200:
                return 0
            data = response.json()
            batch = [
                {
                    'type': ioc_type,
                    'value': item['value'],
                    'source': source_url,
                    'confidence': item.get('confidence', 'medium'),
                    'first_seen': item.get('first_seen'),
                    'last_seen': item.get('last_seen'),
                    'tags': item.get('tags', []),
                    'collected_at': datetime.now(),
                }
                for item in data.get('indicators', [])
                if item.get('type') == ioc_type and 'value' in item
            ]
        except Exception as e:
            # Deliberately broad: collection is best-effort and a bad feed
            # must not take the caller down.
            print(f"{failure_label}: {e}")
            return 0
        # Extend only after the whole feed parsed, so a failure never leaves
        # a half-imported batch behind.
        self.iocs.extend(batch)
        return len(batch)

    def collect_hash_iocs(self, file_path):
        """Hash ``file_path`` with MD5 and SHA-256 and store both as IOCs.

        Returns:
            2 on success, 0 if the file could not be read.
        """
        try:
            digests = [
                (algorithm, self.calculate_hash(file_path, algorithm))
                for algorithm in ('md5', 'sha256')
            ]
        except (OSError, ValueError, TypeError) as e:
            print(f"收集哈希 IOC 失败: {e}")
            return 0
        iocs = [
            {
                'type': 'hash',
                'algorithm': algorithm,
                'value': digest,
                'file_path': file_path,
                'collected_at': datetime.now(),
            }
            for algorithm, digest in digests
        ]
        self.iocs.extend(iocs)
        return len(iocs)

    def calculate_hash(self, file_path, algorithm):
        """Return the hex digest of the file, read in 4 KiB chunks.

        Raises:
            OSError: the file cannot be opened or read.
            ValueError: ``algorithm`` is not known to ``hashlib``.
        """
        hash_func = hashlib.new(algorithm)
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hash_func.update(chunk)
        return hash_func.hexdigest()

    def export_iocs(self, format='json'):
        """Export the collected IOCs.

        Returns:
            A JSON string for ``'json'``, a list of indicator dicts for
            ``'stix'`` (see :meth:`export_stix`), ``None`` for anything else.
        """
        if format == 'json':
            import json
            # default=str serialises the datetime values.
            return json.dumps(self.iocs, indent=2, default=str)
        if format == 'stix':
            return self.export_stix()
        return None

    def export_stix(self):
        """Return the IOCs as a list of STIX 2.1 style indicator dicts.

        The pattern is chosen per IOC type (previously every IOC, including
        IPs and domains, was exported as a ``file:hashes.MD5`` pattern).  The
        id is a deterministic UUID derived from type and value, so the same
        IOC always maps to the same indicator id.
        """
        import uuid

        stix_objects = []
        for ioc in self.iocs:
            timestamp = self._stix_timestamp(ioc['collected_at'])
            identity = f"{ioc.get('type')}:{ioc['value']}"
            stix_objects.append({
                'type': 'indicator',
                'spec_version': '2.1',
                'id': f"indicator--{uuid.uuid5(uuid.NAMESPACE_URL, identity)}",
                'created': timestamp,
                'modified': timestamp,
                'pattern': self._stix_pattern(ioc),
                'pattern_type': 'stix',
                'valid_from': timestamp,
            })
        return stix_objects

    def _stix_pattern(self, ioc):
        """Build the STIX comparison expression matching ``ioc``."""
        import ipaddress

        raw_value = str(ioc['value'])
        # Backslash and single quote are the characters that need escaping
        # inside a STIX string literal.
        value = raw_value.replace('\\', '\\\\').replace("'", "\\'")
        ioc_type = ioc.get('type')
        if ioc_type == 'ip':
            try:
                version = ipaddress.ip_address(raw_value).version
            except ValueError:
                version = 4
            return f"[ipv{version}-addr:value = '{value}']"
        if ioc_type == 'domain':
            return f"[domain-name:value = '{value}']"
        if ioc_type == 'url':
            return f"[url:value = '{value}']"
        if ioc_type == 'hash':
            algorithm = str(ioc.get('algorithm', 'md5')).lower()
            name = self._STIX_HASH_NAMES.get(algorithm, 'MD5')
            return f"[file:hashes.{name} = '{value}']"
        return f"[{ioc_type}:value = '{value}']"

    @staticmethod
    def _stix_timestamp(moment):
        """Format a datetime as a UTC timestamp with millisecond precision.

        Naive datetimes are interpreted as local time by ``astimezone``.
        """
        from datetime import timezone

        utc = moment.astimezone(timezone.utc)
        return utc.strftime('%Y-%m-%dT%H:%M:%S.') + f"{utc.microsecond // 1000:03d}Z"
# Usage example. The URLs and the file path are placeholders; each collect_*
# call catches its own errors and prints a failure message instead of raising.
collector = IOCCollector()
collector.collect_ip_iocs('https://api.threatintel.example.com/ip')
collector.collect_domain_iocs('https://api.threatintel.example.com/domain')
collector.collect_hash_iocs('/path/to/suspicious/file.exe')
print(f"收集到 {len(collector.iocs)} 个 IOC")
TTP 收集
# TTP 收集器
class TTPCollector:
    """Gather tactics, techniques and procedures (TTPs) into ``self.ttps``.

    Each stored record is a dict with ``tactic``, ``techniques``, ``source``
    and ``collected_at``.
    """

    def __init__(self):
        self.ttps = []

    def collect_from_mitre(self):
        """Append one record per MITRE ATT&CK Enterprise tactic.

        Returns:
            The total number of records now held in ``self.ttps``.
        """
        enterprise_tactics = (
            'Reconnaissance',
            'Resource Development',
            'Initial Access',
            'Execution',
            'Persistence',
            'Privilege Escalation',
            'Defense Evasion',
            'Credential Access',
            'Discovery',
            'Lateral Movement',
            'Collection',
            'Command and Control',
            'Exfiltration',
            'Impact',
        )
        self.ttps.extend(
            {
                'tactic': name,
                'techniques': self.get_techniques_for_tactic(name),
                'source': 'MITRE ATT&CK',
                'collected_at': datetime.now(),
            }
            for name in enterprise_tactics
        )
        return len(self.ttps)

    def get_techniques_for_tactic(self, tactic):
        """Return the sample technique names for ``tactic`` (empty list if none).

        The data is a hard-coded sample; a real implementation would query
        the MITRE ATT&CK API instead.
        """
        sample_techniques = {
            'Initial Access': [
                'Spearphishing Link',
                'Spearphishing Attachment',
                'Drive-by Compromise',
                'Exploit Public-Facing Application',
                'Supply Chain Compromise',
            ],
            'Execution': [
                'Command and Scripting Interpreter',
                'PowerShell',
                'Windows Management Instrumentation',
                'User Execution',
            ],
            'Persistence': [
                'Account Manipulation',
                'BITS Jobs',
                'Boot or Logon Autostart Execution',
                'Create Account',
            ],
        }
        if tactic in sample_techniques:
            return sample_techniques[tactic]
        return []

    def analyze_ttp_patterns(self):
        """Count how often each technique occurs and under which tactics.

        Returns:
            ``{technique: {'count': int, 'tactics': [tactic, ...]}}``.
        """
        summary = {}
        for record in self.ttps:
            owning_tactic = record['tactic']
            for name in record['techniques']:
                entry = summary.setdefault(name, {'count': 0, 'tactics': set()})
                entry['count'] += 1
                entry['tactics'].add(owning_tactic)
        # Sets were used for de-duplication; hand back plain lists.
        for entry in summary.values():
            entry['tactics'] = list(entry['tactics'])
        return summary
# Usage example: collect the ATT&CK tactics, then print per-technique counts.
ttp_collector = TTPCollector()
ttp_collector.collect_from_mitre()
patterns = ttp_collector.analyze_ttp_patterns()
print("TTP 模式分析:")
for technique, data in patterns.items():
    # The loop body must be indented; without it the snippet does not parse.
    print(f" {technique}: {data['count']} 次, 战术: {data['tactics']}")
威胁情报分析
情报验证
# 威胁情报验证器
class ThreatIntelValidator:
    """Syntactic sanity checks for IOC dicts (``{'type': ..., 'value': ...}``).

    Every validator returns a dict with ``valid`` (bool), ``confidence``
    (``'low'``/``'medium'``/``'high'``) and ``reason``.
    """

    # Expected hex-digest length per supported hash algorithm.
    _HASH_LENGTHS = {'md5': 32, 'sha1': 40, 'sha256': 64, 'sha512': 128}
    _HEX_DIGITS = frozenset('0123456789abcdefABCDEF')

    def __init__(self):
        # IOC type -> validator method.
        self.validation_rules = {
            'ip': self.validate_ip,
            'domain': self.validate_domain,
            'hash': self.validate_hash,
            'url': self.validate_url,
        }

    def validate_ioc(self, ioc):
        """Dispatch ``ioc`` to the validator registered for its type.

        A missing or unregistered type yields an invalid result rather than
        a ``KeyError``.
        """
        rule = self.validation_rules.get(ioc.get('type'))
        if rule is None:
            return {'valid': False, 'confidence': 'low', 'reason': '未知类型'}
        return rule(ioc)

    def validate_ip(self, ioc):
        """Validate an IPv4/IPv6 address.

        Reserved addresses are rejected, private ones are accepted with low
        confidence, everything else with medium confidence.
        """
        import ipaddress

        try:
            ip = ipaddress.ip_address(ioc.get('value'))
        except ValueError:
            return {
                'valid': False,
                'confidence': 'low',
                'reason': '无效 IP 地址',
            }
        # Reserved must be tested before private: ipaddress also reports the
        # reserved block 240.0.0.0/4 as private, so the reverse order makes
        # this branch unreachable for IPv4.
        if ip.is_reserved:
            return {
                'valid': False,
                'confidence': 'low',
                'reason': '保留 IP 地址',
            }
        if ip.is_private:
            return {
                'valid': True,
                'confidence': 'low',
                'reason': '私有 IP 地址',
            }
        return {
            'valid': True,
            'confidence': 'medium',
            'reason': '有效 IP 地址',
        }

    def validate_domain(self, ioc):
        """Validate a host name made of one or more labels plus a TLD.

        Each label is 1-63 characters, alphanumeric at both ends, hyphens
        allowed inside; the whole name is at most 253 characters.  The
        earlier pattern allowed exactly one label before the TLD and so
        rejected names such as ``malicious.example.com``.
        """
        import re

        domain = ioc.get('value')
        domain_pattern = (
            r'(?=.{1,253}$)'
            r'(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'
            r'[a-zA-Z]{2,63}'
        )
        if not isinstance(domain, str) or not re.fullmatch(domain_pattern, domain):
            return {
                'valid': False,
                'confidence': 'low',
                'reason': '无效域名',
            }
        # Heuristic kept from the original: unusually long TLDs are flagged.
        tld = domain.rsplit('.', 1)[-1]
        if len(tld) > 10:
            return {
                'valid': False,
                'confidence': 'low',
                'reason': '可疑 TLD',
            }
        return {
            'valid': True,
            'confidence': 'medium',
            'reason': '有效域名',
        }

    def validate_hash(self, ioc):
        """Validate a hex digest against its declared algorithm.

        ``ioc['algorithm']`` defaults to ``'md5'``.  The value must have the
        algorithm's digest length and consist solely of hex digits.
        """
        hash_value = ioc.get('value')
        algorithm = str(ioc.get('algorithm', 'md5')).lower()
        expected_length = self._HASH_LENGTHS.get(algorithm)
        if (
            expected_length is not None
            and isinstance(hash_value, str)
            and len(hash_value) == expected_length
            and set(hash_value) <= self._HEX_DIGITS
        ):
            return {'valid': True, 'confidence': 'high', 'reason': '有效哈希值'}
        return {
            'valid': False,
            'confidence': 'low',
            'reason': '无效哈希值',
        }

    def validate_url(self, ioc):
        """Validate that the value parses as a URL with a scheme and a host."""
        from urllib.parse import urlparse

        invalid = {
            'valid': False,
            'confidence': 'low',
            'reason': '无效 URL',
        }
        value = ioc.get('value')
        if not isinstance(value, str):
            return invalid
        try:
            url = urlparse(value)
        except ValueError:
            # urlparse raises ValueError for malformed IPv6 literals.
            return invalid
        if not url.scheme or not url.netloc:
            return invalid
        return {
            'valid': True,
            'confidence': 'medium',
            'reason': '有效 URL',
        }
# Usage example: validate a single IP indicator and print the verdict.
validator = ThreatIntelValidator()
ioc = {
'type': 'ip',
'value': '192.168.1.100'
}
result = validator.validate_ioc(ioc)
print(f"验证结果: {result}")
情报关联分析
# 情报关联分析器
class IntelCorrelationAnalyzer:
    """Find relationships between IOC dicts and assemble them into a graph.

    Three correlation types are produced: ``ip_domain`` (the domain resolves
    to the IP), ``domain_url`` (the URL's host is the domain or one of its
    subdomains) and ``hash_file`` (a hash IOC that records its file path).
    """

    # Correlation type -> (source key, target key) inside the correlation dict.
    _EDGE_ENDPOINTS = {
        'ip_domain': ('ip', 'domain'),
        'domain_url': ('domain', 'url'),
        'hash_file': ('hash', 'file_path'),
    }

    def __init__(self):
        self.relationships = []

    def correlate_iocs(self, iocs):
        """Run every correlation pass over ``iocs`` and return all matches.

        Note that the IP/domain pass performs DNS lookups.
        """
        correlations = []
        correlations.extend(self.find_ip_domain_correlations(iocs))
        correlations.extend(self.find_domain_url_correlations(iocs))
        correlations.extend(self.find_hash_file_correlations(iocs))
        return correlations

    def find_ip_domain_correlations(self, iocs):
        """Pair each IP IOC with every domain IOC that resolves to it."""
        ips = [ioc for ioc in iocs if ioc['type'] == 'ip']
        domains = [ioc for ioc in iocs if ioc['type'] == 'domain']
        return [
            {
                'type': 'ip_domain',
                'ip': ip['value'],
                'domain': domain['value'],
                'confidence': 'high',
                'discovered_at': datetime.now(),
            }
            for ip in ips
            for domain in domains
            if self.check_dns_resolution(domain['value'], ip['value'])
        ]

    def find_domain_url_correlations(self, iocs):
        """Pair each domain IOC with every URL IOC hosted on it.

        The URL's host name must equal the domain or be a subdomain of it.
        A plain substring test would also match look-alike hosts such as
        ``notexample.com.attacker.net`` for the domain ``example.com``.
        """
        domains = [ioc for ioc in iocs if ioc['type'] == 'domain']
        urls = [ioc for ioc in iocs if ioc['type'] == 'url']
        correlations = []
        for domain in domains:
            wanted = str(domain['value']).lower().rstrip('.')
            if not wanted:
                continue
            for url in urls:
                host = self._url_host(str(url['value']))
                if host == wanted or host.endswith('.' + wanted):
                    correlations.append({
                        'type': 'domain_url',
                        'domain': domain['value'],
                        'url': url['value'],
                        'confidence': 'high',
                        'discovered_at': datetime.now(),
                    })
        return correlations

    @staticmethod
    def _url_host(url):
        """Return the lower-cased host of ``url`` ('' if it has none)."""
        from urllib.parse import urlparse

        try:
            # Without a scheme urlparse treats everything as a path, so
            # scheme-less values are parsed as network-path references.
            parsed = urlparse(url if '://' in url else '//' + url)
            host = parsed.hostname
        except ValueError:
            return ''
        return (host or '').rstrip('.')

    def find_hash_file_correlations(self, iocs):
        """Link every hash IOC that carries a ``file_path`` to that path."""
        return [
            {
                'type': 'hash_file',
                'hash': hash_ioc['value'],
                'file_path': hash_ioc['file_path'],
                'confidence': 'high',
                'discovered_at': datetime.now(),
            }
            for hash_ioc in iocs
            if hash_ioc['type'] == 'hash' and 'file_path' in hash_ioc
        ]

    def check_dns_resolution(self, domain, ip):
        """Return True if ``domain`` currently resolves to the IPv4 ``ip``.

        Any resolver failure counts as "does not resolve".
        """
        import socket

        try:
            resolved_ips = socket.gethostbyname_ex(domain)[2]
        except (OSError, UnicodeError):
            # OSError covers socket.gaierror and socket.herror; UnicodeError
            # is raised for names that cannot be IDNA-encoded.
            return False
        return ip in resolved_ips

    def build_threat_graph(self, iocs, correlations):
        """Build ``{'nodes': [...], 'edges': [...]}`` from IOCs and correlations.

        Edge endpoints are looked up per correlation type.  The previous
        ``or`` chain picked the domain as both source and target of every
        ``domain_url`` edge.  File paths referenced by ``hash_file`` edges
        get their own node so no edge points at a missing node.
        """
        graph = {'nodes': [], 'edges': []}
        node_ids = set()
        for ioc in iocs:
            graph['nodes'].append({
                'id': ioc['value'],
                'type': ioc['type'],
                'label': ioc['value'],
            })
            node_ids.add(ioc['value'])

        for correlation in correlations:
            endpoints = self._EDGE_ENDPOINTS.get(correlation['type'])
            if endpoints is not None:
                source = correlation.get(endpoints[0])
                target = correlation.get(endpoints[1])
            else:
                # Unknown correlation type: fall back to the generic lookup.
                source = (correlation.get('ip') or correlation.get('domain')
                          or correlation.get('hash'))
                target = (correlation.get('url') or correlation.get('file_path')
                          or correlation.get('domain'))
            if correlation['type'] == 'hash_file' and target not in node_ids:
                graph['nodes'].append({'id': target, 'type': 'file', 'label': target})
                node_ids.add(target)
            graph['edges'].append({
                'source': source,
                'target': target,
                'type': correlation['type'],
                'confidence': correlation['confidence'],
            })
        return graph
# Usage example: correlate four sample IOCs and print the resulting graph.
# Note that correlate_iocs performs a DNS lookup for each IP/domain pair.
analyzer = IntelCorrelationAnalyzer()
iocs = [
{'type': 'ip', 'value': '192.168.1.100'},
{'type': 'domain', 'value': 'malicious.example.com'},
{'type': 'url', 'value': 'http://malicious.example.com/payload.exe'},
{'type': 'hash', 'value': '5d41402abc4b2a76b9719d911017c592', 'file_path': '/tmp/payload.exe'}
]
correlations = analyzer.correlate_iocs(iocs)
graph = analyzer.build_threat_graph(iocs, correlations)
print(f"发现 {len(correlations)} 个关联关系")
print(f"威胁图谱: {graph}")
威胁评分
# 威胁评分系统
class ThreatScoringSystem:
    """Score an IOC dict on four equally weighted components (each 0.0-1.0).

    The components are confidence, severity, relevance (derived from tags)
    and freshness (derived from ``last_seen``); the total is their mean.
    """

    _HIGH_RISK_TAGS = frozenset({'apt', 'malware', 'ransomware', 'trojan'})
    _MEDIUM_RISK_TAGS = frozenset({'phishing', 'spam', 'botnet'})

    def __init__(self):
        # Component name -> scoring method.
        self.scoring_rules = {
            'confidence': self.score_confidence,
            'severity': self.score_severity,
            'relevance': self.score_relevance,
            'freshness': self.score_freshness,
        }

    def calculate_threat_score(self, ioc):
        """Return total score, per-component scores and the risk level."""
        scores = {
            rule_name: rule_func(ioc)
            for rule_name, rule_func in self.scoring_rules.items()
        }
        total_score = sum(scores.values()) / len(scores)
        return {
            'total_score': total_score,
            'component_scores': scores,
            'risk_level': self.determine_risk_level(total_score),
        }

    def score_confidence(self, ioc):
        """Map ``ioc['confidence']`` to a score (0.5 for unknown labels)."""
        confidence_map = {
            'high': 0.9,
            'medium': 0.6,
            'low': 0.3,
        }
        return confidence_map.get(ioc.get('confidence', 'medium'), 0.5)

    def score_severity(self, ioc):
        """Map ``ioc['severity']`` to a score (0.5 for unknown labels)."""
        severity_map = {
            'critical': 1.0,
            'high': 0.8,
            'medium': 0.5,
            'low': 0.2,
        }
        return severity_map.get(ioc.get('severity', 'medium'), 0.5)

    def score_relevance(self, ioc):
        """Score relevance from the IOC's tags.

        The highest-risk tag wins regardless of its position in the list.
        Returning on the first recognised tag scored ``['phishing', 'apt']``
        as medium risk.
        """
        tags = {str(tag).lower() for tag in (ioc.get('tags') or [])}
        if tags & self._HIGH_RISK_TAGS:
            return 0.9
        if tags & self._MEDIUM_RISK_TAGS:
            return 0.6
        return 0.4

    def score_freshness(self, ioc):
        """Score how recently the IOC was seen.

        ``last_seen`` may be a datetime or an ISO-8601 string.  A missing or
        unparseable value scores 0.3.
        """
        from datetime import timedelta

        last_seen = ioc.get('last_seen')
        if not last_seen:
            return 0.3
        if isinstance(last_seen, str):
            try:
                last_seen = datetime.fromisoformat(last_seen)
            except ValueError:
                return 0.3
        # Use "now" in the same timezone-awareness as last_seen; subtracting
        # a naive datetime from an aware one raises TypeError.
        age = datetime.now(last_seen.tzinfo) - last_seen
        if age < timedelta(days=7):
            return 1.0
        if age < timedelta(days=30):
            return 0.7
        if age < timedelta(days=90):
            return 0.4
        return 0.2

    def determine_risk_level(self, score):
        """Translate a 0.0-1.0 score into a risk label."""
        if score >= 0.8:
            return 'critical'
        if score >= 0.6:
            return 'high'
        if score >= 0.4:
            return 'medium'
        return 'low'
# Usage example: score one IOC that was last seen just now.
scorer = ThreatScoringSystem()
ioc = {
'type': 'ip',
'value': '192.168.1.100',
'confidence': 'high',
'severity': 'high',
'tags': ['apt', 'malware'],
'last_seen': datetime.now()
}
score_result = scorer.calculate_threat_score(ioc)
print(f"威胁评分: {score_result}")
威胁情报应用
威胁狩猎
# 威胁狩猎工具
class ThreatHunter:
    """Hypothesis-driven threat hunting over a caller-supplied data source.

    A hypothesis bundles a description with the IOCs to look for.  The
    ``search_*_for_*`` methods are the integration points with real
    telemetry: each receives the IOC value and the opaque ``data_source``
    and returns a list of matches.  Their defaults return ``[]`` so the
    hunter runs end-to-end; subclasses override them to query a SIEM, EDR,
    DNS logs and so on.
    """

    def __init__(self, threat_intel):
        self.threat_intel = threat_intel
        self.hypotheses = []

    def create_hypothesis(self, description, iocs):
        """Register a new active hypothesis and return it.

        Ids are sequential: ``HYP-1``, ``HYP-2``, ...
        """
        hypothesis = {
            'id': f"HYP-{len(self.hypotheses) + 1}",
            'description': description,
            'iocs': iocs,
            'status': 'active',
            'created_at': datetime.now(),
            'findings': [],
        }
        self.hypotheses.append(hypothesis)
        return hypothesis

    def hunt_for_iocs(self, hypothesis_id, data_source):
        """Search ``data_source`` for every IOC of the hypothesis.

        Returns:
            The list of findings (one per IOC with at least one match), or
            ``None`` if the hypothesis id is unknown.  The findings replace
            any stored by an earlier hunt of the same hypothesis.
        """
        hypothesis = self.get_hypothesis(hypothesis_id)
        if not hypothesis:
            return None
        findings = []
        for ioc in hypothesis['iocs']:
            matches = self.search_ioc(ioc, data_source)
            if matches:
                findings.append({
                    'ioc': ioc,
                    'matches': matches,
                    'timestamp': datetime.now(),
                })
        hypothesis['findings'] = findings
        return findings

    def search_ioc(self, ioc, data_source):
        """Dispatch on IOC type; unsupported types yield no matches."""
        searchers = {
            'ip': self.search_ip,
            'domain': self.search_domain,
            'hash': self.search_hash,
        }
        searcher = searchers.get(ioc['type'])
        if searcher is None:
            return []
        return searcher(ioc['value'], data_source)

    def search_ip(self, ip, data_source):
        """Search logs and network traffic for an IP address."""
        matches = []
        matches.extend(self.search_logs_for_ip(ip, data_source))
        matches.extend(self.search_traffic_for_ip(ip, data_source))
        return matches

    def search_domain(self, domain, data_source):
        """Search DNS and proxy logs for a domain."""
        matches = []
        matches.extend(self.search_dns_for_domain(domain, data_source))
        matches.extend(self.search_proxy_for_domain(domain, data_source))
        return matches

    def search_hash(self, hash_value, data_source):
        """Search file-system and process telemetry for a file hash."""
        matches = []
        matches.extend(self.search_files_for_hash(hash_value, data_source))
        matches.extend(self.search_processes_for_hash(hash_value, data_source))
        return matches

    # --- Integration hooks ------------------------------------------------
    # These were called but never defined, so every hunt failed with
    # AttributeError.  The defaults report "nothing found".

    def search_logs_for_ip(self, ip, data_source):
        """Hook: return log entries mentioning ``ip``."""
        return []

    def search_traffic_for_ip(self, ip, data_source):
        """Hook: return network-traffic records involving ``ip``."""
        return []

    def search_dns_for_domain(self, domain, data_source):
        """Hook: return DNS log entries for ``domain``."""
        return []

    def search_proxy_for_domain(self, domain, data_source):
        """Hook: return proxy log entries for ``domain``."""
        return []

    def search_files_for_hash(self, hash_value, data_source):
        """Hook: return files whose hash equals ``hash_value``."""
        return []

    def search_processes_for_hash(self, hash_value, data_source):
        """Hook: return processes whose image hash equals ``hash_value``."""
        return []

    def generate_hunting_report(self, hypothesis_id):
        """Summarise a hypothesis and its findings (``None`` if unknown)."""
        hypothesis = self.get_hypothesis(hypothesis_id)
        if not hypothesis:
            return None
        return {
            'hypothesis_id': hypothesis['id'],
            'description': hypothesis['description'],
            'status': hypothesis['status'],
            'total_findings': len(hypothesis['findings']),
            'findings': hypothesis['findings'],
            'generated_at': datetime.now(),
        }

    def get_hypothesis(self, hypothesis_id):
        """Return the hypothesis with the given id, or ``None``."""
        for hypothesis in self.hypotheses:
            if hypothesis['id'] == hypothesis_id:
                return hypothesis
        return None
# Usage example
hunter = ThreatHunter(threat_intel={})
# Create a hunting hypothesis with the IOCs to look for
hypothesis = hunter.create_hypothesis(
"检测 APT 组织活动",
[
{'type': 'ip', 'value': '192.168.1.100'},
{'type': 'domain', 'value': 'malicious.example.com'}
]
)
# Run the hunt against the named data source
findings = hunter.hunt_for_iocs(hypothesis['id'], 'log_source')
# Generate the report
report = hunter.generate_hunting_report(hypothesis['id'])
print(f"威胁狩猎报告: {report}")
威胁情报共享
# 威胁情报共享平台
class ThreatIntelSharing:
    """Track sharing partners and the intelligence exchanged with them.

    Both outgoing (``share_intel``) and incoming (``receive_intel``) items
    are recorded in ``self.shared_intel``.
    """

    def __init__(self):
        self.shared_intel = []
        self.partners = []

    def add_partner(self, partner_info):
        """Register a partner and return its record.

        ``partner_info`` must provide ``name`` and ``contact``;
        ``trust_level`` defaults to ``'medium'`` and ``sharing_level`` to
        ``'restricted'``.  Ids are sequential (``PARTNER-1``, ...).
        """
        sequence = len(self.partners) + 1
        record = dict(
            id=f"PARTNER-{sequence}",
            name=partner_info['name'],
            contact=partner_info['contact'],
            trust_level=partner_info.get('trust_level', 'medium'),
            sharing_level=partner_info.get('sharing_level', 'restricted'),
            added_at=datetime.now(),
        )
        self.partners.append(record)
        return record

    def share_intel(self, intel, partner_id):
        """Record that ``intel`` was shared with the given partner.

        Fails when the partner is unknown or its sharing level is ``'none'``.
        """
        recipient = self.get_partner(partner_id)
        if not recipient:
            return self._outcome(False, '合作伙伴不存在')
        if recipient['sharing_level'] == 'none':
            return self._outcome(False, '不允许共享')
        self.shared_intel.append(dict(
            intel=intel,
            partner_id=partner_id,
            shared_at=datetime.now(),
            status='shared',
        ))
        return self._outcome(True, '情报已共享')

    def receive_intel(self, intel, partner_id):
        """Validate and record ``intel`` received from the given partner."""
        sender = self.get_partner(partner_id)
        if not sender:
            return self._outcome(False, '合作伙伴不存在')
        verdict = self.validate_received_intel(intel, sender)
        if not verdict['valid']:
            return self._outcome(False, '情报验证失败')
        self.shared_intel.append(dict(
            intel=intel,
            partner_id=partner_id,
            received_at=datetime.now(),
            status='received',
            trust_score=sender['trust_level'],
        ))
        return self._outcome(True, '情报已接收')

    @staticmethod
    def _outcome(success, message):
        """Build the ``{'success': ..., 'message': ...}`` result dict."""
        return {'success': success, 'message': message}

    def validate_received_intel(self, intel, partner):
        """Accept only non-empty dicts from partners not trusted at ``'low'``."""
        if not (intel and isinstance(intel, dict)):
            return {'valid': False, 'reason': '无效情报格式'}
        if partner['trust_level'] == 'low':
            return {'valid': False, 'reason': '信任级别过低'}
        return {'valid': True}

    def get_partner(self, partner_id):
        """Return the partner record with the given id, or ``None``."""
        return next(
            (candidate for candidate in self.partners if candidate['id'] == partner_id),
            None,
        )

    def get_sharing_history(self, partner_id=None):
        """Return all exchange records, or only those of one partner."""
        if not partner_id:
            return self.shared_intel
        return [entry for entry in self.shared_intel if entry['partner_id'] == partner_id]
# Usage example
sharing = ThreatIntelSharing()
# Register a partner
partner = sharing.add_partner({
'name': '安全联盟',
'contact': 'contact@security-alliance.com',
'trust_level': 'high',
'sharing_level': 'full'
})
# Share one piece of intelligence with that partner
intel = {
'type': 'ip',
'value': '192.168.1.100',
'threat_type': 'apt',
'confidence': 'high'
}
result = sharing.share_intel(intel, partner['id'])
print(f"共享结果: {result}")
实际应用示例
构建威胁情报平台
# 威胁情报平台核心
from dataclasses import dataclass
from typing import List, Dict, Optional
import json
@dataclass
class ThreatIntel:
    """One normalised threat-intelligence record held by the platform."""

    id: str            # stable id derived from type, value and source
    type: str          # IOC type, e.g. 'ip', 'domain', 'url', 'hash'
    value: str         # the indicator itself
    source: str        # where the record came from
    confidence: str    # 'low' / 'medium' / 'high'
    severity: str      # 'low' / 'medium' / 'high' / 'critical'
    tags: List[str]
    first_seen: datetime
    last_seen: datetime
    created_at: datetime
    updated_at: datetime
    metadata: Dict     # analysis results are merged in by analyze_intel


class ThreatIntelPlatform:
    """In-memory threat-intelligence store.

    Records pass through ingest, analysis and subscriber notification, and
    can be searched and exported as JSON or as a STIX bundle.
    """

    # Platform IOC type -> STIX object path used in indicator patterns.
    _STIX_OBJECT_PATHS = {'domain': 'domain-name:value', 'url': 'url:value'}
    # Hex-digest length -> STIX hash name (hyphenated names must be quoted).
    _STIX_HASH_NAMES = {32: 'MD5', 40: "'SHA-1'", 64: "'SHA-256'", 128: "'SHA-512'"}

    def __init__(self):
        self.intel_db = {}
        self.sources = {}
        self.analyzers = {}
        self.subscribers = {}

    def ingest_intel(self, intel_data: Dict) -> ThreatIntel:
        """Store a raw intel dict, analyse it and notify matching subscribers.

        ``intel_data`` must provide ``type``, ``value`` and ``source``.
        ``first_seen``/``last_seen`` may be datetimes or ISO-8601 strings;
        missing, ``None`` or unparseable values fall back to the ingest
        time.  ``tags`` and ``metadata`` are copied so later analysis never
        mutates the caller's objects.  Re-ingesting the same
        (type, value, source) replaces the stored record.
        """
        intel_id = self.generate_intel_id(intel_data)
        now = datetime.now()
        intel = ThreatIntel(
            id=intel_id,
            type=intel_data['type'],
            value=intel_data['value'],
            source=intel_data['source'],
            confidence=intel_data.get('confidence', 'medium'),
            severity=intel_data.get('severity', 'medium'),
            tags=list(intel_data.get('tags') or []),
            first_seen=self._coerce_datetime(intel_data.get('first_seen'), now),
            last_seen=self._coerce_datetime(intel_data.get('last_seen'), now),
            created_at=now,
            updated_at=now,
            metadata=dict(intel_data.get('metadata') or {}),
        )
        self.intel_db[intel_id] = intel
        self.analyze_intel(intel)
        self.notify_subscribers(intel)
        return intel

    @staticmethod
    def _coerce_datetime(value, default: datetime) -> datetime:
        """Return ``value`` as a datetime, parsing ISO strings when needed."""
        if value is None:
            return default
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value)
            except ValueError:
                return default
        return value

    def generate_intel_id(self, intel_data: Dict) -> str:
        """Return a deterministic id (MD5 hex of ``type:value:source``)."""
        import hashlib

        unique_string = f"{intel_data['type']}:{intel_data['value']}:{intel_data['source']}"
        # MD5 is used only as a stable fingerprint here, not for security.
        return hashlib.md5(unique_string.encode()).hexdigest()

    def analyze_intel(self, intel: ThreatIntel):
        """Validate, score and correlate ``intel``; store results in its metadata."""
        intel.metadata.update({
            'validation': self.validate_intel(intel),
            'threat_score': self.calculate_threat_score(intel),
            'correlations': self.correlate_intel(intel),
        })
        intel.updated_at = datetime.now()

    def validate_intel(self, intel: ThreatIntel) -> Dict:
        """Placeholder validation: every record is accepted."""
        return {
            'valid': True,
            'confidence': intel.confidence,
        }

    def calculate_threat_score(self, intel: ThreatIntel) -> float:
        """Return a score starting at 0.5, adjusted by confidence and severity.

        The result is capped at 1.0.
        """
        score = 0.5
        if intel.confidence == 'high':
            score += 0.3
        elif intel.confidence == 'low':
            score -= 0.2
        if intel.severity == 'critical':
            score += 0.4
        elif intel.severity == 'high':
            score += 0.2
        return min(score, 1.0)

    def correlate_intel(self, intel: ThreatIntel) -> List[Dict]:
        """Return links to every other stored record sharing a tag with ``intel``."""
        return [
            {
                'intel_id': existing_intel.id,
                'type': 'related',
                'confidence': 'medium',
            }
            for existing_intel in self.intel_db.values()
            if existing_intel.id != intel.id
            and self.check_correlation(intel, existing_intel)
        ]

    def check_correlation(self, intel1: ThreatIntel, intel2: ThreatIntel) -> bool:
        """Return True when the two records have at least one tag in common."""
        return bool(set(intel1.tags or ()) & set(intel2.tags or ()))

    def search_intel(self, query: Dict) -> List[ThreatIntel]:
        """Return the records matching every criterion present in ``query``.

        Supported keys are ``type`` (exact match), ``value`` (substring
        match) and ``tags`` (all listed tags must be present).
        """
        results = []
        for intel in self.intel_db.values():
            if 'type' in query and intel.type != query['type']:
                continue
            if 'value' in query and query['value'] not in intel.value:
                continue
            if 'tags' in query and not set(query['tags']).issubset(intel.tags):
                continue
            results.append(intel)
        return results

    def subscribe(self, subscriber_id: str, criteria: Dict):
        """Register (or replace) a subscription with the given criteria."""
        self.subscribers[subscriber_id] = {
            'criteria': criteria,
            'subscribed_at': datetime.now(),
        }

    def notify_subscribers(self, intel: ThreatIntel):
        """Notify every subscriber whose criteria match ``intel``."""
        for subscriber_id, subscription in self.subscribers.items():
            if self.match_criteria(intel, subscription['criteria']):
                self.send_notification(subscriber_id, intel)

    def match_criteria(self, intel: ThreatIntel, criteria: Dict) -> bool:
        """Return True unless a present ``type``/``severity`` criterion differs."""
        if 'type' in criteria and intel.type != criteria['type']:
            return False
        if 'severity' in criteria and intel.severity != criteria['severity']:
            return False
        return True

    def send_notification(self, subscriber_id: str, intel: ThreatIntel):
        """Placeholder delivery: print the notification."""
        print(f"通知 {subscriber_id}: 新威胁情报 {intel.id}")

    def export_intel(self, format: str = 'json') -> str:
        """Export all records as a ``'json'`` or ``'stix'`` string.

        Returns ``None`` for any other format.
        """
        if format == 'json':
            return json.dumps(
                [self.intel_to_dict(intel) for intel in self.intel_db.values()],
                indent=2,
                # metadata may hold values json cannot encode natively
                default=str,
            )
        if format == 'stix':
            return self.export_stix()
        return None

    def intel_to_dict(self, intel: ThreatIntel) -> Dict:
        """Return the JSON-ready dict form of a record."""
        return {
            'id': intel.id,
            'type': intel.type,
            'value': intel.value,
            'source': intel.source,
            'confidence': intel.confidence,
            'severity': intel.severity,
            'tags': intel.tags,
            'first_seen': intel.first_seen.isoformat(),
            'last_seen': intel.last_seen.isoformat(),
            'metadata': intel.metadata,
        }

    def export_stix(self) -> str:
        """Export all records as a JSON-encoded STIX 2.1 style bundle.

        Identifiers are ``<type>--<UUID>``.  The bundle id used to come from
        the built-in ``hash()``, which is salted per process and may be
        negative.  Indicator UUIDs are derived from the record id so they
        stay stable across exports.  Patterns use STIX object types such as
        ``ipv4-addr`` and ``domain-name`` rather than the platform's own
        type labels.
        """
        import uuid

        stix_bundle = {
            'type': 'bundle',
            'id': f"bundle--{uuid.uuid4()}",
            'objects': [],
        }
        for intel in self.intel_db.values():
            stix_bundle['objects'].append({
                'type': 'indicator',
                'spec_version': '2.1',
                'id': f"indicator--{uuid.uuid5(uuid.NAMESPACE_URL, str(intel.id))}",
                'created': self._stix_timestamp(intel.created_at),
                'modified': self._stix_timestamp(intel.updated_at),
                'pattern': self._stix_pattern(intel),
                'pattern_type': 'stix',
                'valid_from': self._stix_timestamp(intel.first_seen),
            })
        return json.dumps(stix_bundle, indent=2)

    def _stix_pattern(self, intel: ThreatIntel) -> str:
        """Build the STIX comparison expression for a record."""
        import ipaddress

        raw_value = str(intel.value)
        # Backslash and single quote must be escaped in STIX string literals.
        value = raw_value.replace('\\', '\\\\').replace("'", "\\'")
        if intel.type == 'ip':
            try:
                version = ipaddress.ip_address(raw_value).version
            except ValueError:
                version = 4
            return f"[ipv{version}-addr:value = '{value}']"
        if intel.type == 'hash':
            # Records carry no algorithm field, so infer it from the length.
            name = self._STIX_HASH_NAMES.get(len(raw_value), 'MD5')
            return f"[file:hashes.{name} = '{value}']"
        object_path = self._STIX_OBJECT_PATHS.get(intel.type, f"{intel.type}:value")
        return f"[{object_path} = '{value}']"

    @staticmethod
    def _stix_timestamp(moment: datetime) -> str:
        """Format a datetime as a UTC timestamp with millisecond precision.

        Naive datetimes are interpreted as local time by ``astimezone``.
        """
        from datetime import timezone

        utc = moment.astimezone(timezone.utc)
        return utc.strftime('%Y-%m-%dT%H:%M:%S.') + f"{utc.microsecond // 1000:03d}Z"
# Create the platform instance
platform = ThreatIntelPlatform()
# Ingest two sample records
intel1 = platform.ingest_intel({
'type': 'ip',
'value': '192.168.1.100',
'source': 'internal',
'confidence': 'high',
'severity': 'high',
'tags': ['apt', 'malware']
})
intel2 = platform.ingest_intel({
'type': 'domain',
'value': 'malicious.example.com',
'source': 'external',
'confidence': 'medium',
'severity': 'medium',
'tags': ['phishing']
})
# Search the stored intelligence by type
results = platform.search_intel({'type': 'ip'})
print(f"搜索结果: {len(results)} 个")
# Export everything as JSON and show the first 200 characters
exported = platform.export_intel('json')
print(f"导出的威胁情报: {exported[:200]}...")
总结
本教程详细介绍了威胁情报与分析:
- 威胁情报基础:
  - 威胁情报定义
  - 威胁情报层次
  - 威胁情报价值
- 威胁情报收集:
  - 数据源分类
  - IOC 收集
  - TTP 收集
  - 情报摄取
- 威胁情报分析:
  - 情报验证
  - 关联分析
  - 威胁评分
  - 模式识别
- 威胁情报应用:
  - 威胁狩猎
  - 情报共享
  - 预防检测
  - 响应支持
- 实际应用:
  - 威胁情报平台
  - 自动化工具
  - 集成方案
- 最佳实践:
  - 多源情报
  - 持续更新
  - 质量控制
  - 隐私保护
威胁情报是网络安全运营的重要能力,建立完善的威胁情报体系对于提高安全防护能力至关重要。
下一步
在下一教程中,我们将学习安全运营中心(SOC)建设,包括:
- SOC 架构设计
- SOC 人员配置
- SOC 工具平台
- SOC 运营流程
- SOC 最佳实践
继续学习网络安全运营,掌握这门重要领域的更多知识!

