概述

安全合规与审计是网络安全运营的重要组成部分,确保组织的安全实践符合法律法规和行业标准要求。本教程将详细介绍合规框架、审计流程和持续合规管理方法。

合规框架与标准

主要合规框架

┌─────────────────────────────────────────────┐
│          主要合规框架                       │
├─────────────────────────────────────────────┤
│                                             │
│  ┌─────────────────────────────────────┐  │
│  │      国际标准                       │  │
│  │  - ISO 27001/27002                 │  │
│  │  - NIST CSF                        │  │
│  │  - COBIT                           │  │
│  │  - ITIL                            │  │
│  └─────────────────────────────────────┘  │
│                                             │
│  ┌─────────────────────────────────────┐  │
│  │      行业标准                       │  │
│  │  - PCI DSS (支付卡)                │  │
│  │  - HIPAA (医疗)                    │  │
│  │  - GLBA (金融)                     │  │
│  │  - SOX (上市公司)                  │  │
│  └─────────────────────────────────────┘  │
│                                             │
│  ┌─────────────────────────────────────┐  │
│  │      地方法规                       │  │
│  │  - GDPR (欧盟)                      │  │
│  │  - CCPA (加州)                      │  │
│  │  - 网络安全法 (中国)                │  │
│  │  - 数据安全法 (中国)                │  │
│  └─────────────────────────────────────┘  │
│                                             │
└─────────────────────────────────────────────┘

ISO 27001 框架

# ISO 27001 合规管理
class ISO27001Compliance:
    def __init__(self):
        self.controls = self.define_controls()
        self.policies = {}
        self.evidence = {}
    
    def define_controls(self):
        """定义控制措施"""
        return {
            'A.5 信息安全策略': [
                '5.1 信息安全策略',
                '5.2 信息安全角色和职责'
            ],
            'A.6 信息安全组织': [
                '6.1 内部组织',
                '6.2 移动设备和远程工作'
            ],
            'A.7 人力资源安全': [
                '7.1 任用前',
                '7.2 任用中',
                '7.3 任用终止或变更'
            ],
            'A.8 资产管理': [
                '8.1 资产责任',
                '8.2 信息分类',
                '8.3 媒体处理'
            ],
            'A.9 访问控制': [
                '9.1 访问控制业务要求',
                '9.2 用户访问管理',
                '9.3 用户责任',
                '9.4 系统和应用访问控制'
            ],
            'A.10 密码学': [
                '10.1 密码控制'
            ],
            'A.11 物理和环境安全': [
                '11.1 安全区域',
                '11.2 设备安全'
            ],
            'A.12 运行安全': [
                '12.1 操作程序和责任',
                '12.2 恶意软件防护',
                '12.3 备份',
                '12.4 日志和监控',
                '12.5 运行软件控制',
                '12.6 技术漏洞管理',
                '12.7 审计信息系统',
                '12.8 运行变更控制'
            ],
            'A.13 通信安全': [
                '13.1 网络安全管理',
                '13.2 信息传输'
            ],
            'A.14 系统获取、开发和维护': [
                '14.1 信息安全要求',
                '14.2 开发和支持过程中的安全',
                '14.3 测试数据'
            ],
            'A.15 供应商关系': [
                '15.1 供应商关系中的信息安全',
                '15.2 供应商服务交付管理'
            ],
            'A.16 信息安全事件管理': [
                '16.1 信息安全事件管理',
                '16.2 信息安全改进'
            ],
            'A.17 业务连续性管理中的信息安全': [
                '17.1 信息安全连续性',
                '17.2 冗余'
            ],
            'A.18 合规性': [
                '18.1 法律法规要求',
                '18.2 知识产权',
                '18.3 记录',
                '18.4 独立审查'
            ]
        }
    
    def assess_compliance(self, control_id):
        """评估合规性"""
        if control_id not in self.controls:
            return {'status': 'error', 'message': '控制不存在'}
        
        # 检查策略
        policy_status = self.check_policy(control_id)
        
        # 检查实施
        implementation_status = self.check_implementation(control_id)
        
        # 检查证据
        evidence_status = self.check_evidence(control_id)
        
        # 综合评估
        overall_status = self.calculate_overall_status(
            policy_status,
            implementation_status,
            evidence_status
        )
        
        return {
            'control_id': control_id,
            'policy_status': policy_status,
            'implementation_status': implementation_status,
            'evidence_status': evidence_status,
            'overall_status': overall_status,
            'assessed_at': datetime.now()
        }
    
    def check_policy(self, control_id):
        """检查策略"""
        if control_id in self.policies:
            return {
                'status': 'compliant',
                'policy': self.policies[control_id],
                'last_reviewed': self.policies[control_id].get('last_reviewed')
            }
        else:
            return {
                'status': 'non_compliant',
                'message': '策略未定义'
            }
    
    def check_implementation(self, control_id):
        """检查实施"""
        # 实现实施检查逻辑
        return {
            'status': 'compliant',
            'implementation_date': datetime.now() - timedelta(days=180)
        }
    
    def check_evidence(self, control_id):
        """检查证据"""
        if control_id in self.evidence:
            return {
                'status': 'compliant',
                'evidence_count': len(self.evidence[control_id]),
                'last_evidence_date': max(
                    e['date'] for e in self.evidence[control_id]
                )
            }
        else:
            return {
                'status': 'non_compliant',
                'message': '无证据'
            }
    
    def calculate_overall_status(self, policy_status, implementation_status, evidence_status):
        """计算总体状态"""
        statuses = [
            policy_status['status'],
            implementation_status['status'],
            evidence_status['status']
        ]
        
        if all(s == 'compliant' for s in statuses):
            return 'compliant'
        elif any(s == 'non_compliant' for s in statuses):
            return 'non_compliant'
        else:
            return 'partial_compliant'
    
    def add_policy(self, control_id, policy_data):
        """添加策略"""
        self.policies[control_id] = {
            'title': policy_data['title'],
            'content': policy_data['content'],
            'created_at': datetime.now(),
            'last_reviewed': None
        }
    
    def add_evidence(self, control_id, evidence_data):
        """添加证据"""
        if control_id not in self.evidence:
            self.evidence[control_id] = []
        
        evidence = {
            'id': f"EVI-{len(self.evidence[control_id]) + 1}",
            'type': evidence_data['type'],
            'description': evidence_data['description'],
            'file_path': evidence_data.get('file_path'),
            'date': datetime.now(),
            'added_by': evidence_data.get('added_by', 'system')
        }
        
        self.evidence[control_id].append(evidence)
        return evidence

