Skip to content

Commit 8d29103

Browse files
committed
feat(heuristics): add Fake Email analyzer to validate maintainer email domains
Signed-off-by: Amine <[email protected]>
1 parent 1813f82 commit 8d29103

File tree

5 files changed

+273
-0
lines changed

5 files changed

+273
-0
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies = [
3838
"problog >= 2.2.6,<3.0.0",
3939
"cryptography >=44.0.0,<45.0.0",
4040
"semgrep == 1.113.0",
41+
"dnspython >=2.7.0,<3.0.0",
4142
]
4243
keywords = []
4344
# https://pypi.org/classifiers/

src/macaron/malware_analyzer/pypi_heuristics/heuristics.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ class Heuristics(str, Enum):
4343
#: Indicates that the package source code contains suspicious code patterns.
4444
SUSPICIOUS_PATTERNS = "suspicious_patterns"
4545

46+
#: Indicates that the package maintainer's email address is suspicious or invalid.
47+
FAKE_EMAIL = "fake_email"
48+
4649

4750
class HeuristicResult(str, Enum):
4851
"""Result type indicating the outcome of a heuristic."""
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
2+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
3+
4+
"""The heuristic analyzer to check the email address of the package maintainers."""
5+
6+
import logging
7+
import re
8+
9+
import dns.resolver as dns_resolver
10+
11+
from macaron.errors import HeuristicAnalyzerValueError
12+
from macaron.json_tools import JsonType
13+
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
14+
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
15+
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
16+
17+
logger: logging.Logger = logging.getLogger(__name__)
18+
19+
20+
class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers.

    The heuristic fails when an author or maintainer email address is malformed or
    when its domain cannot receive mail (no MX records can be found for it).
    """

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )

    @staticmethod
    def _extract_addresses(field_value: str) -> list[str]:
        """Split a metadata email field into the individual addresses it contains.

        The ``author_email`` and ``maintainer_email`` metadata fields may carry display
        names (``Name <user@example.com>``) and several comma-separated entries, so the
        raw field value cannot be validated as a single address.

        Parameters
        ----------
        field_value: str
            The raw value of the metadata field.

        Returns
        -------
        list[str]
            The bare addresses found in the field. If none can be parsed, a list that
            only contains the raw value, so that the caller still validates (and
            reports) it.
        """
        # Imported locally because ``email`` is also used as a parameter name in this class.
        from email.utils import getaddresses

        addresses = [address for _, address in getaddresses([field_value]) if address]
        return addresses or [field_value]

    def is_valid_email(self, email: str) -> bool:
        """Check if the email format is valid and the domain has MX records.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        bool:
            ``True`` if the email address is well-formed and its domain has MX records,
            ``False`` if it is malformed, the domain does not exist, or the domain has
            no MX records.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the DNS lookup fails for a reason that says nothing about the domain
            itself, e.g., a timeout or unreachable name servers.
        """
        # Match the whole string: a prefix match would accept trailing garbage such as
        # a second "@" section and derive the wrong domain from it.
        if not re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email):
            return False

        # The pattern guarantees exactly one "@", so everything after it is the domain.
        domain = email.rpartition("@")[2]
        try:
            records = dns_resolver.resolve(domain, "MX")
        except (dns_resolver.NXDOMAIN, dns_resolver.NoAnswer):
            # The domain does not exist or publishes no MX records. This is the signal
            # the heuristic is looking for, not an analysis error.
            return False
        except Exception as err:
            err_message = f"Failed to resolve domain {domain}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        return bool(records)

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis.
            ``SKIP`` is returned when neither an author nor a maintainer email is
            present; ``FAIL`` carries the first address that did not validate.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the analysis fails.
        """
        response = pypi_package_json.download("")
        if not response:
            error_message = "Failed to download package JSON "
            return HeuristicResult.FAIL, {"message": error_message}

        # Tolerate both a missing "info" key and an explicit null value.
        info = pypi_package_json.package_json.get("info") or {}
        author_email = info.get("author_email")
        maintainer_email = info.get("maintainer_email")
        if maintainer_email is None and author_email is None:
            message = "No maintainers are available"
            return HeuristicResult.SKIP, {"message": message}

        # The author field is checked before the maintainer field.
        for field_value in (author_email, maintainer_email):
            if field_value is None:
                continue
            for address in self._extract_addresses(field_value):
                if not self.is_valid_email(address):
                    return HeuristicResult.FAIL, {"email": address}

        return HeuristicResult.PASS, {}

