Skip to content

add query command for service data #484

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 55 additions & 5 deletions policy_sentry/command/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import click
import yaml

from policy_sentry.querying.services import get_services_data
from policy_sentry.util.access_levels import transform_access_level_text
from policy_sentry.querying.all import get_all_service_prefixes
from policy_sentry.querying.arns import (
Expand Down Expand Up @@ -47,8 +49,17 @@ def print_list(output: Any, fmt: str = "json") -> None:


def print_dict(output: Any, fmt: str = "json") -> None:
"""Common method on how to print a dict, depending on whether the user requests JSON or YAML output"""
print(yaml.dump(output)) if fmt == "yaml" else [print(json.dumps(output, indent=4))]
"""Common method on how to print a dict, depending on whether the user requests JSON, YAML or CSV output"""
if fmt == "csv":
if not output:
return None
print(",".join(output[0].keys()))
for entry in output:
print(",".join(entry.values()))
elif fmt == "json":
print(json.dumps(output, indent=4))
elif fmt == "yaml":
print(yaml.dump(output))


@click.group()
Expand Down Expand Up @@ -97,7 +108,7 @@ def query() -> None:
type=click.Choice(["yaml", "json"]),
default="json",
required=False,
help='Format output as YAML or JSON. Defaults to "yaml"',
help='Format output as YAML or JSON. Defaults to "json"',
)
@click.option(
"--verbose",
Expand Down Expand Up @@ -229,7 +240,7 @@ def query_action_table(
type=click.Choice(["yaml", "json"]),
default="json",
required=False,
help='Format output as YAML or JSON. Defaults to "yaml"',
help='Format output as YAML or JSON. Defaults to "json"',
)
@click.option(
"--verbose",
Expand Down Expand Up @@ -296,7 +307,7 @@ def query_arn_table(
type=click.Choice(["yaml", "json"]),
default="json",
required=False,
help='Format output as YAML or JSON. Defaults to "yaml"',
help='Format output as YAML or JSON. Defaults to "json"',
)
@click.option(
"--verbose",
Expand Down Expand Up @@ -334,3 +345,42 @@ def query_condition_table(
output = get_condition_key_details(service, name)
print_dict(output=output, fmt=fmt)
return output


@query.command(short_help="Query the service table.")
@click.option(
"--fmt",
type=click.Choice(["yaml", "json", "csv"]),
default="json",
required=False,
help='Format output as YAML, JSON or CSV. Defaults to "json"',
)
@click.option(
"--verbose",
"-v",
type=click.Choice(
["critical", "error", "warning", "info", "debug"], case_sensitive=False
),
)
def service_table(fmt: str, verbose: str | None) -> None:
"""Query the service table from the Policy Sentry database"""
if verbose:
log_level = getattr(logging, verbose.upper())
set_stream_logger(level=log_level)
query_service_table(fmt)


def query_service_table(fmt: str = "json") -> list[dict[str, str]]:
"""Query the service table from the Policy Sentry database.
Use this one when leveraging Policy Sentry as a library."""
if os.path.exists(LOCAL_DATASTORE_FILE_PATH):
logger.info(
f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/"
)
else:
# Otherwise, leverage the datastore inside the python package
logger.debug("Leveraging the bundled IAM Definition.")

output = get_services_data()
print_dict(output=output, fmt=fmt)
return output
19 changes: 19 additions & 0 deletions policy_sentry/querying/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
Methods that execute specific queries against the SQLite database for the SERIVCES table.
This supports the policy_sentry query functionality
"""

from __future__ import annotations

from policy_sentry.shared.iam_data import iam_definition


def get_services_data() -> list[dict[str, str]]:
return [
{
"prefix": service_prefix,
"service_name": data["service_name"],
}
for service_prefix, data in iam_definition.items()
if isinstance(data, dict)
]
16 changes: 16 additions & 0 deletions test/querying/test_query_services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import unittest

from policy_sentry.querying.services import get_services_data


class QueryServicesTestCase(unittest.TestCase):
def test_get_services_data(self):
# when
results = get_services_data()

# then
self.assertGreater(len(results), 400) # in 07/24 it was 405

# both should be a non-empty string
self.assertTrue(results[0]["prefix"])
self.assertTrue(results[0]["service_name"])