# 使用示例
iso_compliance = ISO27001Compliance()

# 添加策略
iso_compliance.add_policy('A.9.1', {
    'title': '访问控制业务要求',
    'content': '建立访问控制策略,确保只有授权人员才能访问信息系统'
})

# 添加证据
iso_compliance.add_evidence('A.9.1', {
    'type': 'document',
    'description': '访问控制策略文档',
    'file_path': '/policies/access-control.pdf'
})

# 评估合规性
assessment = iso_compliance.assess_compliance('A.9.1')
print(f"合规性评估: {assessment}")

NIST CSF 框架

# NIST CSF 合规管理
class NISTCSFCompliance:
    def __init__(self):
        self.functions = self.define_functions()
        self.implementations = {}
    
    def define_functions(self):
        """定义功能"""
        return {
            '识别 (Identify)': {
                'categories': [
                    '资产管理',
                    '商业环境',
                    '治理',
                    '风险评估',
                    '风险管理策略'
                ]
            },
            '保护 (Protect)': {
                'categories': [
                    '访问控制',
                    '意识和培训',
                    '数据安全',
                    '信息保护流程和程序',
                    '维护',
                    '保护技术'
                ]
            },
            '检测 (Detect)': {
                'categories': [
                    '异常和事件',
                    '安全持续监控',
                    '检测流程'
                ]
            },
            '响应 (Respond)': {
                'categories': [
                    '响应规划',
                    '通信',
                    '分析',
                    '缓解',
                    '改进'
                ]
            },
            '恢复 (Recover)': {
                'categories': [
                    '恢复规划',
                    '改进',
                    '通信'
                ]
            }
        }
    
    def assess_maturity(self, function_name):
        """评估成熟度"""
        if function_name not in self.functions:
            return {'status': 'error', 'message': '功能不存在'}
        
        function = self.functions[function_name]
        
        # 评估每个类别
        category_assessments = {}
        for category in function['categories']:
            category_assessments[category] = self.assess_category(
                function_name,
                category
            )
        
        # 计算总体成熟度
        overall_maturity = self.calculate_maturity(category_assessments)
        
        return {
            'function': function_name,
            'categories': category_assessments,
            'overall_maturity': overall_maturity,
            'assessed_at': datetime.now()
        }
    
    def assess_category(self, function_name, category_name):
        """评估类别"""
        # 实现类别评估逻辑
        maturity_levels = {
            1: '部分实施',
            2: '风险告知',
            3: '可重复',
            4: '适应',
            5: '优化'
        }
        
        # 模拟评估结果
        maturity_level = random.randint(1, 5)
        
        return {
            'category': category_name,
            'maturity_level': maturity_level,
            'maturity_description': maturity_levels[maturity_level],
            'gaps': self.identify_gaps(category_name, maturity_level)
        }
    
    def identify_gaps(self, category_name, current_level):
        """识别差距"""
        gaps = []
        
        if current_level < 5:
            gaps.append({
                'current_level': current_level,
                'target_level': 5,
                'description': '需要达到优化级别'
            })
        
        return gaps
    
    def calculate_maturity(self, category_assessments):
        """计算成熟度"""
        maturity_levels = [
            assessment['maturity_level']
            for assessment in category_assessments.values()
        ]
        
        if not maturity_levels:
            return 0
        
        return sum(maturity_levels) / len(maturity_levels)
    
    def generate_roadmap(self, function_name):
        """生成改进路线图"""
        assessment = self.assess_maturity(function_name)
        
        roadmap = {
            'function': function_name,
            'current_maturity': assessment['overall_maturity'],
            'target_maturity': 5,
            'improvements': []
        }
        
        # 为每个类别生成改进计划
        for category, assessment in assessment['categories'].items():
            if assessment['maturity_level'] < 5:
                improvement = {
                    'category': category,
                    'current_level': assessment['maturity_level'],
                    'target_level': 5,
                    'actions': self.generate_improvement_actions(
                        category,
                        assessment['maturity_level']
                    ),
                    'estimated_time': self.estimate_improvement_time(
                        assessment['maturity_level']
                    )
                }
                roadmap['improvements'].append(improvement)
        
        return roadmap
    
    def generate_improvement_actions(self, category, current_level):
        """生成改进行动"""
        actions = []
        
        for level in range(current_level + 1, 6):
            actions.append({
                'target_level': level,
                'description': f'达到级别 {level} 的要求',
                'tasks': self.get_level_tasks(category, level)
            })
        
        return actions
    
    def get_level_tasks(self, category, level):
        """获取级别任务"""
        # 实现任务获取逻辑
        return [
            '审查当前实践',
            '识别改进机会',
            '实施改进措施',
            '验证改进效果'
        ]
    
    def estimate_improvement_time(self, current_level):
        """估算改进时间"""
        # 基于当前级别估算时间
        time_map = {
            1: 12,  # 12 个月
            2: 9,   # 9 个月
            3: 6,   # 6 个月
            4: 3    # 3 个月
        }
        
        return time_map.get(current_level, 6)