src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from macaron.malware_analyzer.pypi_heuristics.metadata.anomalous_version import AnomalousVersionAnalyzer
2121
from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer
2222
from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer
23+
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
2324
from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer
2425
from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer
2526
from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer
@@ -357,6 +358,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
357358
WheelAbsenceAnalyzer,
358359
AnomalousVersionAnalyzer,
359360
TyposquattingPresenceAnalyzer,
361+
FakeEmailAnalyzer,
360362
]
361363

362364
# name used to query the result of all problog rules, so it can be accessed outside the model.
@@ -424,13 +426,18 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
424426
failed({Heuristics.ONE_RELEASE.value}),
425427
failed({Heuristics.ANOMALOUS_VERSION.value}).
426428
429+
% Package released recently with a maintainer email address that is not valid.
430+
{Confidence.MEDIUM.value}::trigger(malware_medium_confidence_3) :-
431+
quickUndetailed,
432+
failed({Heuristics.FAKE_EMAIL.value}).
427433
% ----- Evaluation -----
428434
429435
% Aggregate result
430436
{problog_result_access} :- trigger(malware_high_confidence_1).
431437
{problog_result_access} :- trigger(malware_high_confidence_2).
432438
{problog_result_access} :- trigger(malware_high_confidence_3).
433439
{problog_result_access} :- trigger(malware_high_confidence_4).
440+
{problog_result_access} :- trigger(malware_medium_confidence_3).
434441
{problog_result_access} :- trigger(malware_medium_confidence_2).
435442
{problog_result_access} :- trigger(malware_medium_confidence_1).
436443
query({problog_result_access}).
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
2+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
3+
4+
"""Tests for the FakeEmailAnalyzer heuristic."""
5+
6+
7+
from collections.abc import Generator
8+
from unittest.mock import MagicMock, patch
9+
10+
import pytest
11+
12+
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
13+
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
14+
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
15+
16+
17+
@pytest.fixture(name="analyzer")
def analyzer_fixture() -> FakeEmailAnalyzer:
    """Provide a freshly constructed FakeEmailAnalyzer to the requesting test."""
    fake_email_analyzer = FakeEmailAnalyzer()
    return fake_email_analyzer
21+
22+
23+
@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_fixture() -> MagicMock:
    """Provide a MagicMock that stands in for a PyPIPackageJsonAsset.

    The download succeeds by default and the package JSON starts out empty; each test
    overrides whichever of the two it needs to simulate a particular PyPI response.
    """
    asset = MagicMock(spec=PyPIPackageJsonAsset)
    # Successful download unless a test says otherwise.
    asset.download = MagicMock(return_value=True)
    # Empty metadata until a test supplies its own.
    asset.package_json = {}
    return asset
32+
33+
34+
@pytest.fixture(name="mock_dns_resolve")
def mock_dns_resolve_fixture() -> Generator[MagicMock]:
    """Replace the DNS ``resolve`` function used by the module under test with a mock.

    The target is addressed through the ``dns_resolver`` name of the ``fake_email``
    module. By default the mock answers every lookup with a single MX record.
    """
    patch_target = "macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.dns_resolver.resolve"
    with patch(patch_target) as resolve_stub:
        # Default answer: one MX record, i.e., a successful lookup.
        default_record = MagicMock()
        default_record.exchange = "mail.default-domain.com"
        resolve_stub.return_value = [default_record]
        yield resolve_stub
