chore: remote waf fingerprinting base
This commit is contained in:
parent
f32f4ffaee
commit
d25848cc5f
|
|
@ -49,6 +49,7 @@ stem = "^1.8.0"
|
||||||
pandas = "^2.2.1"
|
pandas = "^2.2.1"
|
||||||
openpyxl = "^3.0.10"
|
openpyxl = "^3.0.10"
|
||||||
tomli = "^2.2.1"
|
tomli = "^2.2.1"
|
||||||
|
pyyaml = "^6.0.3"
|
||||||
|
|
||||||
[tool.poetry.group.dev.dependencies]
|
[tool.poetry.group.dev.dependencies]
|
||||||
jsonschema = "^4.0.0"
|
jsonschema = "^4.0.0"
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,84 @@
|
||||||
|
import requests
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
|
# Raw URL of the Nuclei `global-waf-detect` template; it is downloaded and parsed
# by fetch_nuclei_fingerprints(). The URL points at refs/heads/main rather than a
# pinned commit, so the content served can change between runs.
NUCLEI_FINGERPRINT_URL: str = "https://raw.githubusercontent.com/projectdiscovery/nuclei-templates/refs/heads/main/http/global-matchers/global-waf-detect.yaml"
|
||||||
|
|
||||||
|
def _check_nuclei_regex(matcher: dict[str,str|list[str]], response: requests.Response) -> bool:
    """Evaluate a single Nuclei ``regex`` matcher against an HTTP response.

    Keyword arguments:
    matcher -- Matcher mapping. ``regex`` holds the patterns to try, ``part``
               selects what is searched (``body``, ``header``, anything else
               means body plus headers) and ``condition`` is ``and`` or ``or``.
    response -- The HTTP response to inspect.

    Returns True if every pattern matches (``condition: and``) or if at least
    one pattern matches (any other / missing condition). An empty pattern list
    therefore matches only under ``and``.
    """
    import re

    require_all: bool = matcher.get('condition', '') == 'and'

    # Nuclei treats a matcher without an explicit `part` as a body matcher.
    part = matcher.get('part', 'body')
    target_text: str
    if part == 'body':
        target_text = response.text
    elif part == 'header':
        target_text = str(response.headers)
    else:
        target_text = response.text + str(response.headers)

    # Lazy generator: any()/all() stop at the first decisive pattern. Under `or`
    # a miss must NOT end the scan -- later patterns still have to be tried.
    hits = (re.search(pattern, target_text) is not None for pattern in matcher['regex'])
    return all(hits) if require_all else any(hits)
|
||||||
|
|
||||||
|
def _check_nuclei_words(matcher: dict[str,str|list[str]], response: requests.Response) -> bool:
    """Evaluate a single Nuclei ``word`` matcher against an HTTP response.

    Keyword arguments:
    matcher -- Matcher mapping. ``words`` holds the substrings to look for,
               ``part`` selects what is searched (``body``, ``header``, anything
               else means body plus headers) and ``condition`` is ``and`` or ``or``.
    response -- The HTTP response to inspect.

    Returns True if every word is present (``condition: and``) or if at least
    one word is present (any other / missing condition). The comparison is a
    case-sensitive substring test. An empty word list matches only under ``and``.
    """
    require_all: bool = matcher.get('condition', '') == 'and'

    # Nuclei treats a matcher without an explicit `part` as a body matcher.
    part = matcher.get('part', 'body')
    target_text: str
    if part == 'body':
        target_text = response.text
    elif part == 'header':
        target_text = str(response.headers)
    else:
        target_text = response.text + str(response.headers)

    # Lazy generator: any()/all() stop at the first decisive word. Under `or`
    # a missing word must NOT end the scan -- later words still have to be tried.
    hits = (word in target_text for word in matcher['words'])
    return all(hits) if require_all else any(hits)