# 使用示例
nist_compliance = NISTCSFCompliance()

# 评估成熟度
assessment = nist_compliance.assess_maturity('保护 (Protect)')
print(f"成熟度评估: {assessment}")

# 生成改进路线图
roadmap = nist_compliance.generate_roadmap('保护 (Protect)')
print(f"改进路线图: {roadmap}")

审计流程与方法

审计流程

审计流程:
  1. 审计准备:
    - 确定审计范围
    - 制定审计计划
    - 组建审计团队
    - 准备审计工具
    
  2. 审计执行:
    - 收集证据
    - 进行访谈
    - 观察操作
    - 测试控制
    
  3. 审计分析:
    - 分析证据
    - 识别问题
    - 评估风险
    - 形成发现
    
  4. 审计报告:
    - 起草报告
    - 评审报告
    - 发布报告
    - 跟踪整改
    
  5. 后续跟踪:
    - 监控整改
    - 验证效果
    - 更新计划
    - 持续改进

审计工具

# 审计工具集
class AuditToolkit:
    def __init__(self):
        self.tools = {
            'compliance_checker': ComplianceChecker(),
            'evidence_collector': EvidenceCollector(),
            'vulnerability_scanner': VulnerabilityScanner(),
            'configuration_auditor': ConfigurationAuditor()
        }
    
    def run_compliance_audit(self, framework, scope):
        """运行合规审计"""
        print(f"运行 {framework} 合规审计,范围: {scope}")
        
        results = {
            'framework': framework,
            'scope': scope,
            'started_at': datetime.now(),
            'findings': []
        }
        
        # 运行合规检查
        compliance_results = self.tools['compliance_checker'].check(framework, scope)
        results['findings'].extend(compliance_results)
        
        # 收集证据
        evidence = self.tools['evidence_collector'].collect(scope)
        results['evidence'] = evidence
        
        # 漏洞扫描
        vulnerabilities = self.tools['vulnerability_scanner'].scan(scope)
        results['vulnerabilities'] = vulnerabilities
        
        # 配置审计
        config_issues = self.tools['configuration_auditor'].audit(scope)
        results['configuration_issues'] = config_issues
        
        results['completed_at'] = datetime.now()
        results['duration'] = results['completed_at'] - results['started_at']
        
        return results
    
    def generate_audit_report(self, audit_results):
        """生成审计报告"""
        report = {
            'title': f"{audit_results['framework']} 审计报告",
            'executive_summary': self.generate_executive_summary(audit_results),
            'detailed_findings': audit_results['findings'],
            'vulnerabilities': audit_results['vulnerabilities'],
            'configuration_issues': audit_results['configuration_issues'],
            'recommendations': self.generate_recommendations(audit_results),
            'generated_at': datetime.now()
        }
        
        return report
    
    def generate_executive_summary(self, audit_results):
        """生成执行摘要"""
        total_findings = len(audit_results['findings'])
        critical_findings = len([f for f in audit_results['findings'] if f['severity'] == 'critical'])
        high_findings = len([f for f in audit_results['findings'] if f['severity'] == 'high'])
        
        return {
            'total_findings': total_findings,
            'critical_findings': critical_findings,
            'high_findings': high_findings,
            'overall_status': 'pass' if critical_findings == 0 else 'fail',
            'key_risks': self.identify_key_risks(audit_results)
        }
    
    def identify_key_risks(self, audit_results):
        """识别关键风险"""
        risks = []
        
        # 从发现中识别风险
        for finding in audit_results['findings']:
            if finding['severity'] in ['critical', 'high']:
                risks.append({
                    'risk': finding['description'],
                    'severity': finding['severity'],
                    'likelihood': finding.get('likelihood', 'medium'),
                    'impact': finding.get('impact', 'high')
                })
        
        return risks
    
    def generate_recommendations(self, audit_results):
        """生成建议"""
        recommendations = []
        
        # 基于发现生成建议
        for finding in audit_results['findings']:
            recommendation = {
                'finding': finding['description'],
                'recommendation': finding.get('recommendation', '需要进一步调查'),
                'priority': finding['severity'],
                'estimated_effort': finding.get('estimated_effort', 'medium')
            }
            recommendations.append(recommendation)
        
        return recommendations

