Microsoft 365 Defender Advanced hunting Query 에서 사용할 수 있는 Python 코드이다.
mde_search 함수를 재활용해서, 입력한 값이 어떤 타입인지에 따라 검색하는 조건을 바꾸도록 설계되어 있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | import json import time import argparse import pandas as pd import logging import sys import os import re import validators import datetime import random import zipfile import urllib.parse from collections import OrderedDict from urllib3.util.retry import Retry import requests from requests.adapters import HTTPAdapter from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) with open('./conf.json', 'r') as f: conf = json.load(f) MDE_TENANT = conf['tenant'] MDE_APP = conf['app'] MDE_SECRET = conf['secret'] MDE_URL = "https://login.microsoftonline.com/%s/oauth2/token" % (MDE_TENANT) logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.DEBUG) def requests_retry_session( retries=10, backoff_factor=0.3, status_forcelist=(500, 502, 504), session=None, ): session = session or requests.Session() retry = Retry( total=retries, read=retries, connect=retries, backoff_factor=backoff_factor, status_forcelist=status_forcelist, ) adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) return session def http_request(address, hr_jsondata, hr_headers = None): trycount = 0 number = random.randint(1,3) while True: try: if trycount == 10: logging.warning('httpRequest retry 10') break else: trycount += 1 time.sleep(number) if hr_jsondata: res = requests_retry_session().post(url=address, data=hr_jsondata, headers=hr_headers, verify=False) else: res = requests_retry_session().get(url=address, headers=hr_headers, verify=False) if res.status_code == 200: break else: print(res.text) except Exception as httpRequest_e: httpRequest_error = str(httpRequest_e) logging.warning('httpRequest: %s', httpRequest_error) time.sleep(1) return res def mde_search(search_q, search_item): try: mde_search_result = '' mde_file_path = '' resourceAppIdUri = 'https://api.securitycenter.microsoft.com' body = { 'resource' : resourceAppIdUri, 'client_id' : MDE_APP, 'client_secret' : MDE_SECRET, 'grant_type' : 'client_credentials' } data = urllib.parse.urlencode(body).encode("utf-8") req = http_request(MDE_URL, data).json() aadToken = req["access_token"] url = "https://api.securitycenter.microsoft.com/api/advancedqueries/run" headers = { 'Content-Type' : 'application/json', 'Accept' : 'application/json', 'Authorization' : "Bearer " + aadToken } data = json.dumps({ 'Query' : search_q }).encode("utf-8") req = http_request(url, data, headers).json() if len(req['Results']) >= 1: df_result = pd.DataFrame(req['Results']) df_string = df_result.to_string() if len(df_result.index) > 30: df_string = '('+ str(len(df_result.index))+'/10000)' task_date = datetime.datetime.now() directory = './'+str(task_date.month)+'/'+str(search_item)+'/' if not os.path.exists(os.path.dirname(directory)): os.makedirs(os.path.dirname(directory)) mde_search_filename = search_item+'.xlsx' mde_file_path = os.path.join(directory, mde_search_filename) df_result.to_excel(mde_file_path) mde_search_result = df_string except Exception as main_mde_e: main_mde_error = str(main_mde_e) mde_search_result = main_mde_error logging.warning('main_mde_error: %s', main_mde_error) time.sleep(1) return mde_search_result, mde_file_path if __name__ == "__main__": """ 메인 내장 함수 """ parser = argparse.ArgumentParser() parser.add_argument('-q', '--query', help="search query", required=True) args, unknown = parser.parse_known_args() hashlist = [] ip_regex = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$" md5_regex = r"\b(?!^[\d]*$)(?!^[a-fA-F]*$)([a-f\d]{32}|[A-F\d]{32})\b" sha256_regex = r"[A-Fa-f0-9]{64}" domain_regex = r"^((?!-))(xn--)?[a-z0-9][a-z0-9-_]{0,61}[a-z0-9]{0,1}\.(xn--)?([a-z0-9\-]{1,61}|[a-z0-9-]{1,30}\.[a-z]{2,})$" account_regex = r"^(?![-._])(?!.*[_.-]{2})[\w.-]{6,30}(?<![-._])$" # Search 를 regex 별로 만들어서 여러개의 타입도 인식할 수 있도록 함. # Search시 여러 값을 동시에 인식할 수 있도록 구성 <- 추후 구성 query = args.query if ' ' in query: splitlist = query.split(' ') for s in splitlist: s = s.strip() hashlist.append(s) else: hashdata = query.strip() hashlist.append(hashdata) for search_item in hashlist: try: search_item = search_item.lower() taskdetect = False main_result = '' search_event = '' search_query = '' search_result_file = '' if bool(re.match(ip_regex, search_item)): # 네트워크, 프로세스, 레지스트리 search_event = '`Network, Process, Registry Events`\n- Search RemoteIP, LocalIP' search_query = 'find in (DeviceNetworkEvents, DeviceProcessEvents, DeviceRegistryEvents) where Timestamp > ago(30d) and (RemoteIP contains "{0}" or LocalIP contains "{0}") | summarize by DeviceName, LocalIP, RemoteIP, InitiatingProcessFileName | project-rename device=DeviceName, local=LocalIP, remoteip=RemoteIP, processname=InitiatingProcessFileName| take 50000'.format(search_item) # Paste your own query here elif bool(re.match(md5_regex, search_item)): # 파일 해쉬, 프로세스 관련 search_event = '`File, Process, ImageLoad Events`\n- Search MD5, InitiatingProcessMD5' search_query = 'find in (DeviceFileEvents, DeviceProcessEvents, DeviceImageLoadEvents) where Timestamp > ago(30d) and (MD5 == "{0}" or InitiatingProcessMD5 == "{0}") | summarize by DeviceName, InitiatingProcessFileName | project-rename device=DeviceName, processname=InitiatingProcessFileName| take 10000'.format(search_item) elif bool(re.match(sha256_regex, search_item)): # 네트워크, 프로세스, 레지스트리, 파일 해쉬, 프로세스 관련 search_event = '`File, Process, Device, Registry, Network, ImageLoad Events`\n- Search SHA256, InitiatingProcessSHA256' search_query = 'find in (DeviceFileEvents, DeviceProcessEvents, DeviceEvents, DeviceRegistryEvents, DeviceNetworkEvents, DeviceImageLoadEvents) where Timestamp > ago(30d) and (SHA256 == "{0}" or InitiatingProcessSHA256 == "{0}") | summarize by DeviceName, InitiatingProcessFileName | project-rename device=DeviceName, processname=InitiatingProcessFileName| take 50000'.format(search_item) elif bool(validators.url(search_item)): # 네트워크 RemoteUrl, FolderPath search_event = '`Network, File Events`\n- Search RemoteUrl, FolderPath' search_query = 'find in (DeviceNetworkEvents, DeviceFileEvents) where Timestamp > ago(30d) and (RemoteUrl contains "{0}" or FolderPath contains "{0}") | summarize by DeviceName, RemoteUrl, FolderPath, InitiatingProcessFileName | project-rename device=DeviceName, remoteurl=RemoteUrl, folder=FolderPath, processname=InitiatingProcessFileName| take 50000'.format(search_item) elif bool(re.match(domain_regex, search_item)): #RemoteUrl, FolderPath search_event = '`Network, File Events`\n- Search RemoteUrl, FolderPath' search_query = 'find in (DeviceNetworkEvents, DeviceFileEvents) where Timestamp > ago(30d) and (RemoteUrl contains "{0}" or FolderPath contains "{0}") | summarize by DeviceName, RemoteUrl, FolderPath, InitiatingProcessFileName | project-rename device=DeviceName, remoteurl=RemoteUrl, folder=FolderPath, processname=InitiatingProcessFileName| take 50000'.format(search_item) elif search_item.startswith('cve-'): # CVE 확인 search_event = '`SoftwareVulnerabilities`\n- Search CveId' search_query = 'find in (DeviceTvmSoftwareVulnerabilities) where (CveId contains "{}") | take 50000'.format(search_item) else: # 소프트웨어 검색 search_event = '`DeviceTvmSoftwareInventory`\n- Search SoftwareName' search_query = 'find in (DeviceTvmSoftwareInventory) where (SoftwareName contains "{}") | project DeviceId, DeviceName, OSPlatform, SoftwareVendor, SoftwareName, SoftwareVersion | summarize count() by SoftwareName, SoftwareVersion | take 50000'.format(search_item) if search_query: main_result, search_result_file = mde_search(search_query, search_item, search_event) else: search_event = 'not match ioc type' if main_result: taskdetect = True print(main_result) except Exception as e: pass | cs |
환경 설정 파일인 conf.json은 다음과 같이 작업하면 된다.
1 2 3 4 5 | { "tenant":"input here", "app":"input here", "secret":"input here" } | cs |
0 댓글