Source code for infrahouse_toolkit.cli.ih_plan.cmd_min_permissions

"""
.. topic:: ``ih-plan min-permissions``

    A ``ih-plan min-permissions`` subcommand.

    See ``ih-plan min-permissions --help`` for more details.
"""

import json
from json import JSONDecodeError

import click

from infrahouse_toolkit import DEFAULT_OPEN_ENCODING


@click.command(name="min-permissions")
@click.option("--existing-actions", help="A file with permissions.", default=None)
@click.argument("trace_file")
def cmd_min_permissions(existing_actions, trace_file):
    """
    Parse Terraform trace file and produce an action list from the trace.

    The trace file contains entries with AWS actions. The command
    finds the actions and AWS services to generate a list that
    you can add to an AWS policy.
    It's useful to prepare the least privileges policy.

    The output looks similar to this:

    \b
    [
        "ec2:DeleteNatGateway",
        "ec2:DescribeAddresses",
        "ec2:DescribeInternetGateways",
        "ec2:DescribeNatGateways",
    ]

    """
    actions = ActionList()
    if existing_actions:
        actions.load_from_file(existing_actions)

    print(f"## Existing {actions.count} actions:")
    print(actions)

    new_actions = ActionList()
    new_actions.parse_trace(trace_file, existing=actions.actions)
    print(f"## {new_actions.count} new action(s):")
    print(str(new_actions))

    combined_actions = ActionList()
    for action in actions.actions + new_actions.actions:
        combined_actions.add(action)

    print(f"## Old and new actions together excluding duplicates, {combined_actions.count} in total:")
    print(str(combined_actions))


[docs]class ActionList: """ List of AWS actions. Action here is a string as in AWS's policy e.g. ``ec2:DescribeInstances``. """ SERVICE_NAMING_MAP = { "auto scaling": "autoscaling", "elastic load balancing v2": "elasticloadbalancing", "route 53": "route53", "secrets manager": "secretsmanager", "cloudwatch logs": "logs", "eventbridge": "events", } PERMISSION_NAMING_MAP = { "HeadBucket": "ListBucket", "HeadObject": "GetObject", "CreateMultipartUpload": "PutObject", "UploadPart": "PutObject", "CompleteMultipartUpload": "PutObject", "GetBucketAccelerateConfiguration": "GetAccelerateConfiguration", "GetBucketEncryption": "GetEncryptionConfiguration", "GetBucketCors": "GetBucketCORS", "GetBucketLifecycleConfiguration": "GetLifecycleConfiguration", "GetBucketReplication": "GetReplicationConfiguration", "GetObjectLockConfiguration": "GetBucketObjectLockConfiguration", "DeletePublicAccessBlock": "PutBucketPublicAccessBlock", "GetPublicAccessBlock": "GetBucketPublicAccessBlock", "PutPublicAccessBlock": "PutBucketPublicAccessBlock", } # Some permissions require additional ones. REQUIRED_EXTRA_PERMISSIONS_MAP = { "autoscaling:CreateAutoScalingGroup": [ "iam:PassRole", "iam:CreateServiceLinkedRole", "ec2:CreateTags", "ec2:RunInstances", ], "autoscaling:UpdateAutoScalingGroup": ["iam:PassRole"], "elasticloadbalancing:CreateLoadBalancer": ["elasticloadbalancing:AddTags"], "iam:AddRoleToInstanceProfile": ["iam:PassRole"], "iam:CreateInstanceProfile": ["iam:TagInstanceProfile"], "ec2:CreateLaunchTemplate": ["ec2:CreateTags"], "ec2:ImportKeyPair": ["ec2:CreateTags"], "ec2:RunInstances": ["ec2:CreateTags"], "logs:CreateLogGroup": ["logs:TagResource"], "lambda:CreateFunction": ["lambda:TagResource"], "s3:CreateBucket": ["s3:PutBucketTagging"], "s3:PutObject": [ "kms:Decrypt", "kms:CreateGrant", "kms:DescribeKey", "kms:Encrypt", "s3:AbortMultipartUpload", "s3:GetObject", "s3:ListMultipartUploadParts", "s3:PutObjectTagging", ], "events:PutRule": ["events:TagResource"], "events:PutTargets": ["events:TagResource"], } def __init__(self): self._actions = set() @property def actions(self) -> list: """List of action strings.""" return sorted(list(self._actions)) @property def count(self) -> int: """Number of actions in the list.""" return len(self._actions)
[docs] def add(self, action: str): """Add a new action. Convert service name to the AWS policy format and add dependent actions if any.""" norm_action = self._normalize_action(action) self._actions.add(norm_action) for dependency in self.REQUIRED_EXTRA_PERMISSIONS_MAP.get(str(norm_action), []): self._actions.add(dependency)
[docs] def load_from_file(self, file): """Load actions from a file with a JSON. The JSON should be an array of strings.""" with open(file, encoding=DEFAULT_OPEN_ENCODING) as f_desc: for action in json.loads(f_desc.read()): self.add(action)
[docs] def parse_trace(self, file, existing=None): """Inspect a Terraform trace file and collect actions""" existing_permissions = existing or [] with open(file, encoding=DEFAULT_OPEN_ENCODING) as f_decs: for line in f_decs.readlines(): try: operation = json.loads(line) operation_key = "aws.operation" if "aws.operation" in operation else "rpc.method" service_key = "aws.service" if "aws.service" in operation else "rpc.service" if all((operation_key in operation, service_key in operation)): service_name = operation[service_key].lower() permission = self._normalize_action(f"{service_name}:{operation[operation_key]}") if permission not in existing_permissions: self.add(permission) elif all( ( operation.get("tf_resource_type") == "aws_s3_bucket_versioning", operation.get("tf_rpc") == "ApplyResourceChange", ) ): permission = "s3:PutBucketVersioning" elif all( ( operation.get("tf_resource_type") == "aws_s3_bucket_server_side_encryption_configuration", operation.get("tf_rpc") == "ApplyResourceChange", ) ): permission = "s3:PutEncryptionConfiguration" else: continue if permission not in existing_permissions: self.add(permission) except JSONDecodeError: pass
def _normalize_action(self, action): if ":" in action: s_part, a_part = action.split(":") return f"{self.SERVICE_NAMING_MAP.get(s_part, s_part)}:{self.PERMISSION_NAMING_MAP.get(a_part, a_part)}" return action def __str__(self): return json.dumps(self.actions, indent=4)