Skip to content

Enable mypy on analysis and querying folder #436

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions policy_sentry/analysis/analyze.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
"""
Functions to support the analyze capability in this tool
"""
from __future__ import annotations

import logging
from policy_sentry.analysis.expand import determine_actions_to_expand, get_expanded_policy
from typing import Any

from policy_sentry.analysis.expand import (
determine_actions_to_expand,
get_expanded_policy,
)
from policy_sentry.querying.actions import remove_actions_not_matching_access_level
from policy_sentry.util.policy_files import (
get_actions_from_policy,
Expand All @@ -12,7 +19,9 @@
logger = logging.getLogger(__name__)


def analyze_by_access_level(policy_json, access_level):
def analyze_by_access_level(
policy_json: dict[str, Any], access_level: str
) -> list[str]:
"""
Determine if a policy has any actions with a given access level. This is particularly useful when determining who
has 'Permissions management' level access
Expand All @@ -32,7 +41,9 @@ def analyze_by_access_level(policy_json, access_level):
return actions_by_level


def analyze_statement_by_access_level(statement_json, access_level):
def analyze_statement_by_access_level(
statement_json: dict[str, Any], access_level: str
) -> list[str]:
"""
Determine if a statement has any actions with a given access level.

Expand Down
47 changes: 19 additions & 28 deletions policy_sentry/analysis/expand.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import logging
import copy
import fnmatch
from typing import Any

from policy_sentry.querying.all import get_all_actions
from policy_sentry.util.policy_files import get_actions_from_statement

Expand All @@ -29,7 +31,7 @@ def expand(action: str | list[str]) -> list[str]:
return expanded_actions

if action == "*":
return all_actions.copy()
return list(all_actions)
elif "*" in action:
expanded = [
expanded_action
Expand Down Expand Up @@ -71,7 +73,7 @@ def determine_actions_to_expand(action_list: list[str]) -> list[str]:
return new_action_list


def get_expanded_policy(policy):
def get_expanded_policy(policy: dict[str, Any]) -> dict[str, Any]:
"""
Given a policy, expand the * Actions in IAM policy files to improve readability

Expand All @@ -82,38 +84,31 @@ def get_expanded_policy(policy):
"""
modified_policy = copy.deepcopy(policy)

if isinstance(modified_policy["Statement"], dict):
requested_actions = get_actions_from_statement(modified_policy["Statement"])
modified_statement = modified_policy["Statement"]
if isinstance(modified_statement, dict):
requested_actions = get_actions_from_statement(modified_statement)
expanded_actions = determine_actions_to_expand(requested_actions)
if "NotAction" in modified_policy["Statement"]:
if isinstance(modified_policy["Statement"]["NotAction"], list):
modified_policy["Statement"]["NotAction"].clear()
modified_policy["Statement"]["NotAction"].extend(expanded_actions)
elif isinstance(modified_policy["Statement"]["NotAction"], str):
modified_policy["Statement"]["NotAction"] = []
if "NotAction" in modified_statement:
if isinstance(modified_statement["NotAction"], list):
modified_statement["NotAction"] = expanded_actions
elif isinstance(modified_statement["NotAction"], str):
modified_statement["NotAction"] = []
logger.warning(
"NotAction is in the statement. Policy Sentry will expand any wildcard actions "
"that are in the NotAction list, but it will not factor the NotAction actions into any analysis about "
"whether or not the actions are allowed by the policy. "
"If you are concerned about this, please review this specific policy manually."
)
elif "Action" in modified_policy["Statement"]:
if isinstance(modified_policy["Statement"]["Action"], list):
modified_policy["Statement"]["Action"].clear()
modified_policy["Statement"]["Action"].extend(expanded_actions)
elif isinstance(modified_policy["Statement"]["Action"], str):
modified_policy["Statement"]["Action"] = []
elif "Action" in modified_statement:
modified_statement["Action"] = expanded_actions
# Otherwise it will be a list of Sids
elif isinstance(modified_policy["Statement"], list):
for statement in modified_policy["Statement"]:
elif isinstance(modified_statement, list):
for statement in modified_statement:
requested_actions = get_actions_from_statement(statement)
expanded_actions = determine_actions_to_expand(
requested_actions
)
expanded_actions = determine_actions_to_expand(requested_actions)
if "NotAction" in statement:
if isinstance(statement["NotAction"], list):
statement["NotAction"].clear()
statement["NotAction"].extend(expanded_actions)
statement["NotAction"] = expanded_actions
elif isinstance(statement["NotAction"], str):
statement["NotAction"] = []
logger.warning(
Expand All @@ -123,11 +118,7 @@ def get_expanded_policy(policy):
"If you are concerned about this, please review this specific policy manually."
)
elif "Action" in statement:
if isinstance(statement["Action"], list):
statement["Action"].clear()
elif isinstance(statement["Action"], str):
statement["Action"] = []
statement["Action"].extend(expanded_actions)
statement["Action"] = expanded_actions
else:
logger.critical("Unknown error: The 'Statement' is neither a dict nor a list")
return modified_policy
6 changes: 5 additions & 1 deletion policy_sentry/querying/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import logging
import functools
from typing import cast

from policy_sentry.shared.iam_data import get_service_prefix_data
from policy_sentry.util.conditions import is_condition_key_match
from policy_sentry.querying.actions import get_action_data
Expand Down Expand Up @@ -52,6 +54,8 @@ def get_condition_key_details(
}
return output

return {}


def get_conditions_for_action_and_raw_arn(action: str, raw_arn: str) -> list[str]:
"""
Expand Down Expand Up @@ -109,6 +113,6 @@ def get_condition_value_type(condition_key: str) -> str | None:
"conditions"
].items():
if is_condition_key_match(condition_key_entry, condition_key):
return condition_key_data["type"].lower()
return cast("str", condition_key_data["type"]).lower()

return None
35 changes: 33 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,40 @@ strict = true
pretty = true

exclude = [
'^policy_sentry/analysis',
'^policy_sentry/bin',
'^policy_sentry/command',
'^policy_sentry/querying',
'^policy_sentry/writing',
]

[tool.pytest.ini_options]
testpaths = [
"test",
"test/analysis",
"test/command",
"test/querying",
"test/util",
"test/writing",
]
norecursedirs = [
"_build",
"tmp*",
"__pycache__",
]

# supported from version 7.3.0
#[tool.coverage]
#omit = [
# "policy_sentry/shared/awsdocs.py",
# # ignore v1 variants
# "policy_sentry/querying/actions_v1.py",
# "policy_sentry/querying/all_v1.py",
# "policy_sentry/querying/arns_v1.py",
# # omit anything in a .local directory anywhere
# "*/.local/*",
# "*/virtualenv/*",
# "*/venv/*",
# "*/.venv/*",
# "*/docs/*",
# "*/examples/*",
# "utils/*",
#]
9 changes: 4 additions & 5 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
[tool:pytest]
testpaths = test test/analysis test/command test/querying test/util test/writing
python_files=test/*/test_*.py
norecursedirs = .svn _build tmp* __pycache__

# Exclude: __pycache__ / .pyc
[coverage:run]
;include =
Expand All @@ -11,6 +6,10 @@ norecursedirs = .svn _build tmp* __pycache__
;source=policy_sentry/*
omit =
policy_sentry/shared/awsdocs.py
# ignore v1 variants
policy_sentry/querying/actions_v1.py
policy_sentry/querying/all_v1.py
policy_sentry/querying/arns_v1.py
# omit anything in a .local directory anywhere
*/.local/*
*/virtualenv/*
Expand Down
39 changes: 39 additions & 0 deletions test/util/test_actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from policy_sentry.util.actions import (
get_service_from_action,
get_action_name_from_action,
get_full_action_name,
)


def test_get_service_from_action():
# given
action = "ec2:DescribeInstance"

# when
service = get_service_from_action(action=action)

# then
assert service == "ec2"


def test_get_action_name_from_action():
# given
action = "ec2:DescribeInstance"

# when
service = get_action_name_from_action(action=action)

# then
assert service == "describeinstance"


def test_get_full_action_name():
# given
service = "ec2"
action_name = "DescribeInstance"

# when
action = get_full_action_name(service=service, action_name=action_name)

# then
assert action == "ec2:DescribeInstance"