最近在做基线相关的入侵检测,需要检出IP异常、UA异常、频率异常、请求流量异常四类场景,搞完发现效果不错,能有效压制正常的业务docker重启导致的ip变动,测试攻击0漏报,日误报量<20,可通过动态基线自学习持续降低,整理了一下关键算法分享出来。
告警降噪部分
1. 告警风暴检测
问题分析
在动态环境中,容易出现告警风暴:
- 容器重启导致短时间内大量IP_PROJECT_ANOMALY
- 业务变化导致批量的UA_ANOMALY
- 这些告警具有时间聚集性和模式相似性
解决思路
告警风暴识别:
- 基于时间窗口分析告警密度
- 基于模式相似性判断告警关联性
- 识别为风暴后,启动学习会话进行抑制
学习会话管理:
- 每种告警模式创建独立的学习会话
- 会话期间抑制相似告警,避免重复
- 会话结束后更新学习结果
2. 业务模式学习
问题分析
正常业务具有一定的模式性:
- 特定时间段的访问高峰
- 特定IP段的集中访问
- 特定项目的访问特征
解决思路
时间模式学习:
- 分析每个项目的访问时间分布
- 建立时间基线,识别异常时间访问
- 考虑工作时间、周末、节假日的差异
IP模式学习:
- 分析项目的IP访问分布
- 识别内网IP段和外网IP的访问模式
- 建立IP信任度评估机制
访问模式学习:
- 分析请求频率、间隔、User-Agent等特征
- 建立多维度的访问模式画像
- 识别正常的业务突发和异常的攻击行为
基线机制
1. 分层基线设计
设计思路
基线数据需要平衡稳定性和灵活性:
- 持久化基线:长期稳定的基线数据,存储在数据库,系统重启后仍有效
- 临时基线:快速学习的临时数据,存储在内存,用于快速适应
- 动态基线:基于滑动窗口的实时基线,用于动态阈值调整
基线数据结构
持久化基线 = {
项目UA白名单: {项目 → UA集合}
IP项目映射: {IP → 项目集合}
请求间隔统计: {(IP,项目) → 统计信息}
IP项目计数: {IP → 最大项目数}
}
临时基线 = {
临时IP项目映射: {IP → 项目集合}
临时UA白名单: {项目 → UA集合}
}
动态基线 = {
24小时滑动窗口数据
实时统计信息
动态阈值参数
}
2. 基线更新策略
更新时机分析
不同类型的基线需要不同的更新策略:
持久化基线更新:
- 容器重启学习:立即更新
- 快速基线更新:批量更新(5个更新或1分钟间隔)
- 定期合并:将稳定的临时基线数据合并进来
临时基线更新:
- 渐进式学习:立即更新
- 告警抑制学习:实时更新
- 系统重启时清空
动态基线更新:
- 高流量:每5分钟更新
- 中等流量:每10分钟更新
- 低流量:每30分钟更新
数据合并策略
临时基线数据需要定期合并到持久化基线:
合并条件判断:
1. 访问频率:同一IP-项目组合 ≥ 3次访问
2. 时间跨度:学习时间 ≥ 10分钟
3. 模式稳定性:访问间隔方差 < 阈值
4. 业务合理性:IP属于内网段或已知安全IP
合并操作:
1. 数据验证:确保合并数据的安全性
2. 冲突处理:处理新旧数据的冲突
3. 质量控制:确保合并后基线质量
4. 持久化保存:更新数据库中的基线数据
具体算法(伪代码)
1. IP-项目映射检测算法
算法思路
通过多维度风险评估,区分正常的IP变化和异常的访问行为:
算法流程:
输入:access_log = (src_ip, project_name, timestamp, user_agent)
输出:alert_or_null
步骤1:基线检查
persistent_projects ← baseline_data[src_ip]
temp_projects ← temp_baseline[src_ip]
known_projects ← persistent_projects ∪ temp_projects
步骤2:映射检查
IF project_name ∈ known_projects THEN
返回 null // 已知映射,无需告警
步骤3:风险评估
risk_score ← calculate_risk_factors(access_log, known_projects)
步骤4:告警判断
IF risk_score > threshold THEN
trigger_learning_mechanism(access_log, risk_score)
返回 create_alert(src_ip, project_name, risk_score)
ELSE
返回 null
风险评估公式:
风险评分 = Σ(维度权重 × 维度评分)
维度评分计算:
- 首次访问评分 = has_known_projects ? 0 : 2
- IP新鲜度评分 = is_first_time_ip ? 3 : 0
- 内网IP评分 = is_internal_ip ? -1 : 1
- 时间合理性评分 = is_business_hours ? -1 : 1
- 项目活跃度评分 = log(project_access_count) - 2
- 批量行为评分 = is_batch_behavior ? 2 : 0
最终评分 = max(1, 基础分数 + 各维度评分之和)
2. 批量行为检测算法
算法思路
分析项目级的多IP访问模式,识别正常的分布式访问:
频率异常检测:
输入:access_log, time_window = 5分钟
输出:anomaly_score
步骤1:频率统计
recent_requests ← count_requests_in_window(src_ip, time_window)
步骤2:基线对比
baseline_frequency ← get_baseline_frequency(src_ip, project_name)
步骤3:Z-score计算
z_score ← (recent_requests - baseline_frequency.mean) / baseline_frequency.std
步骤4:异常评分映射
IF z_score > 3 THEN
anomaly_score ← 0.9 + 0.1 * min(z_score - 3, 5) / 5
ELSE IF z_score > 2 THEN
anomaly_score ← 0.7 + 0.2 * (z_score - 2)
ELSE
anomaly_score ← max(0, 0.5 + 0.2 * z_score)
项目级模式分析:
输入:project_name, src_ip, current_frequency
输出:pattern_analysis
步骤1:获取项目活跃IP
active_ips ← get_active_ips(project_name, time_window = 10分钟)
active_ips ← filter(ip | request_count[ip] ≥ 8)
步骤2:频率相似度计算
similarity_scores ← []
FOR each ip IN active_ips:
freq_diff ← |current_frequency - frequency[ip]|
similarity ← max(0, 1 - freq_diff / max(current_frequency, frequency[ip]))
similarity_scores.append(similarity)
步骤3:模式判断
similar_count ← count(score | score > 0.7)
total_count ← len(active_ips)
IF similar_count ≥ 2 AND similar_count / total_count > 0.5 THEN
pattern_type ← "NORMAL_MULTI_IP_PATTERN"
ELSE
pattern_type ← "ANOMALY_PATTERN"
3. 容器重启学习算法
算法思路
检测短时间内的多IP异常聚集,识别容器重启模式:
聚集检测算法:
输入:project_name, src_ip, timestamp
输出:is_container_restart
步骤1:告警跟踪
add_alert_to_tracking(project_name, src_ip, timestamp)
步骤2:时间窗口过滤
recent_alerts ← get_alerts_in_window(project_name, 15分钟)
步骤3:IP唯一性统计
unique_ips ← distinct(alert.src_ip for alert in recent_alerts)
步骤4:容器重启判断
IF len(unique_ips) ≥ 2 THEN
内网IP比例 ← count_internal_ips(unique_ips) / len(unique_ips)
IF 内网IP比例 ≥ 0.8 THEN
返回 true
返回 false
批量学习机制:
输入:project_name, alert_ips
输出:learning_session
步骤1:学习会话创建
session_key ← "CONTAINER_RESTART_" + project_name
session ← create_learning_session(session_key, duration = 10分钟)
步骤2:IP映射学习
FOR each ip IN alert_ips:
baseline_data[ip].projects.add(project_name)
// 直接更新持久化基线
步骤3:学习抑制
FOR each ip IN alert_ips:
add_to_suppression_list(ip, project_name, session.duration)
步骤4:学习统计
record_learning_event("CONTAINER_RESTART", project_name, len(alert_ips))
4. 渐进式学习算法
算法思路
基于已知IP分析网段分布,确定合理的学习范围:
网段分析算法:
输入:project_name, trigger_ip, baseline_data
输出:learning_scope
步骤1:已知IP获取
known_ips ← get_project_known_ips(project_name, baseline_data)
步骤2:网段统计
b_segments ← extract_b_segments(known_ips)
c_segments ← extract_c_segments(known_ips)
b_segment_ratio ← len(distinct(b_segments)) / len(known_ips)
c_segment_ratio ← len(distinct(c_segments)) / len(known_ips)
步骤3:学习范围确定
trigger_b_segment ← extract_b_segment(trigger_ip)
trigger_c_segment ← extract_c_segment(trigger_ip)
IF b_segment_ratio > 0.8 AND trigger_b_segment ∈ b_segments THEN
返回 {type: "B_SEGMENT", pattern: trigger_b_segment + ".*"}
ELSE IF c_segment_ratio > 0.6 AND trigger_c_segment ∈ c_segments THEN
返回 {type: "C_SEGMENT", pattern: trigger_c_segment + ".*"}
ELSE
返回 null // 不适合学习
学习范围应用:
输入:learning_scope, project_name, duration = 10分钟
输出:learning_session
步骤1:学习会话创建
session_key ← "PROGRESSIVE_" + project_name + "_" + learning_scope.type
session ← create_learning_session(session_key, duration)
步骤2:IP范围学习
FOR each new_ip matching learning_scope.pattern:
temp_baseline[new_ip].projects.add(project_name)
// 添加到临时基线
步骤3:学习监控
session.learned_ips ← track_learned_ips(learning_scope.pattern)
session.effectiveness ← calculate_suppression_rate()
5. 快速基线更新算法
算法思路
定期将稳定的临时基线数据合并到持久化基线:
更新条件判断:
输入:temp_baseline, persistent_baseline
输出:update_candidates
步骤1:候选数据筛选
candidates ← []
FOR each (ip, projects) IN temp_baseline:
FOR each project IN projects:
access_count ← count_accesses(ip, project)
time_span ← get_time_span(ip, project)
stability ← calculate_stability(ip, project)
IF access_count ≥ 3 AND time_span ≥ 10分钟 AND stability < threshold THEN
candidates.append((ip, project, access_count, stability))
步骤2:质量评估
FOR each candidate IN candidates:
safety_score ← evaluate_safety(candidate.ip, candidate.project)
business_score ← evaluate_business_reasonableness(candidate.ip)
IF safety_score > 0.6 AND business_score > 0.5 THEN
candidate.qualified ← true
返回 filter(candidates, qualified = true)
批量更新机制:
输入:update_candidates, batch_size = 5
输出:update_result
步骤1:批量分组
IF len(update_candidates) ≥ batch_size OR time_since_last_update > 1分钟 THEN
perform_batch_update ← true
步骤2:数据合并
FOR each candidate IN update_candidates:
persistent_baseline[candidate.ip].projects.add(candidate.project)
update_baseline_statistics(candidate.ip, candidate.project)
步骤3:临时基线清理
FOR each candidate IN update_candidates:
remove_from_temp_baseline(candidate.ip, candidate.project)
步骤4:更新统计
record_update_event("FAST_BASELINE_UPDATE", len(update_candidates))
6. 动态阈值算法
算法思路
基于流量状态自适应调整检测阈值和更新间隔:
流量状态评估:
输入:recent_logs, time_window = 1小时
输出:traffic_state
步骤1:流量统计
request_count ← count_requests_in_window(time_window)
unique_ips ← count_unique_ips_in_window(time_window)
unique_projects ← count_unique_projects_in_window(time_window)
步骤2:流量分级
traffic_density ← request_count / time_window_minutes
IF traffic_density > 500 THEN
traffic_level ← "HIGH"
update_interval ← 5分钟
ELSE IF traffic_density > 100 THEN
traffic_level ← "MEDIUM"
update_interval ← 10分钟
ELSE
traffic_level ← "LOW"
update_interval ← 30分钟
步骤3:阈值调整
base_threshold ← 0.7
IF traffic_level = "HIGH" THEN
anomaly_threshold ← base_threshold + 0.1 // 提高阈值,减少误报
ELSE IF traffic_level = "LOW" THEN
anomaly_threshold ← base_threshold - 0.1 // 降低阈值,增加敏感度
ELSE
anomaly_threshold ← base_threshold
自适应更新间隔:
输入:current_traffic_state, learning_effectiveness
输出:optimal_interval
步骤1:基础间隔计算
base_interval ← traffic_state.update_interval
步骤2:学习效果调整
IF learning_effectiveness > 0.8 THEN
interval_multiplier ← 1.2 // 学习效果好,可以延长间隔
ELSE IF learning_effectiveness < 0.5 THEN
interval_multiplier ← 0.8 // 学习效果差,缩短间隔
ELSE
interval_multiplier ← 1.0
步骤3:最终间隔
optimal_interval ← base_interval * interval_multiplier
optimal_interval ← clamp(optimal_interval, 1分钟, 60分钟)