<a href="https://discord.gg/YSYfHMbfZQ" rel="nofollow"><img src="https://camo.githubusercontent.com/7c91d085a11cd48b90ec132c430500e7c07633327afcbe07209476b65f23d84d/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f727573742d2532333341334233432e7376673f6c6162656c3d6275696c742077697468266c6f676f3d72757374266c6f676f436f6c6f723d66666666666626636f6c6f723d423135313345266c6162656c436f6c6f723d306431313137" data-canonical-src="https://img.shields.io/badge/rust-%233A3B3C.svg?label=built with&logo=rust&logoColor=ffffff&color=B1513E&labelColor=0d1117" style="max-width: 100%;"></a>
<a href="#"><img src="https://camo.githubusercontent.com/063ca0ed69047380b66163b1df4617edca294bdc85f4c79a7b53bd8649c0fddd/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f6465706c6f7973253230746f2d6177732d2532334646393930302e7376673f6c6f676f3d616d617a6f6e2d617773266c6f676f436f6c6f723d7768697465266c6162656c436f6c6f723d323332463345" data-canonical-src="https://img.shields.io/badge/deploys%20to-aws-%23FF9900.svg?logo=amazon-aws&logoColor=white&labelColor=232F3E" style="max-width: 100%;"></a>
Matano 开源安全数据湖是一个开源云原生安全数据湖,专为 AWS 上的安全团队构建。
笔记
Matano 为完整的企业安全运营平台提供商业托管云 SIEM。了解更多。
- 安全数据湖:将非结构化安全日志规范化到您的 AWS 账户中的结构化实时数据湖中。
- 收集所有日志:开箱即用,集成50 多个安全日志源,并且可以轻松使用自定义源进行扩展。
- 检测即代码:使用 Python 构建实时检测代码。支持将Sigma检测自动导入 Matano。
- 日志转换管道:支持自定义 VRL(矢量重映射语言)脚本,以便在提取日志时解析、丰富、规范化和转换日志,而无需管理任何服务器。
- 无供应商锁定:使用开放表格式(Apache Iceberg)和开放模式标准(ECS),让您以与供应商无关的格式完全拥有您的安全数据。
- 自带分析:直接从任何与 Iceberg 兼容的引擎(AWS Athena、Snowflake、Spark、Trino 等)查询您的安全湖,而无需复制数据。
- 无服务器:完全无服务器,专为 AWS 设计,专注于实现高规模、低成本和零操作。
- 降低 SIEM 成本。
- 使用安全数据湖增强您的 SIEM,以便在调查期间获得更多背景信息。
- 使用 Python 编写检测代码来检测可疑行为并创建情境化警报。
- 与 ECS 兼容的 ELK / Elastic Security 堆栈无服务器替代品。
- 亚马逊AWS CloudTrail
- AWS Route53
- AWS VPC 流
- AWS Config
- AWS ELB
- Amazon S3 服务器访问
- Amazon S3 库存报告
- 亚马逊检查员
- 亚马逊 WAF
- Cloudflare
- 人群打击
- 双人
- 奥克塔
- GitHub
- Google Workspace
- 办公室 365
- 斯尼克
- 苏里卡塔
- 泽克
- 自定义🔧
- Amazon Athena(默认)
- 雪花(预览)
- 火花
- 特里诺
- BigQuery Omni(BigLake)
- 德雷米奥
安装 matano CLI 将 Matano 部署到您的 AWS 账户并管理您的部署。
Linux
curl -OL https://github.com/matanolabs/matano/releases/download/nightly/matano-linux-x64.sh chmod +x matano-linux-x64.sh sudo ./matano-linux-x64.sh
苹果系统
curl -OL https://github.com/matanolabs/matano/releases/download/nightly/matano-macos-x64.sh chmod +x matano-macos-x64.sh sudo ./matano-macos-x64.sh
首先,运行matano init
命令。
- 确保您的环境中(或 AWS CLI 配置文件中)有 AWS 凭证。
- 交互式 CLI 向导将引导您完成入门步骤,为您生成初始Matano 目录、初始化您的 AWS 账户并部署到您的 AWS 账户中。
- 初始部署需要几分钟。
初始化后,您的Matano 目录将用于控制和管理项目中的所有资源,例如日志源、检测和其他配置。其结构如下:
➜ example-matano-dir git:(main) tree ├── detections │ └── aws_root_credentials │ ├── detect.py │ └── detection.yml ├── log_sources │ ├── cloudtrail │ │ ├── log_source.yml │ │ └── tables │ │ └── default.yml │ └── zeek │ ├── log_source.yml │ └── tables │ └── dns.yml ├── matano.config.yml └── matano.context.json
当加入新的日志源或创作检测时,可以matano deploy
从项目中的任何位置运行以将更改部署到您的帐户。
向量重映射语言 (VRL)允许您轻松加入自定义日志源,并鼓励您根据Elastic Common Schema (ECS)对字段进行规范化,以便在您的安全数据湖中实现对 IOC 的增强透视和批量搜索。
用户可以定义自定义 VRL 程序来解析和转换非结构化日志,因为它们是通过日志源支持的机制之一(例如 S3、SQS)提取的。
VRL 是一种面向表达式的语言,旨在以安全高效的方式转换可观察性数据(例如日志)。它具有简单的语法和一组丰富的内置函数,专门针对可观察性用例量身定制。
让我们看一个简单的例子。假设您正在处理如下 HTTP 日志事件:
{ "line": "{\"status\":200,\"srcIpAddress\":\"1.1.1.1\",\"message\":\"SUCCESS\",\"username\":\"ub40fan4life\"}" }
您想要将这些更改应用到每个事件:
- 将原始
line
字符串解析为 JSON,并将字段展开到顶层 - 重命名
srcIpAddress
为source.ip
ECS 字段 - 移除
username
字段 - 转换
message
为小写
将此 VRL 程序添加到您的日志源作为一个transform
步骤将完成所有这些:
transform: | . = object!(parse_json!(string!(.json.line))) .source.ip = del(.srcIpAddress) del(.username) .message = downcase(string!(.message)) schema: ecs_field_names: - source.ip - http.status
由此产生的事件🎉:
{ "message": "success", "status": 200, "source": { "ip": "1.1.1.1" } }
使用检测来定义可以对安全日志中的威胁发出警报的规则。检测是一个 Python 程序,它使用来自日志源的数据实时调用并可以创建警报。
def detect(record): return ( record.deepget("event.action") == "CreateInstanceExportTask" and record.deepget("event.provider") == "ec2.amazonaws.com" and record.deepget("event.outcome") == "failure" )
def detect(r): return ( "authentication" in r.deepget("event.category", []) and r.deepget("event.outcome") == "failure" )def title(r): return f"Multiple failed logins from {r.deepget('user.full_name')} - {r.deepget('source.ip')}"
def dedupe(r): return r.deepget("source.ip")
--- tables: - aws_cloudtrail - okta_system - o365_audit alert: severity: medium threshold: 5 deduplication_window_minutes: 15 destinations: - slack_my_team
from detection import remotecache# a cache of user -> ip[] user_to_ips = remotecache("user_ip")
def detect(record): if ( record.deepget("event.action") == "ConsoleLogin" and record.deepget("event.outcome") == "success" ): # A unique key on the user name user = record.deepget("user.name")
<span class="pl-s1">existing_ips</span> <span class="pl-c1">=</span> <span class="pl-s1">user_to_ips</span>[<span class="pl-s1">user</span>] <span class="pl-c1">or</span> [] <span class="pl-s1">updated_ips</span> <span class="pl-c1">=</span> <span class="pl-s1">user_to_ips</span>.<span class="pl-en">add_to_string_set</span>( <span class="pl-s1">user</span>, <span class="pl-s1">record</span>.<span class="pl-en">deepget</span>(<span class="pl-s">"source.ip"</span>) ) <span class="pl-c"># Alert on new IPs</span> <span class="pl-s1">new_ips</span> <span class="pl-c1">=</span> <span class="pl-en">set</span>(<span class="pl-s1">updated_ips</span>) <span class="pl-c1">-</span> <span class="pl-en">set</span>(<span class="pl-s1">existing_ips</span>) <span class="pl-k">if</span> <span class="pl-s1">existing_ips</span> <span class="pl-c1">and</span> <span class="pl-s1">new_ips</span>: <span class="pl-k">return</span> <span class="pl-c1">True</span></pre><div class="zeroclipboard-container">
所有警报都会自动存储在名为 的 Matano 表中matano_alerts
。警报和规则匹配已规范化为 ECS,并包含有关触发规则匹配的原始事件的上下文以及警报和规则数据。
示例查询
汇总上周激活的警报(超过阈值)
select matano.alert.id as alert_id, matano.alert.rule.name as rule_name, max(matano.alert.title) as title, count(*) as match_count, min(matano.alert.first_matched_at) as first_matched_at, max(ts) as last_matched_at, array_distinct(flatten(array_agg(related.ip))) as related_ip, array_distinct(flatten(array_agg(related.user))) as related_user, array_distinct(flatten(array_agg(related.hosts))) as related_hosts, array_distinct(flatten(array_agg(related.hash))) as related_hash from matano_alerts where matano.alert.first_matched_at > (current_timestamp - interval '7' day) and matano.alert.activated = true group by matano.alert.rule.name, matano.alert.id order by last_matched_at desc
您可以将警报发送到外部系统。您可以使用警报 SNS 主题将警报发送到电子邮件、Slack 和其他服务。
有关使用方面的一般帮助,请参阅官方文档。 如需更多帮助,请随时使用以下渠道之一提出问题:
感谢这些出色的人(表情符号键):
沙伊克·艾哈迈德 🚧 |
三花 🚧 |
凯·埃雷拉 💻🤔🚇 |
公羊 🐛🤔📓 |
扎克·莫雷 🤔🐛📓 |
marcin-kwasnicki 📓 🐛 🤔 |
格雷格·拉普 🐛🤔 |
马修 X.埃科诺莫 🐛 |
贾瑞特·雷姆 🐛 |
马特·弗朗茨 🐛 |
弗朗西斯科·费恩齐 🤔 |
Nishant Das Patnaik🤔 |
蒂姆·奥金 🤔🐛💻 |
Francesco R.🐛 |
约书亚·索伦森 💻📖 |
克里斯·史密斯 💻 |
本项目遵循全贡献者规范。欢迎任何形式的贡献!