最近在做基线相关的入侵检测,需要检出IP异常、UA异常、频率异常、请求流量异常四类场景,搞完发现效果不错,能有效压制正常的业务docker重启导致的ip变动,测试攻击0漏报,日误报量<20,可通过动态基线自学习持续降低,整理了一下关键算法分享出来。

告警降噪部分

1. 告警风暴检测

问题分析

在动态环境中,容易出现告警风暴:

  • 容器重启导致短时间内大量IP_PROJECT_ANOMALY
  • 业务变化导致批量的UA_ANOMALY
  • 这些告警具有时间聚集性和模式相似性

解决思路

告警风暴识别:

  • 基于时间窗口分析告警密度
  • 基于模式相似性判断告警关联性
  • 识别为风暴后,启动学习会话进行抑制

学习会话管理:

  • 每种告警模式创建独立的学习会话
  • 会话期间抑制相似告警,避免重复
  • 会话结束后更新学习结果

2. 业务模式学习

问题分析

正常业务具有一定的模式性:

  • 特定时间段的访问高峰
  • 特定IP段的集中访问
  • 特定项目的访问特征

解决思路

时间模式学习:

  • 分析每个项目的访问时间分布
  • 建立时间基线,识别异常时间访问
  • 考虑工作时间、周末、节假日的差异

IP模式学习:

  • 分析项目的IP访问分布
  • 识别内网IP段和外网IP的访问模式
  • 建立IP信任度评估机制

访问模式学习:

  • 分析请求频率、间隔、User-Agent等特征
  • 建立多维度的访问模式画像
  • 识别正常的业务突发和异常的攻击行为

基线机制

1. 分层基线设计

设计思路

基线数据需要平衡稳定性和灵活性:

  • 持久化基线:长期稳定的基线数据,存储在数据库,系统重启后仍有效
  • 临时基线:快速学习的临时数据,存储在内存,用于快速适应
  • 动态基线:基于滑动窗口的实时基线,用于动态阈值调整

基线数据结构

持久化基线 = {
    项目UA白名单: {项目 → UA集合}
    IP项目映射: {IP → 项目集合}
    请求间隔统计: {(IP,项目) → 统计信息}
    IP项目计数: {IP → 最大项目数}
}

临时基线 = {
    临时IP项目映射: {IP → 项目集合}
    临时UA白名单: {项目 → UA集合}
}

动态基线 = {
    24小时滑动窗口数据
    实时统计信息
    动态阈值参数
}

2. 基线更新策略

更新时机分析

不同类型的基线需要不同的更新策略:

持久化基线更新:

  • 容器重启学习:立即更新
  • 快速基线更新:批量更新(5个更新或1分钟间隔)
  • 定期合并:将稳定的临时基线数据合并进来

临时基线更新:

  • 渐进式学习:立即更新
  • 告警抑制学习:实时更新
  • 系统重启时清空

动态基线更新:

  • 高流量:每5分钟更新
  • 中等流量:每10分钟更新
  • 低流量:每30分钟更新

数据合并策略

临时基线数据需要定期合并到持久化基线:

合并条件判断:
1. 访问频率:同一IP-项目组合 ≥ 3次访问
2. 时间跨度:学习时间 ≥ 10分钟
3. 模式稳定性:访问间隔方差 < 阈值
4. 业务合理性:IP属于内网段或已知安全IP

合并操作:
1. 数据验证:确保合并数据的安全性
2. 冲突处理:处理新旧数据的冲突
3. 质量控制:确保合并后基线质量
4. 持久化保存:更新数据库中的基线数据

具体算法(伪代码)

1. IP-项目映射检测算法

算法思路

通过多维度风险评估,区分正常的IP变化和异常的访问行为:

算法流程:

输入:access_log = (src_ip, project_name, timestamp, user_agent)
输出:alert_or_null

步骤1:基线检查
  persistent_projects ← baseline_data[src_ip]
  temp_projects ← temp_baseline[src_ip]
  known_projects ← persistent_projects ∪ temp_projects
  
步骤2:映射检查
  IF project_name ∈ known_projects THEN
    返回 null  // 已知映射,无需告警
  
步骤3:风险评估
  risk_score ← calculate_risk_factors(access_log, known_projects)
  
步骤4:告警判断
  IF risk_score > threshold THEN
    trigger_learning_mechanism(access_log, risk_score)
    返回 create_alert(src_ip, project_name, risk_score)
  ELSE
    返回 null

风险评估公式:

风险评分 = Σ(维度权重 × 维度评分)