class ComplianceChecker:
    def check(self, framework, scope):
        """检查合规性"""
        findings = []
        
        # 实现合规检查逻辑
        # 这里是示例
        
        findings.append({
            'control_id': 'A.9.1',
            'description': '访问控制策略未完全实施',
            'severity': 'high',
            'recommendation': '完善访问控制策略并确保全面实施',
            'likelihood': 'medium',
            'impact': 'high',
            'estimated_effort': 'high'
        })
        
        return findings

class EvidenceCollector:
    def collect(self, scope):
        """收集证据"""
        evidence = []
        
        # 实现证据收集逻辑
        evidence.append({
            'type': 'document',
            'description': '安全策略文档',
            'file_path': '/policies/security.pdf',
            'collected_at': datetime.now()
        })
        
        return evidence

class VulnerabilityScanner:
    def scan(self, scope):
        """扫描漏洞"""
        vulnerabilities = []
        
        # 实现漏洞扫描逻辑
        vulnerabilities.append({
            'cve_id': 'CVE-2024-1234',
            'severity': 'high',
            'description': '远程代码执行漏洞',
            'affected_systems': ['server-01', 'server-02'],
            'remediation': '应用安全补丁'
        })
        
        return vulnerabilities

class ConfigurationAuditor:
    def audit(self, scope):
        """审计配置"""
        issues = []
        
        # 实现配置审计逻辑
        issues.append({
            'system': 'firewall-01',
            'issue': '默认密码未更改',
            'severity': 'critical',
            'recommendation': '立即更改默认密码'
        })
        
        return issues

