Module 开发
Module 用于自动消费 Redis Stream 中的原始告警,并把它们转换为 ASP 工作台中的 Case / Alert / Artifact。
脚本位置
text
backend/modules/Module 文件由后端自动发现。每个文件需要定义一个 Module 类,继承 BaseModule,并设置 STREAM_NAME。
基本结构
python
from apps.agentic.runtime.base import BaseModule
class Module(BaseModule):
NAME = "Human readable name"
DESC = "Short description"
STREAM_NAME = "SIEM-Rule-Or-Stream-Name"
THREAD_NUM = 1
def run(self, message):
# message 是从 Redis Stream 读取到的 raw alert dict
...运行方式
Module worker 会扫描 backend\modules\*.py,发现其中继承 BaseModule 的 Module 类,然后按 STREAM_NAME 消费 Redis Stream。
bash
python manage.py run_agentic_module_worker默认 consumer group 为 agentic-modules。Webhook 和 ELK Index Action 写入的 Stream 名称需要与 Module 的 STREAM_NAME 对应。
处理流程
- 读取 raw alert。
- 解析事件时间,建议使用
parse_event_time()。 - 提取关键字段和 Artifact,例如 IP、域名、账号、主机、文件、进程。
- 设计
correlation_uid,建议使用generate_correlation_uid()。 - 使用
create_alert_with_context()创建或关联 Case、Alert、Artifact 和 Enrichment。 - 必要时触发 Case AI 分析。
当前示例
aws_iam_privilege_escalation_attach_user_policy.py- 处理 AWS CloudTrail
AttachUserPolicy高危 IAM 行为。 - 示例重点:云账号、主体身份、目标用户、权限策略和来源 IP。
- 处理 AWS CloudTrail
edr_vssadmin_delete_shadows.py- 处理 EDR 中
vssadmin delete shadows类勒索前置行为。 - 示例重点:主机、用户、进程、命令行和文件哈希。
- 处理 EDR 中
mail_user_report_phishing.py- 处理用户上报钓鱼邮件。
- 示例重点:发件人、收件人、上报人、主题、URL、域名和附件。
推荐数据落点
Module 应尽量输出或关联:
- Case:同一事件的处置入口。
- Alert:检测上下文和原始日志。
- Artifact:IOC、账号、主机、文件、进程等实体。
- Enrichment:可复用的外部上下文。
设计建议
STREAM_NAME与 SIEM Rule / Redis Stream 名称保持一致。- 不要把随机字段作为聚合键,例如 request id、session id、精确时间戳。
- 一个 Case 应代表一次可调查、可处置的安全事件,而不是单条日志。
- Artifact 尽量拆成可调查的原子实体。
- raw alert 原文应保存在 Alert 的
raw_data中,未映射但有价值的字段放入unmapped。 - 可以使用 Module Creator Skill 辅助生成 Module 草案。