MS Sentinel的資料正規化
Microsoft Sentinel 中的資料正規化允許跨多個資料來源實現資料標準化。
甚麼是資料正規化
Microsoft Sentinel 從許多來源取得資料。一起使用各種資料類型和table需要我們了解每種資料類型和table,並為每種類型或模式的分析規則、工作簿(workbook)和搜尋查詢編寫和使用獨特的資料集。
有時,即使資料類型共用通用元素(例如防火牆設備),也需要單獨的規則、工作簿和查詢。在調查和搜尋過程中關聯不同類型的數據也可能具有挑戰性。
ASIM(Advanced Security Information Model)是位於這些不同來源和使用者之間的一層。 ASIM 遵循穩健性(robustness)原則:「發送時嚴格,接受時靈活」。當穩健性原則作為設計模式時,ASIM 會將 Microsoft Sentinel 的不一致且難以使用的來源監控資料轉換為使用者友善的資料。
一般性ASIM的使用
ASIM 透過提供以下功能,為在統一、標準化視圖中處理各種來源提供了無縫體驗:
- Cross source detection。標準化分析規則可跨來源、地端和雲端運行,並偵測諸如暴力破解或跨系統(包括 Okta、AWS 和 Azure)的不可能傳輸等攻擊。
- Source agnostic content。使用 ASIM 的內建和自訂內容的覆蓋範圍會自動擴展到支援 ASIM 的任何來源,即使該來源是在內容建立後新增的。例如,流程事件分析(process event analytics)支援客戶可用於引入資料的任何來源,例如 Microsoft Defender for Endpoint、Windows Events 和 Sysmon。
- 在內建分析中支援我們的自訂來源
- 易於使用。資安人員學習 ASIM 後,編寫查詢會更簡單,因為欄位名稱始終相同。
ASIM 與OSSEM(Open Source Security Events Metadata)通用資訊模型保持一致,允許跨normalized tables進行可預測的實體關聯。
OSSEM 是一個社群主導的專案,主要關注來自不同資料來源和作業系統的安全事件日誌的文件化和標準化。該專案還提供了一個通用資訊模型(CIM-Common Information Model),可供資料工程師在資料標準化流程中使用,以允許資安人員跨不同資料來源查詢和分析資料。
ASIM components
下圖顯示如何將非標準化資料轉換為標準化內容並在 Microsoft Sentinel 中使用。例如,可以從自訂的、特定於產品的非標準化表開始,然後使用Parser和標準化架構將該table轉換為標準化資料。在 Microsoft 和自訂分析、規則、工作簿、查詢等中使用標準化資料。
ASIM 包括以下組件:
- Normalized schemas
涵蓋建構統一功能時可以使用的可預測事件類型的標準集。每個架構定義了表示事件的欄位、規範化的column naming約定以及欄位值的標準格式。 - Parsers
使用 KQL 函數將現有資料映射到normalized schemas。 Microsoft Sentinel 提供了許多開箱即用的 ASIM Parsers。可以從 Microsoft Sentinel GitHub 儲存庫部署更多解析器以及可修改的內建解析器版本。 - Content for each normalized schema
包括分析規則、工作簿、搜尋查詢等。每個 normalized schema的內容適用於任何正規化資料,無需建立特定於來源的內容。
ASIM術語
- Reporting device
將記錄傳送到 Microsoft Sentinel 的系統。該系統可能不是所發送記錄的主要系統。 - Record
從回報設備發送的資料單元。記錄通常稱為日誌(log)、事件(event)或告警(alert),但也可以是其他類型的資料。 - Content或 Content Item
與 Microsoft Sentinel 一起使用的不同的、可自訂的或使用者建立的工件。這些工件的範例包括分析規則、搜尋查詢和工作簿。content item就是這樣的工件之一。
檢視ASIM Parsers
檢視 Microsoft Sentinel 環境中的 ASIM 功能。
- 導覽至 Azure 網站中的 Microsoft Sentinel 工作區
- 從左側導覽中選擇日誌
- 展開左側的架構和篩選器窗格(如果需要,請使用省略號顯示所有工具)
- 選擇功能
- 展開 Microsoft Sentinel
我們將看到以 ASim 和 Im 開頭的函數。
使用ASIM Parsers
在 Microsoft Sentinel 中,解析和正規化(parsing and normalizing)發生在Query中。Parsers建構為 KQL 使用者定義的函數,可將現有table(例如 CommonSecurityLog、自訂日誌表或 Syslog)中的資料轉換為正規化架構。
使用者在查詢中使用 ASIM 解析器(parsers)而不是table names來以正規化格式查看資料,並在Query中包含與架構相關的所有資料。
內建 ASIM 解析器和工作區部署的解析器
許多 ASIM 解析器都是內建的,並且可以在每個 Microsoft Sentinel 工作區中開箱即用。 ASIM 也支援使用 ARM template或手動將解析器從 GitHub 部署到特定工作區。開箱即用的解析器和工作區部署的解析器在功能上是一樣的,但命名約定略有不同,允許兩個解析器集共存於同一個 Microsoft Sentinel 工作區。
每種方法都比另一種方法具有優勢:
對於可以使用內建的解析器的schema,就直接用內建的解析器。
ASIM 包含兩個層級的解析器:統一解析器(unifying parser)和特定於來源的解析器(source-specific parsers)。使用者通常會對相關模式使用統一的解析器,以確保查詢與該模式相關的所有資料。統一解析器依序呼叫特定於來源的解析器來執行實際的解析和正規化,這對於每個來源都是特定的。
對於內建解析器,統一解析器名稱是 _Im_Schema,對於工作區部署的解析器,統一解析器名稱是 imSchema。其中 Schema 代表它所服務的特定模式。特定於來源的解析器也可以獨立使用。例如,在特定於 Infoblox 的工作簿中,使用 vimDnsInfobloxNIOS 特定於來源的解析器。
統一解析器(Unifying parsers)
在Query中使用 ASIM 時,使用統一解析器來組合所有來源,標準化為相同架構,並使用標準化欄位查詢它們。
例如,下面Query的範例使用內建統一 DNS 解析器透過 ResponseCodeName、SrcIpAddr 和 TimeGenerated 標準化欄位來查詢 DNS 事件:
_Im_Dns(starttime=ago(1d), responsecodename='NXDOMAIN')
| summarize count() by SrcIpAddr, bin(TimeGenerated,15m)
此範例使用過濾參數來提高 ASIM 效能。沒有過濾參數的相同範例如下所示:
_Im_Dns
| where TimeGenerated > ago(1d)
| where ResponseCodeName =~ "NXDOMAIN"
| summarize count() by SrcIpAddr, bin(TimeGenerated,15m)
下表列出了可用的統一解析器:
使用參數來優化解析器
使用解析器可能會影響查詢效能,主要是因為解析後過濾結果。因此,許多解析器都有過濾參數,可以在解析之前進行過濾並增強查詢效能。透過查詢最佳化和預先過濾作業,與根本不使用正規化相比,ASIM 解析器通常可以提供更好的效能。
呼叫解析器時,務必透過新增一個或多個命名參數來使用過濾參數,以確保 ASIM 解析器的最佳效能。
每個模式都有一組標準的過濾參數,記錄在相關Schema文件中。過濾參數完全是選用的。以下模式支援過濾參數:
- Authentication
- DNS
- Network Session
- Web Session
每個支援過濾參數的模式都至少支援 starttime 和 enttime 參數,並且使用它們通常對於最佳化效能至關重要。
參數化的KQL函數
呼叫 KQL 函數時,可以提供一組參數。這是建立 ASIM 解析器的一個重要概念,因為它允許在傳回結果之前使用動態值過濾函數結果。
首先,導覽至 Microsoft Sentinel 工作區中的日誌。以下範例函數傳回 Azure Activity log中自特定日期以來且與特定類別相符的所有事件。從使用hardcoded values的Query開始。這驗證了Query是否按預期工作。
AzureActivity
| where CategoryValue == "Administrative"
| where TimeGenerated > todatetime("2024/01/05 5:40:01.032 PM")
接下來,將hardcoded values替換為參數名稱,然後選擇“儲存”,然後選擇“另存為函數”來儲存函數。
AzureActivity
| where CategoryValue == CategoryParam
| where TimeGenerated > DateParam
輸入函數名稱為 AzureActivityByCategory 然後建立兩個參數:
建立一個New Query。然後輸入:
AzureActivityByCategory("Administrative", todatetime("2024/01/05 5:40:01.032 PM"))
建立ASIM 解析器
ASIM 使用者在查詢中使用統一解析器而不是table name,以正規化格式查看資料,並在Query中包含與schema相關的所有資料。反過來,統一解析器使用特定於來源的解析器來處理每個來源的特定細節。
Microsoft Sentinel 為許多資料來源提供內建的、特定於來源的解析器。在以下情況下,可能需要修改或開發這些特定於來源的解析器:
- 當我們的設備提供適合 ASIM 架構的事件,但 Microsoft Sentinel 中不提供適用於我們的設備和相關Schema的來源特定解析器。
- 當 ASIM 來源特定解析器可用於我們的設備,但您的設備以與 ASIM 解析器預期不同的方法或格式傳送事件。例如:來源設備可能配置為以非標準方式傳送事件。
解析器的客製化流程
以下工作流程描述了開發客製的 ASIM、特定於來源的解析器的進階步驟:
- Collect sample logs.
- Identify the schemas or schemas that the events sent from the source represent.
- Map the source event fields to the identified schema or schemas.
- Develop one or more ASIM parsers for your source. You’ll need to develop a filtering parser and a parameter-less parser for each schema relevant to the source.
- Test your parser.
- Deploy the parsers into your Microsoft Sentinel workspaces.
- Update the relevant ASIM unifying parser to reference the new custom parser
- You might also want to contribute your parsers to the primary ASIM distribution. Contributed parsers may also be made available in all workspaces as built-in parsers.
收集sample logs
要建立有效的 ASIM 解析器,需要一組具有代表性的日誌,這在大多數情況下需要設定來源系統並將其連接到 Microsoft Sentinel。如果沒有可用的來源設備,雲端按量付費服務可讓我們部署許多設備進行開發和測試。
此外,找尋日誌的供應商文件和範例可以透過確保廣泛的日誌格式覆蓋來幫助加速開發並減少錯誤。
一組有代表性的日誌應包括:
- 具有不同事件結果的事件。
- 具有不同回應操作的事件。
- 使用者名稱、主機名稱和 ID 以及其他需要value normalization的欄位的不同格式。
Mapping
在開發解析器之前,要將來源事件或事件中的可用資訊對應到我們所確定的架構:
- 對應所有必填欄位(fields),最好也對應推薦欄位(recommended fields)。
- 嘗試將來源中可用的任何資訊對應到標準化欄位。如果無法作為所選架構的一部分使用,考慮對應到其他架構中可用的欄位。
- 將來源欄位的值對應到 ASIM 允許的標準化值。原始值儲存在單獨的欄位中,例如 EventOriginalResultDetails。
開發解析器
為每個相關Schema開發過濾和無參數解析器。
自訂解析器是在 Microsoft Sentinel 日誌頁面中開發的 KQL 查詢。解析器查詢(parser query)由三個部分組成:Filter > Parse > Prepare fields
過濾相關紀錄
在許多情況下,Microsoft Sentinel 中的table包含多種類型的事件。例如:
- Syslog table包含來自多個來源的資料。
- Custom tables可以包括提供多種事件類型並且可以適合各種schema的單一來源的資訊。
因此,解析器應該首先僅過濾與目標Schema相關的記錄。
KQL 中的篩選(Filtering)是使用 where operator完成的。例如,Sysmon event 1 報告process的建立,因此正規化為 ProcessEvent schema。 Sysmon event 1 事件是Event table的一部分,因此將使用下列篩選器:
Event | where Source == "Microsoft-Windows-Sysmon" and EventID == 1
使用監視清單(Watchlist)按來源類型過濾
在某些情況下,事件本身不包含允許過濾特定來源類型的資訊。
例如,Infoblox DNS 事件會作為 Syslog 訊息發送,很難與其他來源發送的 Syslog 訊息區分開來。在這種情況下,解析器依賴定義相關事件的來源清單。此清單維護在 ASimSourceType 監視清單中。
若要在解析器中使用 ASimSourceType 監視清單:
在解析器的開頭包含以下命令:
let Sources_by_SourceType=(sourcetype:string){_GetWatchlist('ASimSourceType') | where SearchKey == tostring(sourcetype) | extend Source=column_ifexists('Source','') | where isnotempty(Source)| distinct Source };
在解析器過濾部分新增使用監視清單的過濾器。例如,Infoblox DNS 解析器在過濾部分包含以下內容:
| where Computer in (Sources_by_SourceType('InfobloxNIOS'))
要在解析器中使用此範例:
- 將 Computer 替換為包含來源的來源資訊的欄位名稱。可以將其保留為任何基於 Syslog 的解析器的電腦。
- 將 InfobloxNIOS token替換為解析器選擇的值。通知解析器使用者必須使用我們選擇的值以及發送此類型事件的來源清單來更新 ASimSourceType 監視清單。
基於解析器參數的過濾
開發過濾解析器時,確保解析器接受相關schema的過濾參數,如該schema的參考文章所述。使用現有解析器作為起點可確保解析器包含正確的函數簽章。在大多數情況下,實際的過濾代碼對於過濾相同schema的解析器也類似。
過濾時,請確保:
- 使用實際欄位在解析之前進行過濾。如果過濾結果不夠準確,請在解析後重複測試以微調結果。
- 如果參數未定義且仍具有預設值,則不進行篩選。
以下範例顯示如何實現字串參數(預設值通常為“*”)和清單參數(預設值通常為empty list)的篩選。
srcipaddr=='*' or ClientIP==srcipaddr
array_length(domain_has_any) == 0 or Name has_any (domain_has_any)
過濾優化
為了確保解析器的效能,請注意以下過濾建議:
- 永遠過濾內建欄位而不是解析欄位。雖然有時使用解析欄位更容易進行過濾,但它會極大地影響效能。
- 使用提供優化效能的operator。特別是 ==、has 和startswith。使用諸如包含或匹配正規表示式之類的operator也會極大地影響效能。
過濾效能建議可能並不總是那麼容易遵循。例如,使用 has 不如 contains 準確。在其他情況下,匹配內建欄位(例如 SyslogMessage)不如比較提取的欄位(例如 DvcAction)準確。在這種情況下,建議您仍然對內建欄位使用效能最佳化operator進行預先過濾,並在解析後使用更準確的條件重複過濾。
有關範例,請參閱以下 Infoblox DNS 解析器片段。解析器首先檢查 SyslogMessage 欄位是否包含單字 client。但是,該語法可能用在訊息中的不同位置,因此在解析 Log_Type 欄位後,解析器會再次檢查單字 client 是否確實是該欄位的值。
Syslog | where ProcessName == "named" and SyslogMessage has "client"
…
| extend Log_Type = tostring(Parser[1]),
| where Log_Type == "client"
解析
一旦Query選擇了相關記錄,它可能需要解析它們。通常,如果在single text field中傳送多個event fields,則需要進行解析。
下面列出了執行解析的 KQL operator,按其效能最佳化排序。第一個提供最優化的效能,而最後一個提供最不優化的效能。
除了解析字串(string)之外,解析階段可能還需要對原始值進行更多處理,包括:
- 格式化和類型轉換。提取來源欄位後,可能需要對其進行格式化以適合目標schema欄位。例如,可能需要將表示日期和時間的字串轉換為日期時間欄位。 todatetime 和 tohex 等函數在這些情況下很有幫助。
- Value lookup。來源欄位的值一旦提取,可能需要對應到為目標schema欄位指定的vlaue set。例如,某些來源會報告 DNS 回應代碼,而schema則要求使用更常見的文字回應代碼。函數 iff 和 case 有助於對應一些值。
例如,Microsoft DNS 解析器使用 iff 語句根據事件 ID 和回應代碼指派 EventResult field,如下所示:
extend EventResult = iff(EventId==257 and ResponseCode==0 ,'Success','Failure')
對於多個值,使用datatable與lookup,如同一 DNS 解析器所示:
let RCodeTable = datatable(ResponseCode:int,ResponseCodeName:string) [ 0, 'NOERROR', 1, 'FORMERR'....];
...
| lookup RCodeTable on ResponseCode
| extend EventResultDetails = case (
isnotempty(ResponseCodeName), ResponseCodeName,
ResponseCode between (3841 .. 4095), 'Reserved for Private Use',
'Unassigned')
Mapping values
很多情況下,提取的原始值需要正規化。例如,在 ASIM 中,MAC 位址使用冒號作為分隔符,而來源可能會傳送連字號分隔的 MAC 位址。用於轉換值的主要operator是extend,以及廣泛的 KQL string、數字和日期函數,如上面的解析部分所示。
當需要將一組值對應到目標欄位允許的值時,請使用 case、iff 和 Lookup 語句。當每個來源值映射到目標值時,使用datatable operator 與 lookup 對應來定義mapping。例如
let NetworkProtocolLookup = datatable(Proto:real, NetworkProtocol:string)[
6, 'TCP',
17, 'UDP'
];
let DnsResponseCodeLookup=datatable(DnsResponseCode:int,DnsResponseCodeName:string)[
0,'NOERROR',
1,'FORMERR',
2,'SERVFAIL',
3,'NXDOMAIN',
...
];
...
| lookup DnsResponseCodeLookup on DnsResponseCode
| lookup NetworkProtocolLookup on Proto
注意,當mapping只有兩個可能值時,lookup也是有用且有效率的。當mapping條件更複雜時,請使用 iff 或 case 函數。 iff 函數可以mapping兩個值:
| extend EventResult =
iff(EventId==257 and ResponseCode==0,'Success','Failure’)
case 函數支援兩個以上的目標值。下面的範例展示如何組合lookup和case。如果未找到lookup value,上面的查找範例將在 DnsResponseCodeName 欄位中傳回空值。下面的案例範例透過使用lookup operation的結果(如果可用)來增強它,否則指定其他條件。
| extend DnsResponseCodeName =
case (
DnsResponseCodeName != "", DnsResponseCodeName,
DnsResponseCode between (3841 .. 4095), 'Reserved for Private Use',
'Unassigned'
)
準備Result set中的欄位(Fields)
解析器必須準備result set中的欄位以確保使用正規化欄位。以下 KQL operator用於準備result set中的欄位:
Handle parsing variants
在許多情況下,事件流(event stream)中的事件包括需要不同解析邏輯的變體。要在單一解析器中解析不同的變體,可以使用條件語句(例如 iff 和 case),也可以使用union。
若要使用 union 處理多個變體,請為每個變體建立一個單獨的函數,並使用 union 語句組合結果:
let AzureFirewallNetworkRuleLogs = AzureDiagnostics
| where Category == "AzureFirewallNetworkRule"
| where isnotempty(msg_s);
let parseLogs = AzureFirewallNetworkRuleLogs
| where msg_s has_any("TCP", "UDP")
| parse-where
msg_s with networkProtocol:string
" request from " srcIpAddr:string
":" srcPortNumber:int
…
| project-away msg_s;
let parseLogsWithUrls = AzureFirewallNetworkRuleLogs
| where msg_s has_all ("Url:","ThreatIntel:")
| parse-where
msg_s with networkProtocol:string
" request from " srcIpAddr:string
" to " dstIpAddr:string
…
union parseLogs, parseLogsWithUrls…
為了避免重複事件和過度處理,確保每個函數首先使用原生欄位僅過濾要解析的事件。另外,如果需要,請在union之前在每個分支(branch)使用 project-away。
部署解析器
透過將解析器複製到 Azure Monitor Log頁面並將Query儲存為函數來手動部署解析器。此方法對於測試很有用。
要部署大量解析器,微軟建議使用 parser ARM templates,如下所示:
- 根據每個Schema的相關範本建立一個 YAML 檔,並將我們的Query包含在其中。從與我們的schema和解析器類型、過濾或無參數相關的 YAML 模板開始。
- 使用 ASIM Yaml 到 ARM 範本轉換器將 YAML 檔案轉換為 ARM 範本。
- 如果部署更新,使用網站或函數刪除 PowerShell 工具刪除舊版本的函數。
- 使用 Azure 網站或 PowerShell 部署範本。
也可以使用linked templates將多個模板組合到單一部署過程中。
設定Azure Monitor Data Collection Rules
正規化日誌資料的另一種方法是在擷取時轉換資料。這提供了以解析格式儲存資料以便在 Microsoft Sentinel 中使用的效益。
資料收集規則 (DCR) 在 Azure Monitor 中提供類似 ETL 的流水線,讓我們定義處理進入 Azure Monitor 的資料的方式。根據工作流程的類型,DCR 可能會指定資料應傳送到的位置,並且可能會在將資料儲存在 Azure Monitor 日誌中之前篩選或轉換資料。某些資料收集規則將由 Azure Monitor 建立和管理,而我們可以建立其他規則來根據你的特定要求自訂資料收集。
DCR的類型
Azure Monitor 目前有兩種類型的資料收集規則:
- Standard DCR。與將資料傳送至 Azure Monitor 的不同工作流程一起使用。目前支援的工作流程是 Azure Monitor agent和自訂日誌。
- Workspace transformation DCR。與 Log Analytics 工作區結合使用,將套用ingestion-time transformations於目前不支援 DCR 的工作流程。
Transformations
DCR中的Transformations可讓我們在將傳入資料儲存到 Log Analytics 工作區之前對其進行篩選或修改。資料轉換是使用 KQL 語句定義的,該語句單獨應用於資料來源中的每個entry。它必須了解傳入資料的格式並以目標table的schema建立輸出。
input stream由名為 source 的virtual table呈現,其中的columns與input data stream definition相符。以下是一個典型的轉換範例。此範例包括以下功能:
- 使用 where 語句過濾傳入資料
- 使用extend operator新增column
- 使用project operator格式化輸出以符合目標table的column
source
| where severity == "Critical"
| extend Properties = parse_json(properties)
| project
TimeGenerated = todatetime(["time"]),
Category = category,
StatusDescription = StatusDescription,
EventName = name,
EventId = tostring(Properties.EventId)