维度评分计算:
- 首次访问评分 = has_known_projects ? 0 : 2
- IP新鲜度评分 = is_first_time_ip ? 3 : 0  
- 内网IP评分 = is_internal_ip ? -1 : 1
- 时间合理性评分 = is_business_hours ? -1 : 1
- 项目活跃度评分 = log(project_access_count) - 2
- 批量行为评分 = is_batch_behavior ? 2 : 0

最终评分 = max(1, 基础分数 + 各维度评分之和)

2. 批量行为检测算法

算法思路

分析项目级的多IP访问模式,识别正常的分布式访问:

频率异常检测:

输入:access_log, time_window = 5分钟
输出:anomaly_score

步骤1:频率统计
  recent_requests ← count_requests_in_window(src_ip, time_window)
  
步骤2:基线对比
  baseline_frequency ← get_baseline_frequency(src_ip, project_name)
  
步骤3:Z-score计算
  z_score ← (recent_requests - baseline_frequency.mean) / baseline_frequency.std
  
步骤4:异常评分映射
  IF z_score > 3 THEN
    anomaly_score ← 0.9 + 0.1 * min(z_score - 3, 5) / 5
  ELSE IF z_score > 2 THEN
    anomaly_score ← 0.7 + 0.2 * (z_score - 2)
  ELSE
    anomaly_score ← max(0, 0.5 + 0.2 * z_score)

项目级模式分析:

输入:project_name, src_ip, current_frequency
输出:pattern_analysis

步骤1:获取项目活跃IP
  active_ips ← get_active_ips(project_name, time_window = 10分钟)
  active_ips ← filter(ip | request_count[ip] ≥ 8)
  
步骤2:频率相似度计算
  similarity_scores ← []
  FOR each ip IN active_ips:
    freq_diff ← |current_frequency - frequency[ip]|
    similarity ← max(0, 1 - freq_diff / max(current_frequency, frequency[ip]))
    similarity_scores.append(similarity)
  
步骤3:模式判断
  similar_count ← count(score | score > 0.7)
  total_count ← len(active_ips)
  
  IF similar_count ≥ 2 AND similar_count / total_count > 0.5 THEN
    pattern_type ← "NORMAL_MULTI_IP_PATTERN"
  ELSE
    pattern_type ← "ANOMALY_PATTERN"

3. 容器重启学习算法

算法思路

检测短时间内的多IP异常聚集,识别容器重启模式:

聚集检测算法:

输入:project_name, src_ip, timestamp
输出:is_container_restart

步骤1:告警跟踪
  add_alert_to_tracking(project_name, src_ip, timestamp)
  
步骤2:时间窗口过滤
  recent_alerts ← get_alerts_in_window(project_name, 15分钟)
  
步骤3:IP唯一性统计
  unique_ips ← distinct(alert.src_ip for alert in recent_alerts)
  
步骤4:容器重启判断
  IF len(unique_ips) ≥ 2 THEN
    内网IP比例 ← count_internal_ips(unique_ips) / len(unique_ips)
    IF 内网IP比例 ≥ 0.8 THEN
      返回 true
  
  返回 false

批量学习机制:

输入:project_name, alert_ips
输出:learning_session

步骤1:学习会话创建
  session_key ← "CONTAINER_RESTART_" + project_name
  session ← create_learning_session(session_key, duration = 10分钟)
  
步骤2:IP映射学习
  FOR each ip IN alert_ips:
    baseline_data[ip].projects.add(project_name)
    // 直接更新持久化基线
  
步骤3:学习抑制
  FOR each ip IN alert_ips:
    add_to_suppression_list(ip, project_name, session.duration)
  
步骤4:学习统计
  record_learning_event("CONTAINER_RESTART", project_name, len(alert_ips))

4. 渐进式学习算法

算法思路

基于已知IP分析网段分布,确定合理的学习范围:

网段分析算法:

输入:project_name, trigger_ip, baseline_data
输出:learning_scope

步骤1:已知IP获取
  known_ips ← get_project_known_ips(project_name, baseline_data)
  
步骤2:网段统计
  b_segments ← extract_b_segments(known_ips)
  c_segments ← extract_c_segments(known_ips)
  
  b_segment_ratio ← len(distinct(b_segments)) / len(known_ips)
  c_segment_ratio ← len(distinct(c_segments)) / len(known_ips)
  