|
||||||
|
|
||||||
|
def fetch_nuclei_fingerprints() -> list[dict[str,str|list[str]]] | None:
    """Fetch the latest Nuclei WAF fingerprints from the official repository.

    Downloads the template at NUCLEI_FINGERPRINT_URL, parses it as YAML and
    returns the ``matchers`` list of its first ``http`` entry.

    Returns None (after printing a diagnostic) when the request fails, the
    body is not valid YAML, or the parsed document does not have the expected
    ``http[0].matchers`` list.
    """
    try:
        response = requests.get(NUCLEI_FINGERPRINT_URL, timeout=10)
        response.raise_for_status()
        raw = yaml.safe_load(response.text)
    except requests.RequestException as e:
        print(f"Error fetching Nuclei fingerprints: {e}")
        return None
    except yaml.YAMLError as e:
        print(f"Error parsing YAML data: {e}")
        return None

    # The template is fetched from a moving branch, so its layout is not
    # guaranteed; a reshaped document must yield None rather than a traceback.
    try:
        fingerprints = raw['http'][0]['matchers']
    except (KeyError, IndexError, TypeError) as e:
        print(f"Unexpected Nuclei template structure: {e!r}")
        return None

    if not isinstance(fingerprints, list):
        print("Unexpected Nuclei template structure: 'matchers' is not a list")
        return None

    return fingerprints
|
||||||
|
|
||||||
|
def nuclei_check(response: requests.Response, fingerprints: list[dict[str,str|list[str]]]) -> bool:
    """Check if the response matches any of the WAF fingerprints.

    Keyword arguments:
    response -- The HTTP response to check.
    fingerprints -- The list of Nuclei WAF fingerprints to check against.

    Matchers are combined with OR: the first matcher that hits ends the scan.
    Matchers whose ``type`` is neither ``word`` nor ``regex`` are skipped.

    Returns True if a WAF is detected, False otherwise.
    """
    for matcher in fingerprints:
        matcher_type = matcher.get('type')
        if matcher_type == 'word':
            detected = _check_nuclei_words(matcher, response)
        elif matcher_type == 'regex':
            detected = _check_nuclei_regex(matcher, response)
        else:
            # Unsupported matcher kind -- nothing to evaluate here.
            continue

        # Only a positive verdict is final; a miss moves on to the next matcher.
        if detected:
            return True
    return False
|
||||||
|
|
@ -0,0 +1,26 @@
|
||||||
|
id: global-waf-detect
|
||||||
|
http:
|
||||||
|
- global-matchers: true
|
||||||
|
matchers-condition: or
|
||||||
|
matchers:
|
||||||
|
- type: regex
|
||||||
|
name: regexSite
|
||||||
|
regex:
|
||||||
|
- '(?i)access.to.this.page.has.been.denied'
|
||||||
|
- '(?i)http(s)?://(www.)?anotheroneblocked.\w+.whywasiblocked'
|
||||||
|
condition: or
|
||||||
|
part: response
|
||||||
|
|
||||||
|
- type: word
|
||||||
|
name: wordSiteBody
|
||||||
|
part: body
|
||||||
|
words:
|
||||||
|
- "bad_text_in_body"
|
||||||
|
|
||||||
|
- type: word
|
||||||
|
name: wordSiteHead
|
||||||
|
part: header
|
||||||
|
condition: or
|
||||||
|
words:
|
||||||
|
- "text_in_head"
|
||||||
|
- "other_in_head"
|
||||||
|
|
@ -0,0 +1,107 @@
|
||||||
|
import os
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import patch, Mock
|
||||||
|
import requests
|
||||||
|
from requests.structures import CaseInsensitiveDict
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from sherlock_project import waf_check
|
||||||
|
|
||||||
|
|
||||||
|
TEMPLATE_BODY_PATH: str = os.path.join(os.path.dirname(__file__), 'mocks', 'global_waf_detect.yaml')
|
||||||
|
|
||||||
|
def side_effect(url, **kwargs) -> Mock:
    """Stand in for ``requests.get`` and serve the local mock template.

    Only the Nuclei fingerprint URL is answered; the reply is a Mock whose
    ``status_code`` is 200 and whose ``text`` is the content of the file at
    TEMPLATE_BODY_PATH. Any other URL raises RuntimeError.
    """
    if url != waf_check.NUCLEI_FINGERPRINT_URL:
        raise RuntimeError("Unexpected URL")

    with open(TEMPLATE_BODY_PATH, 'r', encoding='utf-8') as handle:
        body: str = handle.read()

    return Mock(status_code=200, text=body)