# 使用示例
audit_toolkit = AuditToolkit()

# 运行审计
audit_results = audit_toolkit.run_compliance_audit('ISO 27001', '全系统')

# 生成报告
report = audit_toolkit.generate_audit_report(audit_results)
print(f"审计报告: {report['executive_summary']}")

合规性检查

自动化合规检查

# 自动化合规检查系统
class AutomatedComplianceChecker:
    def __init__(self):
        self.checks = {}
        self.results = {}
        self.remediations = {}
    
    def add_check(self, check_id, config):
        """添加检查"""
        check = {
            'id': check_id,
            'name': config['name'],
            'description': config['description'],
            'control_id': config['control_id'],
            'check_type': config['check_type'],
            'parameters': config.get('parameters', {}),
            'severity': config.get('severity', 'medium'),
            'remediation': config.get('remediation', '需要手动处理')
        }
        self.checks[check_id] = check
        return check
    
    def run_check(self, check_id):
        """运行检查"""
        if check_id not in self.checks:
            return {'status': 'error', 'message': '检查不存在'}
        
        check = self.checks[check_id]
        check_type = check['check_type']
        
        # 根据检查类型执行
        if check_type == 'configuration':
            result = self.check_configuration(check)
        elif check_type == 'vulnerability':
            result = self.check_vulnerability(check)
        elif check_type == 'access_control':
            result = self.check_access_control(check)
        elif check_type == 'logging':
            result = self.check_logging(check)
        else:
            result = {'status': 'error', 'message': '未知检查类型'}
        
        result['check_id'] = check_id
        result['checked_at'] = datetime.now()
        
        self.results[check_id] = result
        return result
    
    def check_configuration(self, check):
        """检查配置"""
        # 实现配置检查逻辑
        parameters = check['parameters']
        
        # 示例:检查密码策略
        if 'password_policy' in parameters:
            policy = parameters['password_policy']
            
            # 检查最小密码长度
            if policy.get('min_length', 0) < 8:
                return {
                    'status': 'failed',
                    'message': '密码最小长度不足 8 位',
                    'severity': check['severity'],
                    'remediation': '将密码最小长度设置为至少 8 位'
                }
            
            # 检查密码复杂度
            if not policy.get('require_complexity', False):
                return {
                    'status': 'failed',
                    'message': '未启用密码复杂度要求',
                    'severity': check['severity'],
                    'remediation': '启用密码复杂度要求'
                }
        
        return {
            'status': 'passed',
            'message': '配置检查通过'
        }
    
    def check_vulnerability(self, check):
        """检查漏洞"""
        # 实现漏洞检查逻辑
        parameters = check['parameters']
        
        # 示例:检查已知漏洞
        if 'cve_ids' in parameters:
            cve_ids = parameters['cve_ids']
            
            # 检查系统是否存在这些漏洞
            vulnerable_systems = self.scan_for_vulnerabilities(cve_ids)
            
            if vulnerable_systems:
                return {
                    'status': 'failed',
                    'message': f'发现 {len(vulnerable_systems)} 个系统存在漏洞',
                    'affected_systems': vulnerable_systems,
                    'severity': check['severity'],
                    'remediation': check['remediation']
                }
        
        return {
            'status': 'passed',
            'message': '未发现漏洞'
        }
    
    def check_access_control(self, check):
        """检查访问控制"""
        # 实现访问控制检查逻辑
        parameters = check['parameters']
        
        # 示例:检查特权账户
        if 'privilege_accounts' in parameters:
            accounts = parameters['privilege_accounts']
            
            # 检查是否有未授权的特权账户
            unauthorized_accounts = self.check_unauthorized_privileges(accounts)
            
            if unauthorized_accounts:
                return {
                    'status': 'failed',
                    'message': f'发现 {len(unauthorized_accounts)} 个未授权特权账户',
                    'unauthorized_accounts': unauthorized_accounts,
                    'severity': check['severity'],
                    'remediation': '审查并移除未授权的特权账户'
                }
        
        return {
            'status': 'passed',
            'message': '访问控制检查通过'
        }
    
    def check_logging(self, check):
        """检查日志"""
        # 实现日志检查逻辑
        parameters = check['parameters']
        
        # 示例:检查日志配置
        if 'log_sources' in parameters:
            sources = parameters['log_sources']
            
            # 检查日志源是否正确配置
            misconfigured_sources = self.check_log_configuration(sources)
            
            if misconfigured_sources:
                return {
                    'status': 'failed',
                    'message': f'发现 {len(misconfigured_sources)} 个日志源配置错误',
                    'misconfigured_sources': misconfigured_sources,
                    'severity': check['severity'],
                    'remediation': '修正日志源配置'
                }
        
        return {
            'status': 'passed',
            'message': '日志检查通过'
        }
    
    def scan_for_vulnerabilities(self, cve_ids):
        """扫描漏洞"""
        # 实现漏洞扫描逻辑
        # 这里是示例
        return ['server-01', 'server-02']
    
    def check_unauthorized_privileges(self, accounts):
        """检查未授权特权"""
        # 实现特权检查逻辑
        # 这里是示例
        return ['user1', 'user2']
    
    def check_log_configuration(self, sources):
        """检查日志配置"""
        # 实现日志配置检查逻辑
        # 这里是示例
        return ['firewall-01']
    
    def run_all_checks(self):
        """运行所有检查"""
        results = {}
        
        for check_id in self.checks:
            result = self.run_check(check_id)
            results[check_id] = result
        
        return results
    
    def generate_compliance_report(self):
        """生成合规报告"""
        results = self.run_all_checks()
        
        total_checks = len(results)
        passed_checks = len([r for r in results.values() if r['status'] == 'passed'])
        failed_checks = len([r for r in results.values() if r['status'] == 'failed'])
        
        report = {
            'summary': {
                'total_checks': total_checks,
                'passed_checks': passed_checks,
                'failed_checks': failed_checks,
                'compliance_rate': (passed_checks / total_checks * 100) if total_checks > 0 else 0
            },
            'failed_checks': [
                {
                    'check_id': check_id,
                    'name': self.checks[check_id]['name'],
                    'severity': result['severity'],
                    'message': result['message'],
                    'remediation': result.get('remediation')
                }
                for check_id, result in results.items()
                if result['status'] == 'failed'
            ],
            'generated_at': datetime.now()
        }
        
        return report

