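Overview

Threat intelligence is a core capability of security operations. Collecting, analyzing, and applying it helps a team identify potential threats earlier and strengthen its defenses. This tutorial covers how to collect, analyze, and apply threat intelligence.

Threat Intelligence Fundamentals

Defining Threat Intelligence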

┌─────────────────────────────────────────────┐
│       Threat intelligence layer model       │
├─────────────────────────────────────────────┤
│                                             │
│  ┌───────────────────────────────────────┐  │
│  │  Strategic intelligence               │  │
│  │  - Threat trend analysis              │  │
│  │  - Threat actor analysis              │  │
│  │  - Long-term threat forecasting       │  │
│  └───────────────────────────────────────┘  │
│                                             │
│  ┌───────────────────────────────────────┐  │
│  │  Tactical intelligence                │  │
│  │  - TTP analysis                       │  │
│  │  - Attack tool analysis               │  │
│  │  - Vulnerability information          │  │
│  └───────────────────────────────────────┘  │
│                                             │
│  ┌───────────────────────────────────────┐  │
│  │  Operational intelligence             │  │
│  │  - IOCs (indicators of compromise)    │  │
│  │  - Malicious domains and IPs          │  │
│  │  - File hashes                        │  │
│  └───────────────────────────────────────┘  │
│                                             │
│  ┌───────────────────────────────────────┐  │
│  │  Technical intelligence               │  │
│  │  - Malware samples                    │  │
│  │  - Attack code                        │  │
│  │  - Exploit code                       │  │
│  └───────────────────────────────────────┘  │
│                                             │
└─────────────────────────────────────────────┘
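
The layer model can also be written as a small lookup, which is handy when collected items need to be routed to the right audience. The sketch below is only an illustration: the `IntelLayer` enum, the `ARTIFACT_LAYER` table, and the artifact type names are assumptions made for this example, grouped the same way as the diagram.

```python
from enum import Enum


class IntelLayer(Enum):
    STRATEGIC = "strategic"
    TACTICAL = "tactical"
    OPERATIONAL = "operational"
    TECHNICAL = "technical"


# Hypothetical artifact type names, grouped as in the layer diagram
ARTIFACT_LAYER = {
    "trend_report": IntelLayer.STRATEGIC,
    "actor_profile": IntelLayer.STRATEGIC,
    "ttp": IntelLayer.TACTICAL,
    "vulnerability": IntelLayer.TACTICAL,
    "ip": IntelLayer.OPERATIONAL,
    "domain": IntelLayer.OPERATIONAL,
    "file_hash": IntelLayer.OPERATIONAL,
    "malware_sample": IntelLayer.TECHNICAL,
    "exploit_code": IntelLayer.TECHNICAL,
}


def classify_artifact(artifact_type):
    """Return the layer for an artifact type, or None if it is unknown."""
    return ARTIFACT_LAYER.get(artifact_type.lower())


print(classify_artifact("domain"))
print(classify_artifact("press_release"))
```

Returning `None` for unknown types keeps the caller in charge of what to do with items that do not fit the model.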

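The Value of Threat Intelligence

Value of threat intelligence:
  Prevention:
    - Identify threats early
    - Stop attacks before they happen
    - Reduce the number of security incidents

  Detection:
    - Improve detection accuracy
    - Reduce false positives
    - Speed up threat identification

  Response:
    - Speed up incident response
    - Provide context
    - Guide response strategy

  Decision-making:
    - Support security decisions
    - Optimize resource allocation
    - Inform security policy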

Collecting Threat Intelligence

Data Source Categories

# Threat intelligence data sources
class ThreatIntelSources:
    def __init__(self):
        # Map each category name to the method that describes it
        self.sources = {
            'open_source': self.get_open_source_intel,
            'commercial': self.get_commercial_intel,
            'government': self.get_government_intel,
            'industry': self.get_industry_intel,
            'internal': self.get_internal_intel
        }

    def get_open_source_intel(self):
        """Open-source threat intelligence"""
        return {
            'sources': [
                'CVE database',
                'MITRE ATT&CK',
                'AlienVault OTX',
                'VirusTotal',
                'Abuse.ch',
                'PhishTank'
            ],
            'access': 'free',
            'update_frequency': 'real-time/daily',
            'quality': 'medium'
        }

    def get_commercial_intel(self):
        """Commercial threat intelligence"""
        return {
            'sources': [
                'Mandiant (formerly FireEye iSIGHT)',
                'CrowdStrike Falcon',
                'Palo Alto Networks Unit 42',
                'Recorded Future',
                'ThreatConnect'
            ],
            'access': 'paid',
            'update_frequency': 'real-time',
            'quality': 'high'
        }

    def get_government_intel(self):
        """Government threat intelligence"""
        return {
            'sources': [
                'CISA (United States)',
                'NCSC (United Kingdom)',
                'ANSSI (France)',
                'CNCERT (China)',
                'CERT-EU (European Union)'
            ],
            'access': 'free/restricted',
            'update_frequency': 'periodic',
            'quality': 'high'
        }

    def get_industry_intel(self):
        """Industry threat intelligence"""
        return {
            'sources': [
                'FS-ISAC (financial services)',
                'Health-ISAC (healthcare, formerly NH-ISAC)',
                'Auto-ISAC (automotive)',
                'E-ISAC (electricity)',
                'IT-ISAC (information technology)'
            ],
            'access': 'membership',
            'update_frequency': 'periodic',
            'quality': 'high'
        }

    def get_internal_intel(self):
        """Internal threat intelligence"""
        return {
            'sources': [
                'internal logs',
                'incident records',
                'honeypot data',
                'sandbox analysis',
                'user reports'
            ],
            'access': 'internal',
            'update_frequency': 'real-time',
            'quality': 'high'
        }

# Usage example
sources = ThreatIntelSources()
print("Open-source threat intelligence:", sources.get_open_source_intel())

IOC Collection

# IOC collector
import hashlib
import json
import uuid
from datetime import datetime, timezone

import requests

class IOCCollector:
    def __init__(self):
        self.iocs = []

    def _collect_from_feed(self, source_url, ioc_type):
        """Fetch a JSON feed and keep the indicators of one type.

        The feed layout (an 'indicators' list whose items carry 'type' and
        'value') is an example schema, not a specific vendor API.
        Returns the number of IOCs added by this call.
        """
        try:
            # A timeout keeps a slow feed from blocking the collector
            response = requests.get(source_url, timeout=10)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"Failed to collect {ioc_type} IOCs: {e}")
            return 0

        added = 0
        for item in data.get('indicators', []):
            if item.get('type') != ioc_type or 'value' not in item:
                continue
            self.iocs.append({
                'type': ioc_type,
                'value': item['value'],
                'source': source_url,
                'confidence': item.get('confidence', 'medium'),
                'first_seen': item.get('first_seen'),
                'last_seen': item.get('last_seen'),
                'tags': item.get('tags', []),
                'collected_at': datetime.now(timezone.utc)
            })
            added += 1
        return added

    def collect_ip_iocs(self, source_url):
        """Collect IP address IOCs"""
        return self._collect_from_feed(source_url, 'ip')

    def collect_domain_iocs(self, source_url):
        """Collect domain IOCs"""
        return self._collect_from_feed(source_url, 'domain')

    def collect_hash_iocs(self, file_path):
        """Collect file hash IOCs"""
        try:
            iocs = [
                {
                    'type': 'hash',
                    'algorithm': algorithm,
                    'value': self.calculate_hash(file_path, algorithm),
                    'file_path': file_path,
                    'collected_at': datetime.now(timezone.utc)
                }
                for algorithm in ('md5', 'sha256')
            ]
        except OSError as e:
            print(f"Failed to collect hash IOCs: {e}")
            return 0

        self.iocs.extend(iocs)
        return len(iocs)

    def calculate_hash(self, file_path, algorithm):
        """Calculate a file hash, reading the file in chunks"""
        hash_func = hashlib.new(algorithm)

        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hash_func.update(chunk)

        return hash_func.hexdigest()

    def export_iocs(self, format='json'):
        """Export IOCs"""
        if format == 'json':
            return json.dumps(self.iocs, indent=2, default=str)
        elif format == 'stix':
            return self.export_stix()
        return None

    def _stix_pattern(self, ioc):
        """Build a STIX pattern for one IOC, or None if the type is unsupported"""
        value = ioc['value']
        if ioc['type'] == 'ip':
            kind = 'ipv6-addr' if ':' in value else 'ipv4-addr'
            return f"[{kind}:value = '{value}']"
        if ioc['type'] == 'domain':
            return f"[domain-name:value = '{value}']"
        if ioc['type'] == 'hash':
            names = {'md5': 'MD5', 'sha256': "'SHA-256'"}
            name = names.get(ioc.get('algorithm'))
            return f"[file:hashes.{name} = '{value}']" if name else None
        return None

    def export_stix(self):
        """Export simplified STIX 2.1-style indicator objects"""
        stix_objects = []

        for ioc in self.iocs:
            pattern = self._stix_pattern(ioc)
            if pattern is None:
                continue
            timestamp = ioc['collected_at'].strftime('%Y-%m-%dT%H:%M:%SZ')
            stix_objects.append({
                'type': 'indicator',
                'spec_version': '2.1',
                # STIX ids are '<type>--<UUID>'; uuid5 keeps them deterministic
                'id': f"indicator--{uuid.uuid5(uuid.NAMESPACE_URL, pattern)}",
                'created': timestamp,
                'modified': timestamp,
                'pattern': pattern,
                'pattern_type': 'stix',
                'valid_from': timestamp
            })

        return stix_objects

# Usage example (the feed URLs and file path are placeholders)
collector = IOCCollector()
collector.collect_ip_iocs('https://api.threatintel.example.com/ip')
collector.collect_domain_iocs('https://api.threatintel.example.com/domain')
collector.collect_hash_iocs('/path/to/suspicious/file.exe')

print(f"Collected {len(collector.iocs)} IOCs")
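
Feeds often overlap, so the same indicator can be collected more than once. A minimal sketch of a merge step follows, assuming IOC dicts shaped like the ones the collector builds (`type`, `value`, optional `source` and `tags`). The `deduplicate_iocs` helper and the `sources` field it adds are hypothetical names introduced for this example.

```python
def deduplicate_iocs(iocs):
    """Merge IOCs that share the same type and normalized value."""
    # URLs keep their case because paths can be case-sensitive
    case_insensitive = {"ip", "domain", "hash"}
    merged = {}
    for ioc in iocs:
        value = ioc["value"].strip()
        if ioc["type"] in case_insensitive:
            value = value.lower()
        key = (ioc["type"], value)

        entry = merged.get(key)
        if entry is None:
            # First sighting: copy the record and start the merged fields
            entry = dict(ioc, value=value, tags=[], sources=[])
            merged[key] = entry

        # Union the tags and remember every source that reported the IOC
        entry["tags"] = sorted(set(entry["tags"]) | set(ioc.get("tags", [])))
        source = ioc.get("source")
        if source and source not in entry["sources"]:
            entry["sources"].append(source)
    return list(merged.values())


sample = [
    {"type": "domain", "value": "Malicious.Example.com", "source": "feed-a", "tags": ["phishing"]},
    {"type": "domain", "value": "malicious.example.com", "source": "feed-b", "tags": ["botnet"]},
    {"type": "ip", "value": "203.0.113.10", "source": "feed-a"},
]
for item in deduplicate_iocs(sample):
    print(item["type"], item["value"], item["tags"], item["sources"])
```

Keeping the list of reporting sources is a design choice: an indicator seen by several independent feeds is usually easier to trust than one seen by a single feed.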

TTP Collection

# TTP collector
class TTPCollector:
    def __init__(self):
        self.ttps = []
    
    def collect_from_mitre(self):
        """Collect TTPs from MITRE ATT&CK"""
        # MITRE ATT&CK Enterprise Tactics
        tactics = [
            'Reconnaissance',
            'Resource Development',
            'Initial Access',
            'Execution',
            'Persistence',
            'Privilege Escalation',
            'Defense Evasion',
            'Credential Access',
            'Discovery',
            'Lateral Movement',
            'Collection',
            'Command and Control',
            'Exfiltration',
            'Impact'
        ]
        
        for tactic in tactics:
            ttp = {
                'tactic': tactic,
                'techniques': self.get_techniques_for_tactic(tactic),
                'source': 'MITRE ATT&CK',
                'collected_at': datetime.now()
            }
            self.ttps.append(ttp)
        
        return len(self.ttps)
    
    def get_techniques_for_tactic(self, tactic):
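        """Return the techniques mapped to a tactic"""
        # A real collector would load these from the published MITRE ATT&CK data.
        # Sample data only; the lists mix techniques with sub-techniques
        # (for example, PowerShell sits under Command and Scripting Interpreter).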
        techniques_map = {
            'Initial Access': [
                'Spearphishing Link',
                'Spearphishing Attachment',
                'Drive-by Compromise',
                'Exploit Public-Facing Application',
                'Supply Chain Compromise'
            ],
            'Execution': [
                'Command and Scripting Interpreter',
                'PowerShell',
                'Windows Management Instrumentation',
                'User Execution'
            ],
            'Persistence': [
                'Account Manipulation',
                'BITS Jobs',
                'Boot or Logon Autostart Execution',
                'Create Account'
            ]
        }
        
        return techniques_map.get(tactic, [])
    
    def analyze_ttp_patterns(self):
        """Analyze TTP patterns"""
        patterns = {}
        
        for ttp in self.ttps:
            tactic = ttp['tactic']
            techniques = ttp['techniques']
            
            for technique in techniques:
                if technique not in patterns:
                    patterns[technique] = {
                        'count': 0,
                        'tactics': set()
                    }
                
                patterns[technique]['count'] += 1
                patterns[technique]['tactics'].add(tactic)
        
        # Convert sets to lists so the result can be serialized
        for technique in patterns:
            patterns[technique]['tactics'] = list(patterns[technique]['tactics'])
        
        return patterns

# Usage example
ttp_collector = TTPCollector()
ttp_collector.collect_from_mitre()
patterns = ttp_collector.analyze_ttp_patterns()

print("TTP pattern analysis:")
for technique, data in patterns.items():
    print(f"  {technique}: seen {data['count']} time(s), tactics: {data['tactics']}")

Analyzing Threat Intelligence

Validating Intelligence

# Threat intelligence validator
class ThreatIntelValidator:
    def __init__(self):
        self.validation_rules = {
            'ip': self.validate_ip,
            'domain': self.validate_domain,
            'hash': self.validate_hash,
            'url': self.validate_url
        }
    
    def validate_ioc(self, ioc):
        """Validate an IOC by dispatching on its type"""
        ioc_type = ioc.get('type')

        if ioc_type in self.validation_rules:
            return self.validation_rules[ioc_type](ioc)

        return {'valid': False, 'reason': 'Unknown type'}
    
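    def validate_ip(self, ioc):
        """Validate an IP address"""
        import ipaddress

        try:
            ip = ipaddress.ip_address(ioc['value'])
        except ValueError:
            return {
                'valid': False,
                'confidence': 'low',
                'reason': 'Invalid IP address'
            }

        # Check reserved ranges first: Python's ipaddress module also reports
        # some of them (such as 240.0.0.0/4) as private
        if ip.is_reserved:
            return {
                'valid': False,
                'confidence': 'low',
                'reason': 'Reserved IP address'
            }

        # Private addresses are well-formed but mean little outside the
        # network they came from
        if ip.is_private:
            return {
                'valid': True,
                'confidence': 'low',
                'reason': 'Private IP address'
            }

        return {
            'valid': True,
            'confidence': 'medium',
            'reason': 'Valid IP address'
        }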
    
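    def validate_domain(self, ioc):
        """Validate a domain name"""
        import re

        domain = ioc['value']

        # Basic syntax check: one or more labels followed by an alphabetic TLD.
        # The original pattern accepted only two-label names, so subdomains
        # such as malicious.example.com were rejected.
        domain_pattern = (
            r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'
            r'[a-zA-Z]{2,63}$'
        )

        if re.fullmatch(domain_pattern, domain):
            # Check the TLD (heuristic: a few legitimate TLDs are longer)
            tld = domain.split('.')[-1]

            if len(tld) > 10:
                return {
                    'valid': False,
                    'confidence': 'low',
                    'reason': 'Suspicious TLD'
                }

            return {
                'valid': True,
                'confidence': 'medium',
                'reason': 'Valid domain'
            }

        return {
            'valid': False,
            'confidence': 'low',
            'reason': 'Invalid domain'
        }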
    
    def validate_hash(self, ioc):
        """Validate a hash value"""
        import re

        hash_value = ioc['value']
        algorithm = ioc.get('algorithm', 'md5')

        # Expected hex digest length per algorithm
        expected_length = {'md5': 32, 'sha256': 64}.get(algorithm)

        # Check both the length and that every character is a hex digit
        if expected_length and re.fullmatch(
                r'[0-9a-fA-F]{%d}' % expected_length, hash_value):
            return {
                'valid': True,
                'confidence': 'high',
                'reason': 'Well-formed hash'
            }

        return {
            'valid': False,
            'confidence': 'low',
            'reason': 'Invalid hash value'
        }
    
    def validate_url(self, ioc):
        """Validate a URL"""
        from urllib.parse import urlparse

        try:
            url = urlparse(ioc['value'])
        except ValueError:
            # urlparse raises ValueError for some malformed inputs
            return {
                'valid': False,
                'confidence': 'low',
                'reason': 'Invalid URL'
            }

        # A usable URL needs both a scheme and a network location
        if not url.scheme or not url.netloc:
            return {
                'valid': False,
                'confidence': 'low',
                'reason': 'Invalid URL'
            }

        return {
            'valid': True,
            'confidence': 'medium',
            'reason': 'Valid URL'
        }

# Usage example
validator = ThreatIntelValidator()

ioc = {
    'type': 'ip',
    'value': '192.168.1.100'
}

result = validator.validate_ioc(ioc)
print(f"Validation result: {result}")

Correlation Analysis

# Intelligence correlation analyzer
class IntelCorrelationAnalyzer:
    def __init__(self):
        self.relationships = []
    
    def correlate_iocs(self, iocs):
        """Correlate IOCs"""
        correlations = []

        # IP-domain correlations
        ip_domain_pairs = self.find_ip_domain_correlations(iocs)
        correlations.extend(ip_domain_pairs)

        # Domain-URL correlations
        domain_url_pairs = self.find_domain_url_correlations(iocs)
        correlations.extend(domain_url_pairs)

        # Hash-file correlations
        hash_file_pairs = self.find_hash_file_correlations(iocs)
        correlations.extend(hash_file_pairs)

        return correlations
    
    def find_ip_domain_correlations(self, iocs):
        """Find IP-domain correlations"""
        correlations = []
        
        ips = [ioc for ioc in iocs if ioc['type'] == 'ip']
        domains = [ioc for ioc in iocs if ioc['type'] == 'domain']
        
        for ip in ips:
            for domain in domains:
                # Check whether the domain currently resolves to this IP (live DNS lookup)
                if self.check_dns_resolution(domain['value'], ip['value']):
                    correlation = {
                        'type': 'ip_domain',
                        'ip': ip['value'],
                        'domain': domain['value'],
                        'confidence': 'high',
                        'discovered_at': datetime.now()
                    }
                    correlations.append(correlation)
        
        return correlations
    
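    def find_domain_url_correlations(self, iocs):
        """Find domain-URL correlations"""
        from urllib.parse import urlparse

        correlations = []

        domains = [ioc for ioc in iocs if ioc['type'] == 'domain']
        urls = [ioc for ioc in iocs if ioc['type'] == 'url']

        for domain in domains:
            name = domain['value'].lower()
            for url in urls:
                # Compare against the parsed hostname. A plain substring test
                # would also match look-alikes such as notexample.com.
                host = (urlparse(url['value']).hostname or '').lower()
                if host == name or host.endswith('.' + name):
                    correlation = {
                        'type': 'domain_url',
                        'domain': domain['value'],
                        'url': url['value'],
                        'confidence': 'high',
                        'discovered_at': datetime.now()
                    }
                    correlations.append(correlation)

        return correlations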
    
    def find_hash_file_correlations(self, iocs):
        """Find hash-file correlations"""
        correlations = []
        
        hashes = [ioc for ioc in iocs if ioc['type'] == 'hash']
        
        for hash_ioc in hashes:
            # Only hashes that carry a file path can be linked to a file
            if 'file_path' in hash_ioc:
                correlation = {
                    'type': 'hash_file',
                    'hash': hash_ioc['value'],
                    'file_path': hash_ioc['file_path'],
                    'confidence': 'high',
                    'discovered_at': datetime.now()
                }
                correlations.append(correlation)
        
        return correlations
    
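    def check_dns_resolution(self, domain, ip):
        """Check whether a domain resolves to the given IP"""
        import socket

        try:
            # gethostbyname_ex returns (hostname, aliases, IPv4 addresses)
            resolved_ips = socket.gethostbyname_ex(domain)[2]
            return ip in resolved_ips
        except OSError:
            # Covers socket.gaierror and socket.herror
            return False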
    
    def build_threat_graph(self, iocs, correlations):
        """Build a threat graph"""
        graph = {
            'nodes': [],
            'edges': []
        }

        # Add one node per IOC
        for ioc in iocs:
            node = {
                'id': ioc['value'],
                'type': ioc['type'],
                'label': ioc['value']
            }
            graph['nodes'].append(node)

        # Keys that hold the two endpoints of each correlation type.
        # Chaining .get() calls with "or" made domain_url edges point from
        # the domain back to itself.
        endpoints = {
            'ip_domain': ('ip', 'domain'),
            'domain_url': ('domain', 'url'),
            'hash_file': ('hash', 'file_path')
        }

        # Add one edge per correlation
        for correlation in correlations:
            source_key, target_key = endpoints[correlation['type']]
            edge = {
                'source': correlation[source_key],
                'target': correlation[target_key],
                'type': correlation['type'],
                'confidence': correlation['confidence']
            }
            graph['edges'].append(edge)

        return graph

# Usage example
analyzer = IntelCorrelationAnalyzer()

iocs = [
    {'type': 'ip', 'value': '192.168.1.100'},
    {'type': 'domain', 'value': 'malicious.example.com'},
    {'type': 'url', 'value': 'http://malicious.example.com/payload.exe'},
    {'type': 'hash', 'value': '5d41402abc4b2a76b9719d911017c592', 'file_path': '/tmp/payload.exe'}
]

correlations = analyzer.correlate_iocs(iocs)
graph = analyzer.build_threat_graph(iocs, correlations)

print(f"Found {len(correlations)} correlation(s)")
print(f"Threat graph: {graph}")
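
Once a graph of nodes and edges exists, one common next step is to group indicators that are connected to each other, since a connected group often belongs to the same activity. The sketch below assumes the `{'nodes': [...], 'edges': [...]}` layout produced by `build_threat_graph` and uses a plain breadth-first search; `find_clusters` is a hypothetical helper, not part of the analyzer class.

```python
from collections import defaultdict, deque


def find_clusters(graph):
    """Group node ids into connected components using breadth-first search."""
    # Treat edges as undirected for clustering purposes
    neighbors = defaultdict(set)
    for edge in graph["edges"]:
        neighbors[edge["source"]].add(edge["target"])
        neighbors[edge["target"]].add(edge["source"])

    seen = set()
    clusters = []
    for node in graph["nodes"]:
        start = node["id"]
        if start in seen:
            continue
        seen.add(start)
        queue = deque([start])
        cluster = []
        while queue:
            current = queue.popleft()
            cluster.append(current)
            # Sorting keeps the traversal order deterministic
            for nxt in sorted(neighbors[current]):
                if nxt not in seen:
                    seen.add(nxt)
                    queue.append(nxt)
        clusters.append(cluster)
    return clusters


example_graph = {
    "nodes": [
        {"id": "203.0.113.10"},
        {"id": "malicious.example.com"},
        {"id": "http://malicious.example.com/payload.exe"},
        {"id": "5d41402abc4b2a76b9719d911017c592"},
    ],
    "edges": [
        {"source": "203.0.113.10", "target": "malicious.example.com"},
        {"source": "malicious.example.com",
         "target": "http://malicious.example.com/payload.exe"},
    ],
}
for cluster in find_clusters(example_graph):
    print(cluster)
```

In this sample the IP, domain, and URL end up in one cluster, while the hash stays on its own because no edge links it to a listed node.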

Threat Scoring

# Threat scoring system
class ThreatScoringSystem:
    def __init__(self):
        self.scoring_rules = {
            'confidence': self.score_confidence,
            'severity': self.score_severity,
            'relevance': self.score_relevance,
            'freshness': self.score_freshness
        }
    
    def calculate_threat_score(self, ioc):
        """Calculate the threat score"""
        scores = {}
        
        for rule_name, rule_func in self.scoring_rules.items():
            scores[rule_name] = rule_func(ioc)
        
        # Combined score: the unweighted mean of the component scores
        total_score = sum(scores.values()) / len(scores)
        
        return {
            'total_score': total_score,
            'component_scores': scores,
            'risk_level': self.determine_risk_level(total_score)
        }
    
    def score_confidence(self, ioc):
        """Score confidence"""
        confidence_map = {
            'high': 0.9,
            'medium': 0.6,
            'low': 0.3
        }
        
        return confidence_map.get(ioc.get('confidence', 'medium'), 0.5)
    
    def score_severity(self, ioc):
        """Score severity"""
        severity_map = {
            'critical': 1.0,
            'high': 0.8,
            'medium': 0.5,
            'low': 0.2
        }
        
        return severity_map.get(ioc.get('severity', 'medium'), 0.5)
    
    def score_relevance(self, ioc):
        """Score relevance"""
        # Score based on tags
        tags = {tag.lower() for tag in ioc.get('tags', [])}

        high_risk_tags = {'apt', 'malware', 'ransomware', 'trojan'}
        medium_risk_tags = {'phishing', 'spam', 'botnet'}

        # Check high-risk tags first so the result does not depend on tag order
        if tags & high_risk_tags:
            return 0.9
        if tags & medium_risk_tags:
            return 0.6

        return 0.4
    
    def score_freshness(self, ioc):
        """Score freshness"""
        from datetime import timedelta

        last_seen = ioc.get('last_seen')

        if not last_seen:
            return 0.3

        if isinstance(last_seen, str):
            last_seen = datetime.fromisoformat(last_seen)

        # Match the timezone awareness of last_seen; subtracting a naive
        # datetime from an aware one raises TypeError
        age = datetime.now(last_seen.tzinfo) - last_seen

        if age < timedelta(days=7):
            return 1.0
        elif age < timedelta(days=30):
            return 0.7
        elif age < timedelta(days=90):
            return 0.4
        else:
            return 0.2
    
    def determine_risk_level(self, score):
        """Determine the risk level"""
        if score >= 0.8:
            return 'critical'
        elif score >= 0.6:
            return 'high'
        elif score >= 0.4:
            return 'medium'
        else:
            return 'low'

# Usage example
scorer = ThreatScoringSystem()

ioc = {
    'type': 'ip',
    'value': '192.168.1.100',
    'confidence': 'high',
    'severity': 'high',
    'tags': ['apt', 'malware'],
    'last_seen': datetime.now()
}

score_result = scorer.calculate_threat_score(ioc)
print(f"Threat score: {score_result}")
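
`calculate_threat_score` averages the four component scores with equal weight. For the sample IOC, confidence `high` maps to 0.9, severity `high` to 0.8, the `apt` tag to 0.9, and a `last_seen` of now to 1.0, so the mean is 0.9 and the risk level is `critical`. If some components should count more than others, a weighted mean is one option. The sketch below is an assumption-laden variation, not part of the scoring class: `weighted_threat_score` and its `weights` parameter are hypothetical.

```python
def weighted_threat_score(component_scores, weights=None):
    """Return the weighted mean of the component scores (each in 0..1)."""
    if weights is None:
        weights = {}
    # Components without an explicit weight default to 1.0
    pairs = [
        (score, weights.get(name, 1.0))
        for name, score in component_scores.items()
    ]
    total_weight = sum(weight for _, weight in pairs)
    if total_weight <= 0:
        raise ValueError("weights must sum to a positive value")
    return sum(score * weight for score, weight in pairs) / total_weight


components = {
    "confidence": 0.9,
    "severity": 0.8,
    "relevance": 0.9,
    "freshness": 1.0,
}

# Equal weights reproduce the plain average
print(round(weighted_threat_score(components), 2))

# Counting freshness twice shifts the score toward recent sightings
print(round(weighted_threat_score(components, {"freshness": 2.0}), 2))
```

Dividing by the total weight keeps the result in the same 0 to 1 range, so the existing risk-level thresholds still apply.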

Applying Threat Intelligence

Threat Hunting

# Threat hunting tool
class ThreatHunter:
    def __init__(self, threat_intel):
        self.threat_intel = threat_intel
        self.hypotheses = []
    
    def create_hypothesis(self, description, iocs):
        """Create a hunting hypothesis"""
        hypothesis = {
            'id': f"HYP-{len(self.hypotheses) + 1}",
            'description': description,
            'iocs': iocs,
            'status': 'active',
            'created_at': datetime.now(),
            'findings': []
        }
        self.hypotheses.append(hypothesis)
        return hypothesis
    
    def hunt_for_iocs(self, hypothesis_id, data_source):
        """Hunt for the IOCs of a hypothesis"""
        hypothesis = self.get_hypothesis(hypothesis_id)
        
        if not hypothesis:
            return None
        
        findings = []
        
        for ioc in hypothesis['iocs']:
            matches = self.search_ioc(ioc, data_source)
            
            if matches:
                finding = {
                    'ioc': ioc,
                    'matches': matches,
                    'timestamp': datetime.now()
                }
                findings.append(finding)
        
        hypothesis['findings'] = findings
        return findings
    
    def search_ioc(self, ioc, data_source):
        """Search for an IOC"""
        matches = []
        
        if ioc['type'] == 'ip':
            matches = self.search_ip(ioc['value'], data_source)
        elif ioc['type'] == 'domain':
            matches = self.search_domain(ioc['value'], data_source)
        elif ioc['type'] == 'hash':
            matches = self.search_hash(ioc['value'], data_source)
        
        return matches
    
    def _search_records(self, value, fields, data_source):
        """Return the events in which any of the given fields equals value.

        Assumption for this example: data_source is a list of event dicts
        (for instance parsed log lines) and the field names are illustrative.
        A real deployment would query a SIEM or EDR here instead. The
        original helpers (search_logs_for_ip and similar) were never defined.
        """
        if not isinstance(data_source, (list, tuple)):
            return []

        return [
            event for event in data_source
            if isinstance(event, dict)
            and any(event.get(field) == value for field in fields)
        ]

    def search_ip(self, ip, data_source):
        """Search for an IP address"""
        # Covers log and network traffic events
        return self._search_records(ip, ('src_ip', 'dst_ip'), data_source)

    def search_domain(self, domain, data_source):
        """Search for a domain"""
        # Covers DNS logs (query) and proxy logs (host)
        return self._search_records(domain, ('query', 'host'), data_source)

    def search_hash(self, hash_value, data_source):
        """Search for a hash value"""
        # Covers file system and process events
        return self._search_records(hash_value, ('md5', 'sha256'), data_source)
    
    def generate_hunting_report(self, hypothesis_id):
        """Generate a hunting report"""
        hypothesis = self.get_hypothesis(hypothesis_id)
        
        if not hypothesis:
            return None
        
        report = {
            'hypothesis_id': hypothesis['id'],
            'description': hypothesis['description'],
            'status': hypothesis['status'],
            'total_findings': len(hypothesis['findings']),
            'findings': hypothesis['findings'],
            'generated_at': datetime.now()
        }
        
        return report
    
    def get_hypothesis(self, hypothesis_id):
        """Look up a hypothesis by ID"""
        for hypothesis in self.hypotheses:
            if hypothesis['id'] == hypothesis_id:
                return hypothesis
        return None

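# Usage example
hunter = ThreatHunter(threat_intel={})

# Create a hunting hypothesis
hypothesis = hunter.create_hypothesis(
    "Detect APT group activity",
    [
        {'type': 'ip', 'value': '192.168.1.100'},
        {'type': 'domain', 'value': 'malicious.example.com'}
    ]
)

# Sample events to hunt in; the field names are illustrative
events = [
    {'src_ip': '10.0.0.5', 'dst_ip': '192.168.1.100', 'action': 'allow'},
    {'query': 'malicious.example.com', 'client': '10.0.0.5'}
]

# Run the hunt
findings = hunter.hunt_for_iocs(hypothesis['id'], events)

# Generate the report
report = hunter.generate_hunting_report(hypothesis['id'])
print(f"Threat hunting report: {report}")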

Sharing Threat Intelligence

# Threat intelligence sharing platform
class ThreatIntelSharing:
    def __init__(self):
        self.shared_intel = []
        self.partners = []

    def add_partner(self, partner_info):
        """Add a partner"""
        partner = {
            'id': f"PARTNER-{len(self.partners) + 1}",
            'name': partner_info['name'],
            'contact': partner_info['contact'],
            'trust_level': partner_info.get('trust_level', 'medium'),
            'sharing_level': partner_info.get('sharing_level', 'restricted'),
            'added_at': datetime.now()
        }
        self.partners.append(partner)
        return partner
    
    def share_intel(self, intel, partner_id):
        """Share threat intelligence with a partner"""
        partner = self.get_partner(partner_id)

        if not partner:
            return {'success': False, 'message': 'Partner not found'}

        # Check the sharing level
        if partner['sharing_level'] == 'none':
            return {'success': False, 'message': 'Sharing not allowed'}

        # Record the share
        shared_item = {
            'intel': intel,
            'partner_id': partner_id,
            'shared_at': datetime.now(),
            'status': 'shared'
        }
        self.shared_intel.append(shared_item)

        return {'success': True, 'message': 'Intelligence shared'}
    
    def receive_intel(self, intel, partner_id):
        """Receive threat intelligence from a partner"""
        partner = self.get_partner(partner_id)

        if not partner:
            return {'success': False, 'message': 'Partner not found'}

        # Validate the intelligence
        validation_result = self.validate_received_intel(intel, partner)

        if not validation_result['valid']:
            return {'success': False, 'message': 'Intelligence failed validation'}

        # Record the receipt
        received_item = {
            'intel': intel,
            'partner_id': partner_id,
            'received_at': datetime.now(),
            'status': 'received',
            'trust_score': partner['trust_level']
        }
        self.shared_intel.append(received_item)

        return {'success': True, 'message': 'Intelligence received'}
    
    def validate_received_intel(self, intel, partner):
        """Validate received intelligence"""
        # Basic format check
        if not intel or not isinstance(intel, dict):
            return {'valid': False, 'reason': 'Invalid intelligence format'}

        # Trust level check
        if partner['trust_level'] == 'low':
            return {'valid': False, 'reason': 'Trust level too low'}

        return {'valid': True}

    def get_partner(self, partner_id):
        """Look up a partner by ID"""
        for partner in self.partners:
            if partner['id'] == partner_id:
                return partner
        return None

    def get_sharing_history(self, partner_id=None):
        """Return the sharing history, optionally for one partner"""
        if partner_id:
            return [item for item in self.shared_intel if item['partner_id'] == partner_id]
        return self.shared_intel

# Usage example
sharing = ThreatIntelSharing()

# Add a partner
partner = sharing.add_partner({
    'name': 'Security Alliance',
    'contact': 'contact@security-alliance.com',
    'trust_level': 'high',
    'sharing_level': 'full'
})

# Share intelligence
intel = {
    'type': 'ip',
    'value': '192.168.1.100',
    'threat_type': 'apt',
    'confidence': 'high'
}

result = sharing.share_intel(intel, partner['id'])
print(f"Sharing result: {result}")

A Practical Example

Building a Threat Intelligence Platform

# Threat intelligence platform core
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Optional
import json

@dataclass
class ThreatIntel:
    id: str
    type: str
    value: str
    source: str
    confidence: str
    severity: str
    tags: List[str]
    first_seen: datetime
    last_seen: datetime
    created_at: datetime
    updated_at: datetime
    metadata: Dict

class ThreatIntelPlatform:
    def __init__(self):
        self.intel_db = {}
        self.sources = {}
        self.analyzers = {}
        self.subscribers = {}
    
    def ingest_intel(self, intel_data: Dict) -> ThreatIntel:
        """Ingest threat intelligence"""
        intel_id = self.generate_intel_id(intel_data)
        
        intel = ThreatIntel(
            id=intel_id,
            type=intel_data['type'],
            value=intel_data['value'],
            source=intel_data['source'],
            confidence=intel_data.get('confidence', 'medium'),
            severity=intel_data.get('severity', 'medium'),
            tags=intel_data.get('tags', []),
            first_seen=intel_data.get('first_seen', datetime.now()),
            last_seen=intel_data.get('last_seen', datetime.now()),
            created_at=datetime.now(),
            updated_at=datetime.now(),
            metadata=intel_data.get('metadata', {})
        )
        
        self.intel_db[intel_id] = intel
        
        # Trigger analysis
        self.analyze_intel(intel)
        
        # Notify subscribers
        self.notify_subscribers(intel)
        
        return intel
    
    def generate_intel_id(self, intel_data: Dict) -> str:
        """Generate an intelligence ID from type, value, and source"""
        import hashlib
        unique_string = f"{intel_data['type']}:{intel_data['value']}:{intel_data['source']}"
        return hashlib.md5(unique_string.encode()).hexdigest()
    
    def analyze_intel(self, intel: ThreatIntel):
        """Analyze a threat intelligence record"""
        # Validate the record
        validation_result = self.validate_intel(intel)

        # Compute the threat score
        threat_score = self.calculate_threat_score(intel)

        # Correlate with existing records
        correlations = self.correlate_intel(intel)

        # Update metadata
        intel.metadata.update({
            'validation': validation_result,
            'threat_score': threat_score,
            'correlations': correlations
        })
        
        intel.updated_at = datetime.now()
    
    def validate_intel(self, intel: ThreatIntel) -> Dict:
        """Validate a threat intelligence record"""
        # Placeholder: real validation logic goes here
        return {
            'valid': True,
            'confidence': intel.confidence
        }
    
    def calculate_threat_score(self, intel: ThreatIntel) -> float:
        """Compute a threat score, capped at 1.0"""
        # Start from a neutral base, then adjust for confidence and severity
        score = 0.5
        
        if intel.confidence == 'high':
            score += 0.3
        elif intel.confidence == 'low':
            score -= 0.2
        
        if intel.severity == 'critical':
            score += 0.4
        elif intel.severity == 'high':
            score += 0.2
        
        return min(score, 1.0)
    
    def correlate_intel(self, intel: ThreatIntel) -> List[Dict]:
        """Correlate a record with existing intelligence"""
        correlations = []

        for existing_intel in self.intel_db.values():
            if existing_intel.id == intel.id:
                continue

            # Check for a relationship
            if self.check_correlation(intel, existing_intel):
                correlation = {
                    'intel_id': existing_intel.id,
                    'type': 'related',
                    'confidence': 'medium'
                }
                correlations.append(correlation)
        
        return correlations
    
    def check_correlation(self, intel1: ThreatIntel, intel2: ThreatIntel) -> bool:
        """Check whether two intel records are related"""
        # Simple heuristic: records sharing at least one tag count as related
        return bool(set(intel1.tags) & set(intel2.tags))
    
    def search_intel(self, query: Dict) -> List[ThreatIntel]:
        """Search threat intelligence"""
        results = []

        for intel in self.intel_db.values():
            match = True

            # Type must match exactly
            if 'type' in query and intel.type != query['type']:
                match = False

            # Value matches as a substring
            if 'value' in query and query['value'] not in intel.value:
                match = False

            # All queried tags must be present
            if 'tags' in query:
                query_tags = set(query['tags'])
                intel_tags = set(intel.tags)
                if not query_tags.issubset(intel_tags):
                    match = False
            
            if match:
                results.append(intel)
        
        return results
    
    def subscribe(self, subscriber_id: str, criteria: Dict):
        """Subscribe to threat intelligence matching the given criteria"""
        self.subscribers[subscriber_id] = {
            'criteria': criteria,
            'subscribed_at': datetime.now()
        }
    
    def notify_subscribers(self, intel: ThreatIntel):
        """Notify matching subscribers"""
        for subscriber_id, subscription in self.subscribers.items():
            if self.match_criteria(intel, subscription['criteria']):
                # Send the notification
                self.send_notification(subscriber_id, intel)
    
    def match_criteria(self, intel: ThreatIntel, criteria: Dict) -> bool:
        """Check whether a record matches subscription criteria"""
        # Every criterion that is present must match exactly
        if 'type' in criteria and intel.type != criteria['type']:
            return False
        
        if 'severity' in criteria and intel.severity != criteria['severity']:
            return False
        
        return True
    
    def send_notification(self, subscriber_id: str, intel: ThreatIntel):
        """Send a notification"""
        # Placeholder: prints instead of delivering a real notification
        print(f"Notify {subscriber_id}: new threat intel {intel.id}")
    
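    def export_intel(self, format: str = 'json') -> str:
        """Export threat intelligence as 'json' or 'stix'"""
        if format == 'json':
            return json.dumps(
                [self.intel_to_dict(intel) for intel in self.intel_db.values()],
                indent=2,
                default=str
            )
        elif format == 'stix':
            return self.export_stix()
        # Fail loudly rather than returning None from a method typed as str
        raise ValueError(f"Unsupported export format: {format}")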
    
    def intel_to_dict(self, intel: ThreatIntel) -> Dict:
        """Convert a record to a dictionary"""
        return {
            'id': intel.id,
            'type': intel.type,
            'value': intel.value,
            'source': intel.source,
            'confidence': intel.confidence,
            'severity': intel.severity,
            'tags': intel.tags,
            'first_seen': intel.first_seen.isoformat(),
            'last_seen': intel.last_seen.isoformat(),
            'metadata': intel.metadata
        }
    
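    def export_stix(self) -> str:
        """Export as a STIX 2.1-style bundle (simplified; not validated against the spec)"""
        import uuid
        from datetime import timezone

        # Map platform types to STIX pattern object paths; 'ip' is assumed to be IPv4
        stix_paths = {
            'ip': 'ipv4-addr:value',
            'domain': 'domain-name:value',
            'url': 'url:value'
        }

        def stix_time(dt):
            # STIX timestamps are UTC; naive datetimes are treated as local time
            return dt.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')

        stix_bundle = {
            'type': 'bundle',
            # STIX identifiers take the form <type>--<UUID>
            'id': f"bundle--{uuid.uuid4()}",
            'objects': []
        }

        for intel in self.intel_db.values():
            path = stix_paths.get(intel.type)
            if path is None:
                continue  # no known STIX mapping for this type
            # Escape backslashes and single quotes inside the pattern literal
            value = intel.value.replace('\\', '\\\\').replace("'", "\\'")
            stix_object = {
                'type': 'indicator',
                'spec_version': '2.1',
                # Derive a stable UUID from the MD5-based intel ID
                'id': f"indicator--{uuid.uuid5(uuid.NAMESPACE_URL, intel.id)}",
                'created': stix_time(intel.created_at),
                'modified': stix_time(intel.updated_at),
                'pattern': f"[{path} = '{value}']",
                'pattern_type': 'stix',
                'valid_from': stix_time(intel.first_seen)
            }
            stix_bundle['objects'].append(stix_object)

        return json.dumps(stix_bundle, indent=2)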

# Create a platform instance
platform = ThreatIntelPlatform()

# Ingest threat intelligence
intel1 = platform.ingest_intel({
    'type': 'ip',
    'value': '192.168.1.100',
    'source': 'internal',
    'confidence': 'high',
    'severity': 'high',
    'tags': ['apt', 'malware']
})

intel2 = platform.ingest_intel({
    'type': 'domain',
    'value': 'malicious.example.com',
    'source': 'external',
    'confidence': 'medium',
    'severity': 'medium',
    'tags': ['phishing']
})

# Search threat intelligence
results = platform.search_intel({'type': 'ip'})
print(f"Search results: {len(results)}")

# Export threat intelligence
exported = platform.export_intel('json')
print(f"Exported threat intelligence: {exported[:200]}...")
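
The scoring rule in calculate_threat_score is easiest to check by tabulating it. The standalone sketch below restates the same weights outside the class and prints the score for each confidence/severity pair. The names BASE_SCORE, CONFIDENCE_WEIGHTS, SEVERITY_WEIGHTS, and threat_score are introduced here for illustration and are not part of the platform code. The sketch also clamps at 0.0 and rounds to two decimals, which the original method does not do; with the current weights the lower bound is never reached.

```python
from itertools import product

# Weights copied from ThreatIntelPlatform.calculate_threat_score;
# the constant names are illustrative, not part of the platform code.
BASE_SCORE = 0.5
CONFIDENCE_WEIGHTS = {'high': 0.3, 'medium': 0.0, 'low': -0.2}
SEVERITY_WEIGHTS = {'critical': 0.4, 'high': 0.2, 'medium': 0.0, 'low': 0.0}


def threat_score(confidence: str, severity: str) -> float:
    """Additive score: base + confidence weight + severity weight, clamped to [0, 1]."""
    raw = (BASE_SCORE
           + CONFIDENCE_WEIGHTS.get(confidence, 0.0)
           + SEVERITY_WEIGHTS.get(severity, 0.0))
    # The lower clamp and the rounding are additions in this sketch;
    # the upper clamp matches the original method.
    return round(max(0.0, min(raw, 1.0)), 2)


if __name__ == '__main__':
    # Print one row per confidence/severity combination
    for confidence, severity in product(CONFIDENCE_WEIGHTS, SEVERITY_WEIGHTS):
        score = threat_score(confidence, severity)
        print(f"{confidence:>6} / {severity:<8} -> {score:.2f}")
```

With these weights, high confidence saturates at 1.0 for both high and critical severity, so the rule cannot tell those two cases apart. Lowering the weights or widening the range would be one way to separate them.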

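Summary

This tutorial covered threat intelligence and its analysis:

  1. Threat intelligence fundamentals
    • Definition of threat intelligence
    • Threat intelligence layers
    • The value of threat intelligence
  2. Threat intelligence collection
    • Data source categories
    • IOC collection
    • TTP collection
    • Intelligence ingestion
  3. Threat intelligence analysis
    • Intelligence validation
    • Correlation analysis
    • Threat scoring
    • Pattern recognition
  4. Threat intelligence application
    • Threat hunting
    • Intelligence sharing
    • Prevention and detection
    • Response support
  5. Practical application
    • Threat intelligence platform
    • Automation tools
    • Integration approaches
  6. Best practices
    • Multiple intelligence sources
    • Continuous updates
    • Quality control
    • Privacy protection

Threat intelligence is a core capability of security operations, and a well-built threat intelligence program is essential to strengthening defenses.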

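Next Steps

In the next tutorial, we will look at building a security operations center (SOC), including:

  • SOC architecture design
  • SOC staffing
  • SOC tools and platforms
  • SOC operating processes
  • SOC best practices

Keep studying security operations to build on what this tutorial covered.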