|
||||||
|
|
||||||
|
class TestWafCheck(unittest.TestCase):
    """Tests for the fingerprint download and the per-matcher checks in ``waf_check``."""

    @staticmethod
    def _make_response(body: bytes) -> requests.Response:
        """Build a bare 200 response with the given body and the fixed test headers."""
        res: requests.Response = requests.Response()
        res.status_code = 200
        res._content = body
        res.headers = CaseInsensitiveDict({
            'Content-Type': 'text/html',
            'Server': 'TestServer'
        })
        return res

    def _assert_steps(self, check, response, matcher, steps) -> None:
        """Apply each update to *matcher* in order and assert the verdict after it.

        *steps* is a sequence of ``(changes, expected)`` pairs; ``changes`` is
        merged into the (shared, mutated) matcher before ``check`` is called.
        """
        for changes, expected in steps:
            matcher.update(changes)
            verdict = check(matcher, response)
            if expected:
                self.assertTrue(verdict)
            else:
                self.assertFalse(verdict)

    @patch('sherlock_project.waf_check.requests.get')
    def test_fetch_nuclei_fingerprints(self, mock_requests_get):  # type: ignore
        """The fetched fingerprints equal the matchers of the local mock template."""
        mock_requests_get.side_effect = side_effect

        fetched = waf_check.fetch_nuclei_fingerprints()

        with open(TEMPLATE_BODY_PATH, 'r', encoding='utf-8') as handle:
            parsed = yaml.safe_load(handle.read())

        expected: list[dict[str, str | list[str]]] = parsed['http'][0]['matchers']
        self.assertEqual(fetched, expected)

    def test_nuclei_regex_check(self):
        """Regex matchers honour ``part`` and the ``and``/``or`` condition."""
        response = self._make_response(b"This is a test response with Test-Regex in the body.")
        matcher: dict[str, str | list[str]] = {
            'type': 'regex',
            'name': 'test-regex',
            'part': 'body',
            'regex': [r'(?i)not-present'],
            'condition': 'or'
        }
        check = waf_check._check_nuclei_regex  # pyright: ignore[reportPrivateUsage]

        self._assert_steps(check, response, matcher, [
            # Initial matcher: the only pattern is absent from the body.
            ({}, False),
            ({'regex': [r'(?i)TeSt-REgEx']}, True),
            ({'regex': [r'(?i)TeSt-REgEx', r'(?i)Not-Present']}, True),
            # Same two patterns, but now both are required.
            ({'condition': 'and'}, False),
            ({'part': 'header', 'regex': [r'(?i)testserver']}, True),
            ({'part': 'response'}, True),
            ({'regex': [r'(?i)not-present']}, False),
        ])

    def test_nuclei_words_check(self):
        """Word matchers honour ``part``, case sensitivity and the ``and``/``or`` condition."""
        response = self._make_response(b"This is a test response with test-words in the body.")
        matcher: dict[str, str | list[str]] = {
            'type': 'word',
            'name': 'test-word',
            'part': 'body',
            'words': ['not-present'],
            'condition': 'or'
        }
        check = waf_check._check_nuclei_words  # pyright: ignore[reportPrivateUsage]

        self._assert_steps(check, response, matcher, [
            # Initial matcher: the only word is absent from the body.
            ({}, False),
            ({'words': ['test-word']}, True),
            ({'words': ['test-word', 'Not-Present']}, True),
            # Same two words, but now both are required.
            ({'condition': 'and'}, False),
            # Word matching is case-sensitive: the header value is 'TestServer'.
            ({'part': 'header', 'words': ['testserver']}, False),
            ({'words': ['TestServer']}, True),
            ({'part': 'response'}, True),
        ])
|
||||||
Loading…
Reference in New Issue