# 使用示例
compliance_checker = AutomatedComplianceChecker()

# 添加检查
compliance_checker.add_check('password-policy', {
    'name': '密码策略检查',
    'description': '检查密码策略是否符合要求',
    'control_id': 'A.9.4.1',
    'check_type': 'configuration',
    'parameters': {
        'password_policy': {
            'min_length': 6,
            'require_complexity': False
        }
    },
    'severity': 'high',
    'remediation': '更新密码策略'
})

compliance_checker.add_check('vulnerability-scan', {
    'name': '漏洞扫描',
    'description': '检查系统是否存在已知漏洞',
    'control_id': 'A.12.6.1',
    'check_type': 'vulnerability',
    'parameters': {
        'cve_ids': ['CVE-2024-1234', 'CVE-2024-5678']
    },
    'severity': 'critical',
    'remediation': '应用安全补丁'
})

# 生成合规报告
report = compliance_checker.generate_compliance_report()
print(f"合规报告: {report['summary']}")

实际应用示例

构建合规管理平台

# 合规管理平台
class ComplianceManagementPlatform:
    def __init__(self):
        self.frameworks = {}
        self.audits = {}
        self.remediations = {}
        self.reports = {}
    
    def add_framework(self, framework_id, config):
        """添加合规框架"""
        framework = {
            'id': framework_id,
            'name': config['name'],
            'description': config['description'],
            'controls': config['controls'],
            'added_at': datetime.now()
        }
        self.frameworks[framework_id] = framework
        return framework
    
    def create_audit(self, audit_id, config):
        """创建审计"""
        audit = {
            'id': audit_id,
            'name': config['name'],
            'framework_id': config['framework_id'],
            'scope': config['scope'],
            'schedule': config.get('schedule'),
            'status': 'scheduled',
            'created_at': datetime.now()
        }
        self.audits[audit_id] = audit
        return audit
    
    def execute_audit(self, audit_id):
        """执行审计"""
        if audit_id not in self.audits:
            raise ValueError(f"审计不存在: {audit_id}")
        
        audit = self.audits[audit_id]
        audit['status'] = 'in_progress'
        audit['started_at'] = datetime.now()
        
        # 运行合规检查
        framework = self.frameworks[audit['framework_id']]
        results = self.run_compliance_checks(framework, audit['scope'])
        
        audit['results'] = results
        audit['status'] = 'completed'
        audit['completed_at'] = datetime.now()
        audit['duration'] = audit['completed_at'] - audit['started_at']
        
        # 生成报告
        report = self.generate_audit_report(audit)
        self.reports[audit_id] = report
        
        return audit
    
    def run_compliance_checks(self, framework, scope):
        """运行合规检查"""
        results = []
        
        for control in framework['controls']:
            result = self.check_control(control, scope)
            results.append(result)
        
        return results
    
    def check_control(self, control, scope):
        """检查控制"""
        # 实现控制检查逻辑
        return {
            'control_id': control['id'],
            'control_name': control['name'],
            'status': 'compliant',
            'evidence': [],
            'findings': []
        }
    
    def generate_audit_report(self, audit):
        """生成审计报告"""
        total_controls = len(audit['results'])
        compliant_controls = len([r for r in audit['results'] if r['status'] == 'compliant'])
        non_compliant_controls = total_controls - compliant_controls
        
        report = {
            'audit_id': audit['id'],
            'audit_name': audit['name'],
            'framework_id': audit['framework_id'],
            'executive_summary': {
                'total_controls': total_controls,
                'compliant_controls': compliant_controls,
                'non_compliant_controls': non_compliant_controls,
                'compliance_rate': (compliant_controls / total_controls * 100) if total_controls > 0 else 0
            },
            'detailed_results': audit['results'],
            'recommendations': self.generate_recommendations(audit['results']),
            'generated_at': datetime.now()
        }
        
        return report
    
    def generate_recommendations(self, results):
        """生成建议"""
        recommendations = []
        
        for result in results:
            if result['status'] != 'compliant':
                for finding in result['findings']:
                    recommendation = {
                        'control_id': result['control_id'],
                        'finding': finding['description'],
                        'recommendation': finding.get('recommendation', '需要进一步调查'),
                        'priority': finding.get('severity', 'medium'),
                        'estimated_effort': finding.get('estimated_effort', 'medium')
                    }
                    recommendations.append(recommendation)
        
        return recommendations
    
    def create_remediation_plan(self, audit_id):
        """创建整改计划"""
        if audit_id not in self.reports:
            raise ValueError(f"审计报告不存在: {audit_id}")
        
        report = self.reports[audit_id]
        
        remediation_plan = {
            'audit_id': audit_id,
            'recommendations': report['recommendations'],
            'tasks': self.create_remediation_tasks(report['recommendations']),
            'created_at': datetime.now(),
            'status': 'pending'
        }
        
        self.remediations[audit_id] = remediation_plan
        return remediation_plan
    
    def create_remediation_tasks(self, recommendations):
        """创建整改任务"""
        tasks = []
        
        for i, recommendation in enumerate(recommendations):
            task = {
                'task_id': f"TASK-{i + 1}",
                'recommendation': recommendation['recommendation'],
                'priority': recommendation['priority'],
                'estimated_effort': recommendation['estimated_effort'],
                'assigned_to': None,
                'status': 'pending',
                'due_date': None
            }
            tasks.append(task)
        
        return tasks
    
    def schedule_audits(self):
        """调度审计"""
        scheduled_audits = []
        
        for audit_id, audit in self.audits.items():
            if audit['status'] == 'scheduled' and audit.get('schedule'):
                if self.should_run_audit(audit):
                    scheduled_audits.append(audit_id)
        
        return scheduled_audits
    
    def should_run_audit(self, audit):
        """判断是否应该运行审计"""
        schedule = audit.get('schedule')
        
        if not schedule:
            return False
        
        schedule_type = schedule.get('type', 'manual')
        
        if schedule_type == 'daily':
            return True
        elif schedule_type == 'weekly':
            day_of_week = schedule.get('day_of_week', 0)
            return datetime.now().weekday() == day_of_week
        elif schedule_type == 'monthly':
            day_of_month = schedule.get('day_of_month', 1)
            return datetime.now().day == day_of_month
        
        return False