46+
47+
48+
# Tests for the analyze method
49+
def test_analyze_download_failure(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that a failed package JSON download makes the analyzer report FAIL with a message."""
    download_mock = pypi_package_json_asset_mock.download
    download_mock.return_value = False

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert "message" in details
    assert isinstance(details["message"], str)
    assert "Failed to download package JSON" in details["message"]
    download_mock.assert_called_once_with("")
58+
59+
60+
def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the analyzer reports SKIP when both email fields are explicitly null."""
    metadata = {"info": {"author_email": None, "maintainer_email": None}}
    pypi_package_json_asset_mock.package_json = metadata

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert details["message"] == "No maintainers are available"
66+
67+
68+
def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the analyzer reports SKIP when the PyPI data has no 'info' section at all."""
    # An empty document: the 'info' key is absent.
    pypi_package_json_asset_mock.package_json = {}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert details["message"] == "No maintainers are available"
74+
75+
76+
def test_analyze_fail_empty_author_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that an empty author_email string (with no maintainer_email) is reported as FAIL."""
    metadata = {"info": {"author_email": "", "maintainer_email": None}}
    pypi_package_json_asset_mock.package_json = metadata

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert details["email"] == ""
82+
83+
84+
def test_analyze_pass_only_maintainer_email_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """Test analyzer passes when only maintainer_email is present and valid."""
    mock_mx_record = MagicMock()
    mock_mx_record.exchange = "mail.example.net"
    mock_dns_resolve.return_value = [mock_mx_record]

    # The address must live on example.net: that is the domain the lookup assertion expects.
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": None, "maintainer_email": "maintainer@example.net"}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.PASS
    assert info == {}
    mock_dns_resolve.assert_called_once_with("example.net", "MX")
99+
100+
101+
def test_analyze_pass_both_emails_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """Test the analyzer passes when both emails are present and valid."""

    def side_effect_dns_resolve(domain: str, record_type: str = "MX") -> list[MagicMock]:
        """Answer MX lookups for the two expected domains and fail the test for anything else."""
        mock_mx = MagicMock()
        domains = {
            "MX": {"example.com", "example.net"},
        }
        if domain not in domains.get(record_type, set()):
            pytest.fail(f"Unexpected domain for DNS resolve: {domain}")
        mock_mx.exchange = f"mail.{domain}"
        return [mock_mx]

    mock_dns_resolve.side_effect = side_effect_dns_resolve

    # One address per expected domain, so that exactly two distinct lookups happen.
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": "author@example.com", "maintainer_email": "maintainer@example.net"}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.PASS
    assert info == {}
    assert mock_dns_resolve.call_count == 2
    mock_dns_resolve.assert_any_call("example.com", "MX")
    mock_dns_resolve.assert_any_call("example.net", "MX")
127+
128+
129+
def test_analyze_fail_author_email_invalid_format(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """Test analyzer fails when author_email has an invalid format."""
    # The maintainer address is well-formed; only the author address is malformed.
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": "bad_email_format", "maintainer_email": "maintainer@example.net"}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.FAIL
    assert info["email"] == "bad_email_format"
    mock_dns_resolve.assert_not_called()  # Regex check fails before DNS lookup
140+
141+
142+
# Tests for the is_valid_email method
143+
def test_is_valid_email_valid_email_with_mx(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """Test is_valid_email returns True for a valid email with MX records."""
    mock_mx_record = MagicMock()
    mock_mx_record.exchange = "mail.example.com"
    mock_dns_resolve.return_value = [mock_mx_record]
    # The address must live on example.com: that is the domain the lookup assertion expects.
    assert analyzer.is_valid_email("test@example.com") is True
    mock_dns_resolve.assert_called_once_with("example.com", "MX")
150+
151+
152+
def test_is_valid_email_invalid_format(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """Check that is_valid_email rejects malformed addresses without issuing any DNS lookup."""
    malformed_addresses = (
        "not_an_email",
        "test@",
        "@example.com",
        "test@example",
        "",
    )
    for candidate in malformed_addresses:
        assert not analyzer.is_valid_email(candidate)
    mock_dns_resolve.assert_not_called()
160+
161+
162+
def test_is_valid_email_no_mx_records_returned(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """Test is_valid_email returns False if DNS resolve returns no MX records."""
    mock_dns_resolve.return_value = []  # Simulate no MX records found
    # The address must live on no-mx-domain.com: that is the domain the lookup assertion expects.
    assert analyzer.is_valid_email("test@no-mx-domain.com") is False
    mock_dns_resolve.assert_called_once_with("no-mx-domain.com", "MX")

0 commit comments

Comments
 (0)