Skip to content

Fixes #296 - query all services for wildcard-only actions #315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion policy_sentry/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Policy Sentry is a tool for generating least-privilege IAM Policies.
"""
__version__ = "0.11.2"
__version__ = "0.11.3"
import click
from policy_sentry import command

Expand Down
67 changes: 33 additions & 34 deletions policy_sentry/command/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
get_action_data,
get_actions_matching_condition_key,
get_actions_with_arn_type_and_access_level,
get_actions_matching_arn_type
get_actions_matching_arn_type,
get_actions_that_support_wildcard_arns_only
)
from policy_sentry.querying.conditions import (
get_condition_keys_for_service,
Expand All @@ -32,6 +33,20 @@
iam_definition_path = DATASTORE_FILE_PATH


def print_list(output, fmt="json"):
"""Common method on how to print a list, depending on whether the user requests JSON or YAML output"""
print(yaml.dump(output)) if fmt == "yaml" else [
print(item) for item in output
]


def print_dict(output, fmt="json"):
"""Common method on how to print a dict, depending on whether the user requests JSON or YAML output"""
print(yaml.dump(output)) if fmt == "yaml" else [
print(json.dumps(output, indent=4))
]


@click.group()
def query():
"""Allow users to query the IAM tables from command line"""
Expand Down Expand Up @@ -113,63 +128,53 @@ def query_action_table(
for serv in all_services:
result = get_actions_with_access_level(serv, level)
output.extend(result)
print(yaml.dump(output)) if fmt == "yaml" else [
print(result) for result in output
]
print_list(output=output, fmt=fmt)
# Get a list of all services in the database
elif resource_type == "*":
print("ALL actions that do not support resource ARN constraints")
output = get_actions_that_support_wildcard_arns_only(service)
print_dict(output=output, fmt=fmt)
else:
print("All services in the database:\n")
output = all_services
print(yaml.dump(output)) if fmt == "yaml" else [
print(item) for item in output
]
print_list(output=output, fmt=fmt)
elif name is None and access_level and not resource_type:
print(
f"All IAM actions under the {service} service that have the access level {access_level}:"
)
level = transform_access_level_text(access_level)
output = get_actions_with_access_level(service, level)
print(yaml.dump(output)) if fmt == "yaml" else [
print(json.dumps(output, indent=4))
]
print_dict(output=output, fmt=fmt)
elif name is None and access_level and resource_type:
print(
f"{service} {access_level.upper()} actions that have the resource type {resource_type.upper()}:"
)
access_level = transform_access_level_text(access_level)
output = get_actions_with_arn_type_and_access_level(service, resource_type, access_level)
print(yaml.dump(output)) if fmt == "yaml" else [
print(json.dumps(output, indent=4))
]
print_dict(output=output, fmt=fmt)
# Get a list of all IAM actions under the service that support the specified condition key.
elif condition:
print(
f"IAM actions under {service} service that support the {condition} condition only:"
)
output = get_actions_matching_condition_key(service, condition)
print(yaml.dump(output)) if fmt == "yaml" else [
print(json.dumps(output, indent=4))
]
print_dict(output=output, fmt=fmt)
# Get a list of IAM Actions under the service that only support resources = "*"
# (i.e., you cannot restrict it according to ARN)
elif resource_type:
print(
f"IAM actions under {service} service that have the resource type {resource_type}:"
)
output = get_actions_matching_arn_type(service, resource_type)
print(yaml.dump(output)) if fmt == "yaml" else [
print(json.dumps(output, indent=4))
]
print_dict(output=output, fmt=fmt)
elif name and access_level is None:
output = get_action_data(service, name)
print(yaml.dump(output)) if fmt == "yaml" else [
print(json.dumps(output, indent=4))
]
print_dict(output=output, fmt=fmt)
else:
# Get a list of all IAM Actions available to the service
output = get_actions_for_service(service)
print(f"ALL {service} actions:")
print(yaml.dump(output)) if fmt == "yaml" else [print(item) for item in output]
print_list(output=output, fmt=fmt)
return output


Expand Down Expand Up @@ -225,20 +230,16 @@ def query_arn_table(name, service, list_arn_types, fmt):
# Get a list of all RAW ARN formats available through the service.
if name is None and list_arn_types is False:
output = get_raw_arns_for_service(service)
print(yaml.dump(output)) if fmt == "yaml" else [print(item) for item in output]
print_list(output=output, fmt=fmt)
# Get a list of all the ARN types per service, paired with the RAW ARNs
elif name is None and list_arn_types:
output = get_arn_types_for_service(service)
print(yaml.dump(output)) if fmt == "yaml" else [
print(json.dumps(output, indent=4))
]
print_dict(output=output, fmt=fmt)
# Get the raw ARN format for the `cloud9` service with the short name
# `environment`
else:
output = get_arn_type_details(service, name)
print(yaml.dump(output)) if fmt == "yaml" else [
print(json.dumps(output, indent=4))
]
print_dict(output=output, fmt=fmt)
return output


Expand Down Expand Up @@ -287,11 +288,9 @@ def query_condition_table(name, service, fmt="json"):
# Get a list of all condition keys available to the service
if name is None:
output = get_condition_keys_for_service(service)
print(yaml.dump(output)) if fmt == "yaml" else [print(item) for item in output]
print_list(output=output, fmt=fmt)
# Get details on the specific condition key
else:
output = get_condition_key_details(service, name)
print(yaml.dump(output)) if fmt == "yaml" else [
print(json.dumps(output, indent=4))
]
print_dict(output=output, fmt=fmt)
return output
2 changes: 1 addition & 1 deletion policy_sentry/querying/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def get_actions_that_support_wildcard_arns_only(service_prefix):
if len(action_data["resource_types"].keys()) == 1:
for resource_type in action_data["resource_types"]:
if resource_type == '':
results.append(f"{service_prefix}:{action_name}")
results.append(f"{some_prefix}:{action_name}")
else:
service_prefix_data = get_service_prefix_data(service_prefix)
for action_name, action_data in service_prefix_data["privileges"].items():
Expand Down
2 changes: 1 addition & 1 deletion tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def query_with_yaml(c):
c.run('echo "Querying the action table"', pty=True)
c.run('./policy_sentry/bin/cli.py query action-table --service ram --fmt yaml', pty=True)
c.run('./policy_sentry/bin/cli.py query action-table --service ram --name tagresource --fmt yaml', pty=True)
c.run('./policy_sentry/bin/cli.py query action-table ''--service ram --access-level permissions-management --fmt yaml', pty=True)
c.run('./policy_sentry/bin/cli.py query action-table --service ram --access-level permissions-management --fmt yaml', pty=True)
c.run('./policy_sentry/bin/cli.py query action-table --service ses --condition ses:FeedbackAddress --fmt yaml', pty=True)
c.run('echo "Querying the ARN table"', pty=True)
c.run('./policy_sentry/bin/cli.py query arn-table --service ssm --fmt yaml', pty=True)
Expand Down
Empty file added test/command/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions test/command/test_query_actions_click.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import unittest
from click.testing import CliRunner
from policy_sentry.command.query import query


class QueryClickUnitTests(unittest.TestCase):
def setUp(self):
self.runner = CliRunner()

def test_query_action_table_basic_with_click(self):
"""command.query.query_action_table: should return exit code 0"""
result = self.runner.invoke(query, ["action-table", "--service", "ram"])
self.assertTrue(result.exit_code == 0)

def test_click_query_action_table(self):
"""command.query.query_action_table: click testing"""
cases = [
["action-table", "--service", "all", "--access-level", "permissions-management"],
["action-table", "--service", "all", "--resource-type", "*"],
["action-table", "--service", "all"],
["action-table", "--service", "ram", "--access-level", "permissions-management"],
["action-table", "--service", "ram", "--access-level", "permissions-management", "--resource-type", "resource-share"],
["action-table", "--service", "ses", "--condition", "ses:FeedbackAddress"],
["action-table", "--service", "ssm", "--resource-type", "*"],
["action-table", "--service", "ram", "--name", "tagresource"],
["action-table", "--service", "ssm", "--resource-type", "parameter"],
["action-table", "--service", "ram"],
]
for case in cases:
result = self.runner.invoke(query, case)
print(result.output)
self.assertTrue(result.exit_code == 0)

def test_click_query_arn_table(self):
"""command.query.query_arn_table: click testing"""
cases = [
["arn-table", "--service", "ssm"],
["arn-table", "--service", "cloud9", "--list-arn-types"],
["arn-table", "--service", "cloud9", "--name", "environment"],
]
for case in cases:
result = self.runner.invoke(query, case)
print(result.output)
self.assertTrue(result.exit_code == 0)

def test_click_query_condition_table(self):
"""command.query.query_condition_table: click testing"""
cases = [
["condition-table", "--service", "cloud9"],
["condition-table", "--service", "cloud9", "--name", "cloud9:Permissions"],
]
for case in cases:
result = self.runner.invoke(query, case)
print(result.output)
self.assertTrue(result.exit_code == 0)
26 changes: 26 additions & 0 deletions test/querying/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
get_all_service_prefixes,
get_all_actions
)
from policy_sentry.command.query import query_action_table


class QueryActionsTestCase(unittest.TestCase):
Expand All @@ -24,3 +25,28 @@ def test_get_all_actions(self):
# performance notes:
# old: 0.112s (without sort)
# new: 0.106s

def test_query_actions_with_access_level_and_wildcard_only(self):
service = "all"
resource_type = "*"
result = query_action_table(
service=service,
resource_type=resource_type,
name=None,
access_level="permissions-management",
condition=None
)
print(len(result))
self.assertTrue(len(result) > 200)

def test_GH_296_query_all_actions_with_wildcard_resources(self):
service = "all"
resource_type = "*"
result = query_action_table(
service=service,
resource_type=resource_type,
name=None,
access_level=None,
condition=None
)
self.assertTrue(len(result) > 3000)