# 创建平台实例
platform = ComplianceManagementPlatform()

# 添加 ISO 27001 框架
platform.add_framework('iso-27001', {
    'name': 'ISO 27001',
    'description': '信息安全管理体系',
    'controls': [
        {'id': 'A.9.1', 'name': '访问控制业务要求'},
        {'id': 'A.12.6', 'name': '技术漏洞管理'},
        {'id': 'A.18.1', 'name': '法律法规要求'}
    ]
})

# 创建审计
audit = platform.create_audit('audit-001', {
    'name': '年度 ISO 27001 审计',
    'framework_id': 'iso-27001',
    'scope': '全系统',
    'schedule': {
        'type': 'yearly',
        'month': 1,
        'day': 1
    }
})

# 执行审计
result = platform.execute_audit('audit-001')
print(f"审计结果: {result['status']}")

# 生成整改计划
remediation_plan = platform.create_remediation_plan('audit-001')
print(f"整改计划: {len(remediation_plan['tasks'])} 个任务")

总结

本教程详细介绍了安全合规与审计:

  1. 合规框架与标准

    • ISO 27001
    • NIST CSF
    • PCI DSS
    • GDPR
    • 其他标准
  2. 审计流程与方法

    • 审计准备
    • 审计执行
    • 审计分析
    • 审计报告
    • 后续跟踪
  3. 合规性检查

    • 自动化检查
    • 手动检查
    • 持续监控
    • 定期审计
  4. 审计工具

    • 合规检查器
    • 证据收集器
    • 漏洞扫描器
    • 配置审计器
  5. 实际应用

    • 合规管理平台
    • 自动化审计
    • 整改管理
  6. 最佳实践

    • 持续合规
    • 文档完善
    • 证据保存
    • 持续改进

安全合规与审计是确保组织安全实践符合要求的重要手段,建立完善的合规管理体系对于降低风险、提高信任至关重要。

下一步

在下一教程中,我们将学习网络安全运营管理最佳实践,包括:

  • 安全运营成熟度模型

  • 关键绩效指标(KPI)

  • 持续改进方法

  • 团队建设与发展

  • 技术趋势与未来展望

继续学习网络安全运营,掌握这门重要领域的更多知识!