步骤3:学习范围确定
  trigger_b_segment ← extract_b_segment(trigger_ip)
  trigger_c_segment ← extract_c_segment(trigger_ip)
  
  IF b_segment_ratio > 0.8 AND trigger_b_segment ∈ b_segments THEN
    返回 {type: "B_SEGMENT", pattern: trigger_b_segment + ".*"}
  ELSE IF c_segment_ratio > 0.6 AND trigger_c_segment ∈ c_segments THEN
    返回 {type: "C_SEGMENT", pattern: trigger_c_segment + ".*"}
  ELSE
    返回 null  // 不适合学习

学习范围应用:

输入:learning_scope, project_name, duration = 10分钟
输出:learning_session

步骤1:学习会话创建
  session_key ← "PROGRESSIVE_" + project_name + "_" + learning_scope.type
  session ← create_learning_session(session_key, duration)
  
步骤2:IP范围学习
  FOR each new_ip matching learning_scope.pattern:
    temp_baseline[new_ip].projects.add(project_name)
    // 添加到临时基线
  
步骤3:学习监控
  session.learned_ips ← track_learned_ips(learning_scope.pattern)
  session.effectiveness ← calculate_suppression_rate()

5. 快速基线更新算法

算法思路

定期将稳定的临时基线数据合并到持久化基线:

更新条件判断:

输入:temp_baseline, persistent_baseline
输出:update_candidates

步骤1:候选数据筛选
  candidates ← []
  FOR each (ip, projects) IN temp_baseline:
    FOR each project IN projects:
      access_count ← count_accesses(ip, project)
      time_span ← get_time_span(ip, project)
      stability ← calculate_stability(ip, project)
    
      IF access_count ≥ 3 AND time_span ≥ 10分钟 AND stability < threshold THEN
        candidates.append((ip, project, access_count, stability))
  
步骤2:质量评估
  FOR each candidate IN candidates:
    safety_score ← evaluate_safety(candidate.ip, candidate.project)
    business_score ← evaluate_business_reasonableness(candidate.ip)
  
    IF safety_score > 0.6 AND business_score > 0.5 THEN
      candidate.qualified ← true
  
返回 filter(candidates, qualified = true)

批量更新机制:

输入:update_candidates, batch_size = 5
输出:update_result

步骤1:批量分组
  IF len(update_candidates) ≥ batch_size OR time_since_last_update > 1分钟 THEN
    perform_batch_update ← true
  
步骤2:数据合并
  FOR each candidate IN update_candidates:
    persistent_baseline[candidate.ip].projects.add(candidate.project)
    update_baseline_statistics(candidate.ip, candidate.project)
  
步骤3:临时基线清理
  FOR each candidate IN update_candidates:
    remove_from_temp_baseline(candidate.ip, candidate.project)
  
步骤4:更新统计
  record_update_event("FAST_BASELINE_UPDATE", len(update_candidates))

6. 动态阈值算法

算法思路

基于流量状态自适应调整检测阈值和更新间隔:

流量状态评估:

输入:recent_logs, time_window = 1小时
输出:traffic_state

步骤1:流量统计
  request_count ← count_requests_in_window(time_window)
  unique_ips ← count_unique_ips_in_window(time_window)
  unique_projects ← count_unique_projects_in_window(time_window)
  
步骤2:流量分级
  traffic_density ← request_count / time_window_minutes
  
  IF traffic_density > 500 THEN
    traffic_level ← "HIGH"
    update_interval ← 5分钟
  ELSE IF traffic_density > 100 THEN
    traffic_level ← "MEDIUM"
    update_interval ← 10分钟
  ELSE
    traffic_level ← "LOW"
    update_interval ← 30分钟
  
步骤3:阈值调整
  base_threshold ← 0.7
  IF traffic_level = "HIGH" THEN
    anomaly_threshold ← base_threshold + 0.1  // 提高阈值,减少误报
  ELSE IF traffic_level = "LOW" THEN
    anomaly_threshold ← base_threshold - 0.1  // 降低阈值,增加敏感度
  ELSE
    anomaly_threshold ← base_threshold

自适应更新间隔:

输入:current_traffic_state, learning_effectiveness
输出:optimal_interval

步骤1:基础间隔计算
  base_interval ← traffic_state.update_interval
  
步骤2:学习效果调整
  IF learning_effectiveness > 0.8 THEN
    interval_multiplier ← 1.2  // 学习效果好,可以延长间隔
  ELSE IF learning_effectiveness < 0.5 THEN
    interval_multiplier ← 0.8  // 学习效果差,缩短间隔
  ELSE
    interval_multiplier ← 1.0
  
步骤3:最终间隔
  optimal_interval ← base_interval * interval_multiplier
  optimal_interval ← clamp(optimal_interval, 1分钟, 60分钟)
如果觉得我的文章对你有用,请随意赞赏