
IAM & RBAC

Identity and Access Management with Role-Based Access Control ensures the right people have the right access to the right resources at the right time. This guide covers AWS IAM, Kubernetes RBAC, multi-tenant SaaS architectures, and just-in-time access patterns.

RBAC Fundamentals

Role-Based Access Control (RBAC) assigns permissions to roles and roles to users. This indirection simplifies administration and makes the principle of least privilege practical at scale.

RBAC Core Model

  ┌─────────┐     ┌─────────┐     ┌─────────────┐     ┌──────────┐
  │  User   │────→│  Role   │────→│ Permission  │────→│ Resource │
  └─────────┘     └─────────┘     └─────────────┘     └──────────┘
       │               │                  │
       │               │                  │
  Attributes:     Attributes:        Attributes:
  - identity      - name             - action (read/write/delete)
  - department    - scope            - resource type
  - manager       - permissions      - conditions
  - tenure        - time-bound

Relationships:
  User → Role:       Many-to-Many (a user can have multiple roles)
  Role → Permission: Many-to-Many (a role has multiple permissions)
  User → Permission: Indirect (only via role)
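
The model above can be sketched in a few lines of Python. This is a minimal illustration rather than a production design; the role names, permission strings, and the `permissions_for` and `is_allowed` helpers are hypothetical.

```python
from typing import Dict, Set

# Illustrative data only: these roles, users, and permission strings are
# assumptions made for this sketch.
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "viewer": {"report:read"},
    "editor": {"report:read", "report:write"},
}

USER_ROLES: Dict[str, Set[str]] = {
    "alice": {"editor"},
    "bob": {"viewer"},
}


def permissions_for(user: str) -> Set[str]:
    """Resolve a user's permissions indirectly, only through their roles."""
    perms: Set[str] = set()
    for role in USER_ROLES.get(user, set()):
        perms |= ROLE_PERMISSIONS.get(role, set())
    return perms


def is_allowed(user: str, permission: str) -> bool:
    """A user is allowed only if one of their roles carries the permission."""
    return permission in permissions_for(user)
```

Because users never hold permissions directly, changing what an editor may do is a single change to `ROLE_PERMISSIONS` rather than one change per user.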

RBAC vs ABAC Comparison

| Dimension | RBAC | ABAC |
|---|---|---|
| Access Decision | Based on role membership | Based on attributes of user, resource, environment |
| Granularity | Coarse (role-level) | Fine-grained (attribute-level) |
| Scalability | Role explosion at large scale | Scales better with complex environments |
| Management | Simpler to understand and audit | Complex policy evaluation engine required |
| Dynamic Access | Static role assignments | Dynamic based on real-time attributes |
| Best For | Structured organizations, clear job functions | Complex, multi-dimensional access requirements |

Hybrid (RBAC + ABAC): start with RBAC, then add ABAC conditions for edge cases.
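
One way to picture the hybrid approach is a role check followed by attribute conditions. The sketch below is an assumption-laden illustration: the `AccessRequest` fields, the `analyst` role, and the business-hours rule are invented for the example and are not taken from any particular policy engine.

```python
from dataclasses import dataclass
from typing import Dict, Set

# Hypothetical role table for the RBAC half of the decision.
ROLE_PERMISSIONS: Dict[str, Set[str]] = {"analyst": {"dataset:read"}}


@dataclass
class AccessRequest:
    roles: Set[str]
    permission: str
    user_department: str       # attribute of the user
    resource_department: str   # attribute of the resource
    hour_utc: int              # attribute of the environment


def rbac_allows(req: AccessRequest) -> bool:
    """Coarse check: does any assigned role grant the permission?"""
    return any(req.permission in ROLE_PERMISSIONS.get(r, set()) for r in req.roles)


def abac_allows(req: AccessRequest) -> bool:
    """Fine-grained check: same department, and only during 08:00-17:59 UTC."""
    same_department = req.user_department == req.resource_department
    business_hours = 8 <= req.hour_utc < 18
    return same_department and business_hours


def is_allowed(req: AccessRequest) -> bool:
    """Hybrid decision: the role must grant it and the attributes must agree."""
    return rbac_allows(req) and abac_allows(req)
```

The role keeps the common case simple to audit, while the attribute conditions cover the edge cases without creating a new role for every department and time window.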

AWS IAM

Least Privilege Policy Examples

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3ReadOnlyApplicationBucket",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::company-app-data-prod",
        "arn:aws:s3:::company-app-data-prod/*"
      ],
      "Condition": {
        "StringEquals": {
          "aws:RequestedRegion": ["us-east-1", "us-west-2"]
        },
        "Bool": {
          "aws:MultiFactorAuthPresent": "true"
        },
        "DateGreaterThan": {
          "aws:CurrentTime": "2024-01-01T00:00:00Z"
        }
      }
    },
    {
      "Sid": "EC2DescribeReadOnly",
      "Effect": "Allow",
      "Action": [
        "ec2:DescribeInstances",
        "ec2:DescribeVolumes",
        "ec2:DescribeSnapshots"
      ],
      "Resource": "*"
    },
    {
      "Sid": "EC2TagMatchingEnvironment",
      "Effect": "Allow",
      "Action": "ec2:CreateTags",
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "aws:ResourceTag/Environment": "${aws:PrincipalTag/Environment}"
        },
        "ForAllValues:StringEquals": {
          "aws:TagKeys": ["Environment", "Application", "Owner"]
        }
      }
    },
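    {
      "Sid": "DenyNonVPCDatabases",
      "Effect": "Deny",
      "Action": "rds:CreateDBInstance",
      "Resource": "*",
      "Condition": {
        "Bool": {
          "rds:Vpc": "false"
        }
      }
    },
    {
      "Sid": "DenyUnencryptedS3Uploads",
      "Effect": "Deny",
      "Action": "s3:PutObject",
      "Resource": "*",
      "Condition": {
        "StringNotEquals": {
          "s3:x-amz-server-side-encryption": "AES256"
        }
      }
    },
    {
      "Sid": "DenyUnencryptedVolumes",
      "Effect": "Deny",
      "Action": "ec2:CreateVolume",
      "Resource": "*",
      "Condition": {
        "Bool": {
          "ec2:Encrypted": "false"
        }
      }
    },
    {
      "Sid": "DenyUnencryptedDatabases",
      "Effect": "Deny",
      "Action": "rds:CreateDBInstance",
      "Resource": "*",
      "Condition": {
        "Bool": {
          "rds:StorageEncrypted": "false"
        }
      }
    }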
  ]
}

IAM Permission Boundaries

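Permission boundaries set the maximum permissions an IAM role or user can have, regardless of what its identity-based policies grant. A boundary grants nothing on its own; the effective permissions are the intersection of the boundary and the identity-based policies. The Terraform below defines a boundary for developer roles: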

# terraform/iam-permission-boundary.tf

# Permission boundary for developer roles
resource "aws_iam_policy" "developer_boundary" {
  name        = "DeveloperPermissionBoundary"
  description = "Maximum permissions for developer roles"
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AllowDevelopmentServices"
        Effect = "Allow"
        Action = [
          "ec2:*",
          "s3:*",
          "rds:*",
          "elasticache:*",
          "sqs:*",
          "sns:*",
          "lambda:*",
          "logs:*",
          "cloudwatch:*"
        ]
        Resource = "*"
        Condition = {
          StringEquals = {
            "aws:RequestedRegion" = ["us-east-1", "us-west-2"]
          }
        }
      },
      {
        Sid    = "DenyProductionResources"
        Effect = "Deny"
        Action = "*"
        Resource = "*"
        Condition = {
          StringEquals = {
            "aws:ResourceTag/Environment" = "production"
          }
        }
      },
      {
        Sid    = "DenyDangerousActions"
        Effect = "Deny"
        Action = [
          "iam:CreateUser",
          "iam:DeleteUser",
          "iam:CreateAccessKey",
          "iam:DeleteAccountPasswordPolicy",
          "iam:AttachUserPolicy",
          "iam:DetachUserPolicy",
          "organizations:LeaveOrganization",
          "account:CloseAccount"
        ]
        Resource = "*"
      },
      {
        Sid    = "DenyUnencrypted"
        Effect = "Deny"
        Action = [
          "s3:PutObject"
        ]
        Resource = "*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "AES256"
          }
        }
      }
    ]
  })
}

# Apply boundary to developer role
resource "aws_iam_role" "developer" {
  name               = "DeveloperRole"
  path               = "/"
  assume_role_policy = data.aws_iam_policy_document.trust_policy.json
  
  permissions_boundary = aws_iam_policy.developer_boundary.arn
  
  managed_policy_arns = [
    aws_iam_policy.developer_policy.arn
  ]
}
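
The interaction between a boundary and an identity-based policy can be sketched as set logic. The function below is a deliberately simplified model, assuming action patterns only: it ignores resources, conditions, and every other policy type, and `effective_allow` is a hypothetical helper, not an AWS API.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def _matches(action: str, patterns: Iterable[str]) -> bool:
    """Wildcard match on action names, compared case-insensitively."""
    return any(fnmatchcase(action.lower(), p.lower()) for p in patterns)


def effective_allow(
    action: str,
    identity_allow: Iterable[str],
    boundary_allow: Iterable[str],
    explicit_deny: Iterable[str] = (),
) -> bool:
    """Simplified model: explicit deny wins; otherwise the action must be
    allowed by BOTH the identity-based policy and the permission boundary."""
    if _matches(action, explicit_deny):
        return False
    return _matches(action, identity_allow) and _matches(action, boundary_allow)
```

In this model a boundary that lists `dynamodb:*` does not let a developer use DynamoDB unless an identity-based policy also allows it, which mirrors the point that boundaries limit permissions rather than grant them.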

Service Control Policies (SCPs) for Organizations

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DenyLeavingOrganization",
      "Effect": "Deny",
      "Action": [
        "organizations:LeaveOrganization"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DenyAccountClosure",
      "Effect": "Deny",
      "Action": [
        "account:CloseAccount",
        "account:DeleteAlternateContact"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DenySensitiveIAMActionsWithoutMFA",
      "Effect": "Deny",
      "Action": [
        "iam:CreateAccessKey",
        "iam:CreateUser",
        "iam:PutUserPolicy"
      ],
      "Resource": "*",
      "Condition": {
        "BoolIfExists": {
          "aws:MultiFactorAuthPresent": "false"
        }
      }
    },
    {
      "Sid": "DenyUnusedRegions",
      "Effect": "Deny",
      "NotAction": [
        "iam:*",
        "organizations:*",
        "account:*",
        "support:*",
        "billing:*",
        "ce:*"
      ],
      "Resource": "*",
      "Condition": {
        "StringNotEquals": {
          "aws:RequestedRegion": [
            "us-east-1",
            "us-west-2",
            "eu-west-1"
          ]
        }
      }
    },
    {
      "Sid": "RequireEncryptedEBS",
      "Effect": "Deny",
      "Action": [
        "ec2:CreateVolume",
        "ec2:RunInstances"
      ],
      "Resource": [
        "arn:aws:ec2:*:*:volume/*"
      ],
      "Condition": {
        "Bool": {
          "ec2:Encrypted": "false"
        }
      }
    },
    {
      "Sid": "DenyPublicS3Acls",
      "Effect": "Deny",
      "Action": [
        "s3:PutBucketAcl",
        "s3:PutObjectAcl"
      ],
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "s3:x-amz-acl": ["public-read", "public-read-write"]
        }
      }
    }
  ]
}
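
The `DenyUnusedRegions` statement combines `NotAction` with a negated condition, which is easy to misread. The sketch below restates that one statement as a Python predicate, assuming simple wildcard matching on action names; `denied_by_region_scp` is a hypothetical helper written for this illustration.

```python
from fnmatch import fnmatchcase

# Values copied from the DenyUnusedRegions statement above.
ALLOWED_REGIONS = {"us-east-1", "us-west-2", "eu-west-1"}
EXEMPT_ACTIONS = ["iam:*", "organizations:*", "account:*",
                  "support:*", "billing:*", "ce:*"]


def denied_by_region_scp(action: str, region: str) -> bool:
    """Return True when the statement would deny the request.

    NotAction means the deny applies to every action EXCEPT the exempt ones;
    StringNotEquals means it applies only outside the allowed regions.
    """
    exempt = any(fnmatchcase(action.lower(), p) for p in EXEMPT_ACTIONS)
    return (not exempt) and region not in ALLOWED_REGIONS
```

Note that an SCP never grants access: a request that is not denied here still needs an allow from an identity-based or resource-based policy in the member account.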

Multi-Tenant RBAC Architecture

Tenant Isolation Patterns

| Pattern | Isolation Level | Cost | Complexity | Use Case |
|---|---|---|---|---|
| Shared Everything | Row-level in shared DB | Lowest | Low | Startups, small SaaS |
| Shared DB, Separate Schema | Schema-level isolation | Low | Medium | Mid-market SaaS |
| Database-per-Tenant | Full DB isolation | Medium | High | Enterprise SaaS, compliance |
| Account-per-Tenant | Full AWS account isolation | Highest | Highest | GovCloud, regulated industries |
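
For the "Shared Everything" pattern, isolation depends on every query being filtered by tenant. The following is a minimal sketch using an in-memory SQLite database; the `documents` table, its columns, and the helper names are assumptions made for illustration.

```python
import sqlite3
from typing import List


def make_db() -> sqlite3.Connection:
    """Create a shared table in which every row carries its tenant_id."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE documents ("
        "id INTEGER PRIMARY KEY, tenant_id TEXT NOT NULL, title TEXT NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO documents (tenant_id, title) VALUES (?, ?)",
        [("tenant_a", "A roadmap"), ("tenant_a", "A budget"), ("tenant_b", "B roadmap")],
    )
    return conn


def list_documents(conn: sqlite3.Connection, tenant_id: str) -> List[str]:
    """Always scope the query to the caller's tenant with a bound parameter."""
    rows = conn.execute(
        "SELECT title FROM documents WHERE tenant_id = ? ORDER BY id",
        (tenant_id,),
    )
    return [title for (title,) in rows]
```

Routing all reads through a helper like `list_documents` keeps the tenant filter in one place. Databases that support row-level security can enforce the same rule below the application layer.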

Complete RBAC Model for SaaS Platform

# rbac_model.py: Multi-tenant RBAC implementation
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import List, Set, Optional, Dict
from functools import wraps
import time

class Permission(Enum):
    """Fine-grained permissions for SaaS platform."""
    # User management
    USER_READ = auto()
    USER_CREATE = auto()
    USER_UPDATE = auto()
    USER_DELETE = auto()
    USER_INVITE = auto()
    
    # Resource management
    RESOURCE_READ = auto()
    RESOURCE_CREATE = auto()
    RESOURCE_UPDATE = auto()
    RESOURCE_DELETE = auto()
    RESOURCE_SHARE = auto()
    
    # Billing
    BILLING_READ = auto()
    BILLING_UPDATE = auto()
    BILLING_MANAGE = auto()
    
    # Administration
    TENANT_SETTINGS_READ = auto()
    TENANT_SETTINGS_UPDATE = auto()
    MEMBER_MANAGE = auto()
    ROLE_MANAGE = auto()
    AUDIT_LOG_READ = auto()
    
    # API keys
    API_KEY_READ = auto()
    API_KEY_CREATE = auto()
    API_KEY_REVOKE = auto()

# Pre-defined roles with permission sets
ROLE_DEFINITIONS = {
    "owner": {
        "permissions": list(Permission),
        "is_admin": True,
        "max_count_per_tenant": 2
    },
    "admin": {
        "permissions": [
            p for p in Permission 
            if p not in [Permission.ROLE_MANAGE, Permission.BILLING_MANAGE]
        ],
        "is_admin": True,
        "max_count_per_tenant": 5
    },
    "editor": {
        "permissions": [
            Permission.RESOURCE_READ, Permission.RESOURCE_CREATE,
            Permission.RESOURCE_UPDATE, Permission.RESOURCE_SHARE,
            Permission.USER_READ
        ],
        "is_admin": False,
        "max_count_per_tenant": None
    },
    "viewer": {
        "permissions": [
            Permission.RESOURCE_READ, Permission.USER_READ
        ],
        "is_admin": False,
        "max_count_per_tenant": None
    },
    "billing_admin": {
        "permissions": [
            Permission.BILLING_READ, Permission.BILLING_UPDATE,
            Permission.BILLING_MANAGE, Permission.AUDIT_LOG_READ
        ],
        "is_admin": False,
        "max_count_per_tenant": 2
    }
}

@dataclass
class TenantContext:
    """Request context with tenant isolation."""
    tenant_id: str
    user_id: str
    user_email: str
    roles: List[str]
    permissions: Set[Permission]
    is_admin: bool
    issued_at: float = field(default_factory=time.time)
    
    @property
    def is_expired(self) -> bool:
        return time.time() - self.issued_at > 3600  # 1 hour
    
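    def has_permission(self, permission: Permission) -> bool:
        # Check the resolved permission set only. Short-circuiting on is_admin
        # would give the "admin" role ROLE_MANAGE and BILLING_MANAGE, which
        # ROLE_DEFINITIONS deliberately withholds from it.
        return permission in self.permissions
    
    def has_any_permission(self, permissions: List[Permission]) -> bool:
        return any(p in self.permissions for p in permissions)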

class RBACService:
    """Multi-tenant RBAC service with caching."""
    
    def __init__(self):
        self._tenant_roles: Dict[str, Dict[str, List[str]]] = {}
        self._role_cache: Dict[str, Set[Permission]] = {}
    
    def create_tenant_context(
        self, tenant_id: str, user_id: str, 
        user_email: str, assigned_roles: List[str]
    ) -> TenantContext:
        """Build tenant context from user's roles."""
        all_permissions: Set[Permission] = set()
        is_admin = False
        
        for role_name in assigned_roles:
            role_def = ROLE_DEFINITIONS.get(role_name)
            if role_def:
                all_permissions.update(role_def["permissions"])
                if role_def["is_admin"]:
                    is_admin = True
        
        return TenantContext(
            tenant_id=tenant_id,
            user_id=user_id,
            user_email=user_email,
            roles=assigned_roles,
            permissions=all_permissions,
            is_admin=is_admin
        )
    
    def check_permission(self, ctx: TenantContext, permission: Permission) -> None:
        """Raise if user lacks permission."""
        if ctx.is_expired:
            raise PermissionError("Session expired")
        if not ctx.has_permission(permission):
            raise PermissionError(
                f"User {ctx.user_id} lacks permission {permission.name} "
                f"in tenant {ctx.tenant_id}"
            )
    
    def can_access_resource(
        self, ctx: TenantContext, 
        resource_tenant_id: str,
        resource_owner_id: Optional[str] = None
    ) -> bool:
        """Check tenant isolation first, then ownership and read permission."""
        # Tenant isolation: never allow cross-tenant access
        if ctx.tenant_id != resource_tenant_id:
            return False
        
        # Admins can access every resource in their own tenant
        if ctx.is_admin:
            return True
        
        # Owners can always access their own resources
        if resource_owner_id and ctx.user_id == resource_owner_id:
            return True
        
        # Everyone else in the tenant needs an explicit read permission
        return ctx.has_permission(Permission.RESOURCE_READ)


def require_permission(permission: Permission):
    """Decorator to enforce permission checks on API endpoints."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            ctx = kwargs.get('ctx') or kwargs.get('tenant_context')
            if not ctx:
                raise PermissionError("Tenant context required")
            
            RBACService().check_permission(ctx, permission)
            return func(*args, **kwargs)
        return wrapper
    return decorator


# Usage example
rbac = RBACService()

# Create context for a user
ctx = rbac.create_tenant_context(
    tenant_id="tenant_abc123",
    user_id="user_def456",
    user_email="admin@client.com",
    assigned_roles=["admin"]
)

# Enforce permission check
@require_permission(Permission.RESOURCE_DELETE)
def delete_resource(resource_id: str, *, ctx: TenantContext):
    print(f"Deleting {resource_id} in tenant {ctx.tenant_id}")

# This will succeed
delete_resource("res_123", ctx=ctx)

# Create viewer context; a delete with this context fails
viewer_ctx = rbac.create_tenant_context(
    tenant_id="tenant_abc123",
    user_id="user_xyz789",
    user_email="viewer@client.com",
    assigned_roles=["viewer"]
)

# This will raise PermissionError
try:
    delete_resource("res_123", ctx=viewer_ctx)
except PermissionError as e:
    print(f"Access denied: {e}")
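
`ROLE_DEFINITIONS` declares `max_count_per_tenant`, but nothing in the listing above enforces it. A possible enforcement check is sketched below; it is self-contained, so `ROLE_LIMITS` repeats a few of the limits, and `can_assign_role` is a hypothetical helper rather than part of the model above.

```python
from typing import Dict, List, Optional

# Per-tenant caps, mirroring max_count_per_tenant (None means unlimited).
ROLE_LIMITS: Dict[str, Optional[int]] = {"owner": 2, "admin": 5, "editor": None}


def can_assign_role(role: str, tenant_assignments: Dict[str, List[str]]) -> bool:
    """Check a role's cap before assigning it to one more user.

    tenant_assignments maps user_id -> roles held within a single tenant.
    """
    if role not in ROLE_LIMITS:
        return False  # unknown roles are rejected rather than silently ignored
    limit = ROLE_LIMITS[role]
    if limit is None:
        return True
    holders = sum(1 for roles in tenant_assignments.values() if role in roles)
    return holders < limit
```

A check like this would run before the role is written to storage, ideally inside the same transaction so that two concurrent assignments cannot both pass it.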

Kubernetes RBAC

Roles and ClusterRoles

# k8s/rbac/developer-role.yaml
# Namespace-scoped role for developers
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: production
  name: developer
rules:
  # Pods: read and logs only
  - apiGroups: [""]
    resources: ["pods", "pods/log"]
    verbs: ["get", "list", "watch"]
  
  # Deployments: read only
  - apiGroups: ["apps"]
    resources: ["deployments", "replicasets"]
    verbs: ["get", "list", "watch"]
  
  # Services: read only
  - apiGroups: [""]
    resources: ["services", "endpoints"]
    verbs: ["get", "list", "watch"]
  
  # ConfigMaps: read only (secrets explicitly excluded)
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list"]
  
  # Jobs: create for one-off tasks
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["create", "get", "list", "watch"]
  
  # No access to: secrets, persistentvolumes, nodes, clusterroles, serviceaccounts
---
# Cluster-scoped read-only role
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: cluster-reader
rules:
  - apiGroups: [""]
    resources: ["nodes", "namespaces", "persistentvolumes"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["rbac.authorization.k8s.io"]
    resources: ["clusterroles", "clusterrolebindings", "roles", "rolebindings"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["metrics.k8s.io"]
    resources: ["nodes", "pods"]
    verbs: ["get", "list"]
---
# k8s/rbac/bindings.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: developer-binding
  namespace: production
subjects:
  - kind: Group
    name: engineering
    apiGroup: rbac.authorization.k8s.io
  - kind: User
    name: alice@company.com
    apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: developer
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: cluster-reader-binding
subjects:
  - kind: Group
    name: sre-team
    apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: ClusterRole
  name: cluster-reader
  apiGroup: rbac.authorization.k8s.io
---
# Service account for CI/CD with minimal permissions
apiVersion: v1
kind: ServiceAccount
metadata:
  name: cicd-deployer
  namespace: production
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/CICDDeployerRole
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: cicd-deployer
  namespace: production
rules:
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "list", "watch", "patch"]
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: cicd-deployer-binding
  namespace: production
subjects:
  - kind: ServiceAccount
    name: cicd-deployer
    namespace: production
roleRef:
  kind: Role
  name: cicd-deployer
  apiGroup: rbac.authorization.k8s.io
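
Kubernetes RBAC rules are purely additive: a request is allowed if any rule bound to the subject matches its API group, resource, and verb, and there are no deny rules. The sketch below models that matching in a simplified way, assuming two of the developer rules above and ignoring `resourceNames` and non-resource URLs; `rule_allows` is a hypothetical helper.

```python
from typing import Dict, List

Rule = Dict[str, List[str]]

# Two rules copied from the developer Role above, as plain dictionaries.
DEVELOPER_RULES: List[Rule] = [
    {"apiGroups": [""], "resources": ["pods", "pods/log"],
     "verbs": ["get", "list", "watch"]},
    {"apiGroups": ["batch"], "resources": ["jobs"],
     "verbs": ["create", "get", "list", "watch"]},
]


def _has(values: List[str], wanted: str) -> bool:
    """A list matches if it names the value or contains the '*' wildcard."""
    return "*" in values or wanted in values


def rule_allows(rules: List[Rule], api_group: str, resource: str, verb: str) -> bool:
    """Allowed if ANY rule matches all three dimensions; otherwise denied."""
    return any(
        _has(r["apiGroups"], api_group)
        and _has(r["resources"], resource)
        and _has(r["verbs"], verb)
        for r in rules
    )
```

Against a real cluster, `kubectl auth can-i get secrets --as alice@company.com -n production` answers the same kind of question using the API server's own authorizer.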

Just-in-Time (JIT) Access

JIT access provides temporary elevated permissions on-demand, reducing standing privileged access.

#!/usr/bin/env python3
"""
jit_access.py: Just-in-Time access request and approval system.
Integrates with AWS IAM and Slack for approvals.
"""

import boto3
import json
import time
from datetime import datetime, timedelta
from enum import Enum
from dataclasses import dataclass
from typing import Optional
import requests

class JITStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    DENIED = "denied"
    EXPIRED = "expired"
    REVOKED = "revoked"

@dataclass
class JITRequest:
    request_id: str
    requester_email: str
    role_arn: str
    reason: str
    requested_duration_minutes: int
    status: JITStatus
    approver_email: Optional[str] = None
    approved_at: Optional[float] = None
    expires_at: Optional[float] = None
    ticket_reference: Optional[str] = None

class JITAccessManager:
    """Manages just-in-time elevated access."""
    
    JIT_ROLES = {
        "production-read": {
            "arn": "arn:aws:iam::123456789012:role/JIT-ProductionRead",
            "max_duration": 240,  # 4 hours
            "requires_approval": True,
            "auto_approvers": ["sre-lead@company.com"]
        },
        "production-write": {
            "arn": "arn:aws:iam::123456789012:role/JIT-ProductionWrite",
            "max_duration": 120,  # 2 hours
            "requires_approval": True,
            "auto_approvers": []
        },
        "database-admin": {
            "arn": "arn:aws:iam::123456789012:role/JIT-DBAdmin",
            "max_duration": 60,   # 1 hour
            "requires_approval": True,
            "auto_approvers": []
        },
        "security-audit": {
            "arn": "arn:aws:iam::123456789012:role/JIT-SecurityAudit",
            "max_duration": 480,  # 8 hours
            "requires_approval": False,  # Auto-approved for security team
            "auto_approvers": ["*@security.company.com"]
        }
    }
    
    def __init__(self):
        self.sts = boto3.client('sts')
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table('jit-access-requests')
        self.slack_webhook = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
    
    def request_access(self, requester_email: str, role_name: str,
                       reason: str, duration: int, 
                       ticket_ref: Optional[str] = None) -> JITRequest:
        """Request JIT access to a role."""
        
        if role_name not in self.JIT_ROLES:
            raise ValueError(f"Unknown JIT role: {role_name}")
        
        role_config = self.JIT_ROLES[role_name]
        
        if duration > role_config["max_duration"]:
            raise ValueError(
                f"Max duration for {role_name} is {role_config['max_duration']} minutes"
            )
        
        request_id = f"jit-{int(time.time())}-{hash(requester_email) % 10000:04d}"
        
        request = JITRequest(
            request_id=request_id,
            requester_email=requester_email,
            role_arn=role_config["arn"],
            reason=reason,
            requested_duration_minutes=duration,
            status=JITStatus.PENDING,
            ticket_reference=ticket_ref
        )
        
        # Store request
        self.table.put_item(Item={
            'request_id': request_id,
            'requester': requester_email,
            'role': role_name,
            'role_arn': role_config["arn"],  # read back by approve_request
            'reason': reason,
            'duration': duration,
            'status': 'pending',
            'created_at': int(time.time()),
            'ticket_ref': ticket_ref
        })
        
        # Send approval request
        if role_config["requires_approval"]:
            self._send_approval_request(request, role_name)
        else:
            # Auto-approve
            self.approve_request(request_id, "auto-approver@system")
        
        return request
    
    def approve_request(self, request_id: str, 
                        approver_email: str) -> dict:
        """Approve a JIT request and provision temporary credentials."""
        
        # Get request
        response = self.table.get_item(Key={'request_id': request_id})
        item = response.get('Item')
        if not item:
            raise ValueError(f"Request {request_id} not found")
        
        if item['status'] != 'pending':
            raise ValueError(f"Request is not pending: {item['status']}")
        
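        # Assume the JIT role. DynamoDB returns numbers as Decimal, so convert
        # to int before passing the value to STS.
        duration_seconds = int(item['duration']) * 60
        assumed = self.sts.assume_role(
            RoleArn=item['role_arn'],
            # RoleSessionName is limited to 64 characters
            RoleSessionName=f"jit-{item['requester']}-{request_id}"[:64],
            # Capped at 1 hour, the limit for role chaining; longer sessions
            # also depend on the role's MaxSessionDuration setting
            DurationSeconds=min(duration_seconds, 3600)
        )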
        
        credentials = assumed['Credentials']
        
        # Update request
        self.table.update_item(
            Key={'request_id': request_id},
            UpdateExpression="SET #status = :status, approver = :approver, "
                           "approved_at = :approved_at, expires_at = :expires_at",
            ExpressionAttributeNames={'#status': 'status'},
            ExpressionAttributeValues={
                ':status': 'approved',
                ':approver': approver_email,
                ':approved_at': int(time.time()),
                ':expires_at': int(credentials['Expiration'].timestamp())
            }
        )
        
        return {
            'request_id': request_id,
            'access_key_id': credentials['AccessKeyId'],
            'secret_access_key': credentials['SecretAccessKey'],
            'session_token': credentials['SessionToken'],
            'expires_at': credentials['Expiration'].isoformat()
        }
    
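    def revoke_request(self, request_id: str) -> None:
        """Mark a JIT request as revoked.

        This only updates the request record. STS credentials that were already
        issued stay valid until they expire unless the role's active sessions
        are revoked separately.
        """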
        self.table.update_item(
            Key={'request_id': request_id},
            UpdateExpression="SET #status = :status",
            ExpressionAttributeNames={'#status': 'status'},
            ExpressionAttributeValues={':status': 'revoked'}
        )
    
    def _send_approval_request(self, request: JITRequest, role_name: str) -> None:
        """Send Slack approval request."""
        message = {
            "text": "JIT Access Request",
            "blocks": [
                {
                    "type": "header",
                    "text": {"type": "plain_text", "text": "🔐 JIT Access Request"}
                },
                {
                    "type": "section",
                    "fields": [
                        {"type": "mrkdwn", "text": f"*Requester:*\n{request.requester_email}"},
                        {"type": "mrkdwn", "text": f"*Role:*\n{role_name}"},
                        {"type": "mrkdwn", "text": f"*Duration:*\n{request.requested_duration_minutes} minutes"},
                        {"type": "mrkdwn", "text": f"*Ticket:*\n{request.ticket_reference or 'N/A'}"}
                    ]
                },
                {"type": "section", "text": {"type": "mrkdwn", "text": f"*Reason:* {request.reason}"}},
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "✅ Approve"},
                            "style": "primary",
                            "value": f"approve:{request.request_id}",
                            "action_id": "approve_jit"
                        },
                        {
                            "type": "button",
                            "text": {"type": "plain_text", "text": "❌ Deny"},
                            "style": "danger",
                            "value": f"deny:{request.request_id}",
                            "action_id": "deny_jit"
                        }
                    ]
                }
            ]
        }
        # Set a timeout so a slow webhook cannot hang the request flow
        requests.post(self.slack_webhook, json=message, timeout=10)
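
Because a JIT grant is time-bound, every consumer of a grant needs a consistent way to decide whether it is still active. The sketch below shows one such check as a pure function, with no AWS calls; the `Grant` dataclass and `is_active` are hypothetical names that only loosely mirror the fields of `JITRequest`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Grant:
    status: str                   # "pending", "approved", "denied", "expired", "revoked"
    approved_at: Optional[float]  # Unix timestamp, set on approval
    duration_minutes: int


def is_active(grant: Grant, now: float) -> bool:
    """A grant is active only if approved and still inside its time window."""
    if grant.status != "approved" or grant.approved_at is None:
        return False
    return now < grant.approved_at + grant.duration_minutes * 60
```

Passing `now` in explicitly, instead of calling `time.time()` inside the function, keeps the check easy to test around the expiry boundary.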

Access Review and Recertification

# access-review-process.yaml
access_review_process:
  frequency:
    privileged_accounts: "quarterly"
    standard_accounts: "semi-annually"
    third_party_access: "quarterly"
    service_accounts: "annually"
    
  workflow:
    step_1_notification:
      action: "Email manager with list of direct reports and their access"
      timing: "15 days before review deadline"
      
    step_2_manager_review:
      action: "Manager validates each access assignment"
      requires: "Justification for each role/permission"
      outcomes: ["certify", "revoke", "modify"]
      
    step_3_automation:
      action: "Automated removal of un-certified access"
      timing: "7 days after deadline"
      exceptions: "CISO approval required to delay"
      
    step_4_audit:
      action: "Compliance team reviews completion rates"
      evidence: "Signed attestations stored for 7 years"
  
  escalation:
    level_1: "Reminder at T-7 days"
    level_2: "Manager's manager notified at T-3 days"
    level_3: "HR and security team at T-1 day"
    level_4: "Auto-revocation at T+7 days"
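
The timings in this process are all offsets from the review deadline, so the calendar for a review cycle can be derived mechanically. A small sketch, assuming the offsets listed above; `review_schedule` and its dictionary keys are hypothetical names.

```python
from datetime import date, timedelta
from typing import Dict


def review_schedule(deadline: date) -> Dict[str, date]:
    """Derive the key dates of one review cycle from its deadline (T)."""
    return {
        "manager_notification": deadline - timedelta(days=15),         # step 1
        "reminder": deadline - timedelta(days=7),                      # level 1
        "escalate_to_managers_manager": deadline - timedelta(days=3),  # level 2
        "escalate_to_hr_and_security": deadline - timedelta(days=1),   # level 3
        "auto_revocation": deadline + timedelta(days=7),               # level 4
    }
```

For a deadline of 31 March 2025, this puts the manager notification on 16 March and the automated revocation on 7 April.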

Automated User Lifecycle Management

#!/usr/bin/env python3
"""
user_lifecycle_manager.py: Automated user provisioning/deprovisioning.
Integrates with Okta/Entra ID, AWS IAM, and internal systems.
"""

import boto3
import json
import requests
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Dict
from enum import Enum
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LifecycleAction(Enum):
    PROVISION = "provision"
    UPDATE = "update"
    DISABLE = "disable"
    DELETE = "delete"
    TRANSFER = "transfer"

@dataclass
class UserRecord:
    email: str
    first_name: str
    last_name: str
    department: str
    job_title: str
    manager_email: str
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    aws_username: Optional[str] = None
    groups: Optional[List[str]] = None

class UserLifecycleManager:
    """Automates user provisioning and deprovisioning across systems."""
    
    def __init__(self):
        self.iam = boto3.client('iam')
        self.identitystore = boto3.client('identitystore')
        self.ssoadmin = boto3.client('sso-admin')
        self.okta_client = None  # Initialize with Okta SDK
        
        self.IDENTITY_STORE_ID = "d-1234567890"
        self.SSO_INSTANCE_ARN = "arn:aws:sso:::instance/ssoins-1234567890"
    
    def provision_user(self, user: UserRecord) -> Dict:
        """Provision a new user across all systems."""
        logger.info(f"Provisioning user: {user.email}")
        results = {}
        
        # 1. Create in AWS IAM Identity Center (SSO)
        results['sso'] = self._create_sso_user(user)
        
        # 2. Assign to groups based on role
        results['groups'] = self._assign_sso_groups(
            results['sso']['user_id'], user
        )
        
        # 3. Assign AWS permission sets
        results['permissions'] = self._assign_permission_sets(
            results['sso']['user_id'], user
        )
        
        # 4. Create AWS IAM user (for CLI/programmatic access)
        results['iam'] = self._create_iam_user(user)
        
        # 5. Add to security groups
        results['security_groups'] = self._configure_security_groups(user)
        
        # 6. Send welcome email
        self._send_welcome_email(user, results)
        
        logger.info(f"Provisioning complete for {user.email}")
        return results
    
    def _create_sso_user(self, user: UserRecord) -> Dict:
        """Create user in AWS IAM Identity Center."""
        response = self.identitystore.create_user(
            IdentityStoreId=self.IDENTITY_STORE_ID,
            UserName=user.email,
            DisplayName=f"{user.first_name} {user.last_name}",
            Name={
                'FamilyName': user.last_name,
                'GivenName': user.first_name
            },
            Emails=[{
                'Value': user.email,
                'Type': 'Work',
                'Primary': True
            }]
        )
        return {
            'user_id': response['UserId'],
            'status': 'created'
        }
    
    def _assign_sso_groups(self, user_id: str, user: UserRecord) -> List[str]:
        """Assign user to SSO groups based on department and role."""
        group_mapping = {
            'engineering': ['Engineering-All', 'AWS-Developer-Access'],
            'platform': ['Platform-Team', 'AWS-Admin-Access'],
            'security': ['Security-Team', 'AWS-Security-Access', 'Wazuh-Admin'],
            'finance': ['Finance-Team', 'AWS-ReadOnly-Access'],
            'sales': ['Sales-Team', 'AWS-ReadOnly-Access'],
        }
        
        groups = group_mapping.get(user.department.lower(), ['AWS-ReadOnly-Access'])
        
        assigned = []
        for group_name in groups:
            try:
                # Get group ID (would be cached in production)
                group_id = self._get_group_id(group_name)
                self.identitystore.create_group_membership(
                    IdentityStoreId=self.IDENTITY_STORE_ID,
                    GroupId=group_id,
                    MemberId={'UserId': user_id}
                )
                assigned.append(group_name)
            except Exception as e:
                logger.warning(f"Failed to assign group {group_name}: {e}")
        
        return assigned
    
    def _assign_permission_sets(self, user_id: str, user: UserRecord) -> List[str]:
        """Assign AWS permission sets based on role."""
        permission_set_mapping = {
            'engineer': ['ViewOnlyAccess', 'DeveloperAccess'],
            'senior_engineer': ['ViewOnlyAccess', 'PowerUserAccess'],
            'platform_engineer': ['ViewOnlyAccess', 'AdministratorAccess'],
            'security_engineer': ['ViewOnlyAccess', 'SecurityAudit', 'ReadOnlyAccess'],
        }
        
        role_key = user.job_title.lower().replace(' ', '_')
        permission_sets = permission_set_mapping.get(
            role_key, ['ViewOnlyAccess']
        )
        
        assigned = []
        for ps_name in permission_sets:
            try:
                ps_arn = self._get_permission_set_arn(ps_name)
                self.ssoadmin.create_account_assignment(
                    InstanceArn=self.SSO_INSTANCE_ARN,
                    PermissionSetArn=ps_arn,
                    PrincipalId=user_id,
                    PrincipalType='USER',
                    TargetId='123456789012',  # AWS Account ID
                    TargetType='AWS_ACCOUNT'
                )
                assigned.append(ps_name)
            except Exception as e:
                logger.warning(f"Failed to assign permission set {ps_name}: {e}")
        
        return assigned
    
    def _create_iam_user(self, user: UserRecord) -> Dict:
        """Create IAM user for programmatic access."""
        username = user.email.split('@')[0]
        
        try:
            self.iam.create_user(
                UserName=username,
                Tags=[
                    {'Key': 'Department', 'Value': user.department},
                    {'Key': 'Manager', 'Value': user.manager_email},
                    {'Key': 'Email', 'Value': user.email},
                    {'Key': 'CreatedBy', 'Value': 'automation'}
                ]
            )
            
            # Attach policies based on role
            policy_arns = self._get_policies_for_role(user.job_title)
            for policy_arn in policy_arns:
                self.iam.attach_user_policy(
                    UserName=username,
                    PolicyArn=policy_arn
                )
            
            return {'username': username, 'status': 'created'}
            
        except self.iam.exceptions.EntityAlreadyExistsException:
            return {'username': username, 'status': 'already_exists'}
    
    def deprovision_user(self, email: str, transfer_to: Optional[str] = None) -> Dict:
        """Deprovision a user: the critical offboarding process."""
        logger.info(f"Deprovisioning user: {email}")
        results = {}
        username = email.split('@')[0]
        
        # 1. Remove from SSO first (fastest protection). Note that delete_user
        #    removes the Identity Store user outright; it does not merely disable it.
        try:
            user_id = self._get_sso_user_id(email)
            self.identitystore.delete_user(
                IdentityStoreId=self.IDENTITY_STORE_ID,
                UserId=user_id
            )
            results['sso'] = 'deleted'
            logger.info(f"SSO user deleted: {email}")
        except Exception as e:
            logger.error(f"SSO deprovisioning failed: {e}")
            results['sso'] = f'error: {str(e)}'
        
        # 2. Deactivate IAM access keys (set to Inactive, not deleted)
        try:
            keys = self.iam.list_access_keys(UserName=username)
            for key in keys['AccessKeyMetadata']:
                self.iam.update_access_key(
                    UserName=username,
                    AccessKeyId=key['AccessKeyId'],
                    Status='Inactive'
                )
                logger.info(f"Disabled access key: {key['AccessKeyId']}")
            results['access_keys'] = 'disabled'
        except Exception as e:
            logger.error(f"Access key disable failed: {e}")
            # Record the failure so it shows up in the audit entry
            results['access_keys'] = f'error: {e}'
        
        # 3. Remove from all IAM groups
        try:
            groups = self.iam.list_groups_for_user(UserName=username)
            for group in groups['Groups']:
                self.iam.remove_user_from_group(
                    GroupName=group['GroupName'],
                    UserName=username
                )
            results['iam_groups'] = 'removed'
        except Exception as e:
            logger.error(f"IAM group removal failed: {e}")
            results['iam_groups'] = f'error: {e}'
        
        # 4. Delete IAM login profile (console access)
        try:
            self.iam.delete_login_profile(UserName=username)
            results['console_access'] = 'disabled'
        except self.iam.exceptions.NoSuchEntityException:
            results['console_access'] = 'already_disabled'
        except Exception as e:
            logger.error(f"Console access disable failed: {e}")
            results['console_access'] = f'error: {e}'
        
        # 5. Transfer ownership of resources if specified
        if transfer_to:
            results['transfer'] = self._transfer_resources(email, transfer_to)
        
        # 6. Schedule IAM user deletion (after 30-day grace period)
        # In production: publish to SQS for delayed deletion
        results['iam_user'] = 'scheduled_for_deletion'
        
        # 7. Block any remaining use of the IAM user's credentials
        try:
            # An explicit deny overrides every allow, so requests made with
            # this user's credentials are refused even if a key was missed above.
            deny_policy = {
                "Version": "2012-10-17",
                "Statement": [{
                    "Effect": "Deny",
                    "Action": "*",
                    "Resource": "*"
                }]
            }
            self.iam.put_user_policy(
                UserName=username,
                PolicyName='ExplicitDenyAll',
                PolicyDocument=json.dumps(deny_policy)
            )
            results['sessions'] = 'revoked'
        except Exception as e:
            logger.error(f"Session revocation failed: {e}")
            results['sessions'] = f'error: {e}'
        
        # 8. Audit log
        self._log_deprovisioning(email, results, transfer_to)
        
        logger.info(f"Deprovisioning complete for {email}")
        return results
    
    def _get_group_id(self, group_name: str) -> str:
        """Look up SSO group ID by name."""
        # In production: use cached lookup
        response = self.identitystore.list_groups(
            IdentityStoreId=self.IDENTITY_STORE_ID,
            Filters=[{'AttributePath': 'DisplayName', 'AttributeValue': group_name}]
        )
        if not response['Groups']:
            raise ValueError(f"SSO group not found: {group_name}")
        return response['Groups'][0]['GroupId']
    
    def _get_permission_set_arn(self, name: str) -> str:
        """Look up permission set ARN by name."""
        # list_permission_sets is paginated, so walk every page
        paginator = self.ssoadmin.get_paginator('list_permission_sets')
        for page in paginator.paginate(InstanceArn=self.SSO_INSTANCE_ARN):
            for ps_arn in page['PermissionSets']:
                desc = self.ssoadmin.describe_permission_set(
                    InstanceArn=self.SSO_INSTANCE_ARN,
                    PermissionSetArn=ps_arn
                )
                if desc['PermissionSet']['Name'] == name:
                    return ps_arn
        raise ValueError(f"Permission set not found: {name}")
    
    def _get_sso_user_id(self, email: str) -> str:
        """Look up SSO user ID by email."""
        response = self.identitystore.list_users(
            IdentityStoreId=self.IDENTITY_STORE_ID,
            Filters=[{'AttributePath': 'UserName', 'AttributeValue': email}]
        )
        if not response['Users']:
            raise ValueError(f"SSO user not found: {email}")
        return response['Users'][0]['UserId']
    
    def _get_policies_for_role(self, job_title: str) -> List[str]:
        """Map job title to IAM policies."""
        policy_map = {
            'engineer': [
                'arn:aws:iam::aws:policy/ReadOnlyAccess',
                'arn:aws:iam::123456789012:policy/DeveloperAccess'  # 12-digit account ID
            ],
            'security_engineer': [
                'arn:aws:iam::aws:policy/ReadOnlyAccess',
                'arn:aws:iam::aws:policy/SecurityAudit'
            ],
        }
        return policy_map.get(job_title.lower().replace(' ', '_'), 
                             ['arn:aws:iam::aws:policy/ReadOnlyAccess'])
    
    def _transfer_resources(self, from_email: str, to_email: str) -> Dict:
        """Transfer resource ownership during offboarding."""
        # Placeholder: a real implementation would update S3 bucket ownership,
        # reassign EC2 tags, etc. Nothing is transferred yet.
        return {'status': 'completed', 'resources_updated': 0}
    
    def _configure_security_groups(self, user: UserRecord) -> List[str]:
        """Add user to security/monitoring groups."""
        return []
    
    def _send_welcome_email(self, user: UserRecord, results: Dict) -> None:
        """Send welcome email with access instructions."""
        pass
    
    def _log_deprovisioning(self, email: str, results: Dict, 
                           transfer_to: Optional[str]) -> None:
        """Log deprovisioning for audit trail."""
        audit_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'action': 'user_deprovision',
            'user_email': email,
            'transfer_to': transfer_to,
            'results': results,
            'source': 'automation'
        }
        logger.info(f"AUDIT: {json.dumps(audit_entry)}")


# Usage
if __name__ == '__main__':
    manager = UserLifecycleManager()
    
    # Provision new hire
    new_hire = UserRecord(
        email="john.doe@company.com",
        first_name="John",
        last_name="Doe",
        department="engineering",
        job_title="engineer",
        manager_email="jane.smith@company.com"
    )
    # manager.provision_user(new_hire)
    
    # Deprovision terminated employee
    # manager.deprovision_user("former.employee@company.com")
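
Step 6 of `deprovision_user` only records that deletion is scheduled. The following is a minimal sketch of how the 30-day grace period might be tracked. The record shape and the names `build_deletion_record` and `is_due` are hypothetical and not part of the class above. Because SQS caps `DelaySeconds` at 900 seconds, the sketch stores an explicit `delete_after` timestamp for a periodic job to check instead of relying on a queue delay.

```python
from datetime import datetime, timedelta, timezone

GRACE_PERIOD = timedelta(days=30)  # grace period named in step 6


def build_deletion_record(username: str, now: datetime) -> dict:
    """Return a hypothetical record describing a pending IAM user deletion."""
    return {
        "action": "delete_iam_user",
        "username": username,
        "requested_at": now.isoformat(),
        "delete_after": (now + GRACE_PERIOD).isoformat(),
    }


def is_due(record: dict, now: datetime) -> bool:
    """True once the grace period stored in the record has elapsed."""
    return now >= datetime.fromisoformat(record["delete_after"])


requested = datetime(2024, 1, 1, tzinfo=timezone.utc)
record = build_deletion_record("former.employee", requested)
print(is_due(record, requested + timedelta(days=29)))  # False
print(is_due(record, requested + timedelta(days=30)))  # True
```

A scheduled job could load such records, call `is_due`, and only then delete the IAM user, which keeps the grace period auditable.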

AWS IAM Identity Center (SSO)

# terraform/aws-sso.tf

# Permission sets
resource "aws_ssoadmin_permission_set" "developer" {
  name             = "DeveloperAccess"
  description      = "Standard developer access to AWS"
  instance_arn     = local.sso_instance_arn
  session_duration = "PT8H"
  
  tags = {
    Environment = "all"
  }
}

resource "aws_ssoadmin_managed_policy_attachment" "developer_readonly" {
  instance_arn       = local.sso_instance_arn
  managed_policy_arn = "arn:aws:iam::aws:policy/ReadOnlyAccess"
  permission_set_arn = aws_ssoadmin_permission_set.developer.arn
}

resource "aws_ssoadmin_permission_set_inline_policy" "developer_custom" {
  instance_arn       = local.sso_instance_arn
  permission_set_arn = aws_ssoadmin_permission_set.developer.arn
  inline_policy      = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "DeveloperEC2"
        Effect = "Allow"
        Action = [
          "ec2:StartInstances",
          "ec2:StopInstances",
          "ec2:RebootInstances",
          "ec2:CreateTags"
        ]
        Resource = "*"
        Condition = {
          StringEquals = {
            "ec2:ResourceTag/Team" = "$${aws:PrincipalTag/Team}"
          }
        }
      },
      {
        Sid    = "DeveloperLambda"
        Effect = "Allow"
        Action = [
          "lambda:UpdateFunctionCode",
          "lambda:UpdateFunctionConfiguration",
          "lambda:InvokeFunction"
        ]
        Resource = "arn:aws:lambda:*:*:function:team-*"
      }
    ]
  })
}

# Account assignments
resource "aws_ssoadmin_account_assignment" "developer_prod" {
  instance_arn       = local.sso_instance_arn
  permission_set_arn = aws_ssoadmin_permission_set.developer.arn
  principal_id       = aws_identitystore_group.engineers.group_id
  principal_type     = "GROUP"
  target_id          = "123456789012"  # Production account
  target_type        = "AWS_ACCOUNT"
}
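
The `DeveloperEC2` statement above is attribute-based: it allows the listed actions only when the instance's `Team` tag equals the caller's `Team` principal tag. The `DeveloperLambda` statement instead relies on a wildcard in the resource ARN. The toy Python check below illustrates both ideas. It is a simplification for intuition, not a reimplementation of IAM policy evaluation, and `tags_match` and `arn_matches` are hypothetical helper names.

```python
from fnmatch import fnmatchcase


def tags_match(principal_tags: dict, resource_tags: dict, key: str = "Team") -> bool:
    """Mimic StringEquals on ec2:ResourceTag/<key> vs aws:PrincipalTag/<key>."""
    principal_value = principal_tags.get(key)
    # In this sketch a tag missing on either side never matches.
    return principal_value is not None and principal_value == resource_tags.get(key)


def arn_matches(pattern: str, arn: str) -> bool:
    """Approximate the '*' wildcard with case-sensitive shell-style matching."""
    return fnmatchcase(arn, pattern)


developer = {"Team": "payments"}
print(tags_match(developer, {"Team": "payments"}))  # True
print(tags_match(developer, {"Team": "search"}))    # False
print(arn_matches(
    "arn:aws:lambda:*:*:function:team-*",
    "arn:aws:lambda:us-east-1:123456789012:function:team-billing",
))  # True
```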

Privileged Access Management (PAM) Principles

Principle | Implementation
Least Privilege | Grant minimum access needed for task duration
Need-to-Know | Access only data required for specific function
Separation of Duties | Require two people for critical operations
Just-in-Time Access | Elevated permissions expire automatically
Complete Audit Trail | Log every privileged action with session recording
Regular Recertification | Re-validate all privileged access quarterly
Emergency Access | Break-glass procedures with immediate notification
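
The Just-in-Time and Separation of Duties principles can be made concrete with a grant that carries its own expiry and refuses self-approval. The sketch below is illustrative only: `AccessGrant` and its fields are hypothetical, and a real system would also have to revoke the underlying credentials or role assignment when the grant lapses.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class AccessGrant:
    """A time-boxed elevation of privileges (hypothetical model)."""
    user: str
    role: str
    approved_by: str
    granted_at: datetime
    duration: timedelta

    def __post_init__(self) -> None:
        # Separation of duties: nobody approves their own elevation.
        if self.user == self.approved_by:
            raise ValueError("approver must differ from requester")

    @property
    def expires_at(self) -> datetime:
        return self.granted_at + self.duration

    def is_active(self, now: datetime) -> bool:
        # Active only inside the half-open window [granted_at, expires_at).
        return self.granted_at <= now < self.expires_at


start = datetime(2024, 6, 1, 9, 0, tzinfo=timezone.utc)
grant = AccessGrant("john.doe", "prod-admin", "jane.smith", start, timedelta(hours=1))
print(grant.is_active(start + timedelta(minutes=30)))  # True
print(grant.is_active(start + timedelta(hours=1)))     # False
```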

Never Use Root Account

Lock down the AWS root user immediately after account creation: enable MFA, store the credentials in a physical safe, and use the account only for account recovery. All daily operations must use IAM roles with proper RBAC.
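
One way to verify this guidance is to inspect the account summary that IAM exposes. The sketch below assumes boto3 and credentials allowed to call `iam:GetAccountSummary`. `root_account_findings` and `fetch_summary_map` are hypothetical helpers, while `AccountMFAEnabled` and `AccountAccessKeysPresent` are keys of the `SummaryMap` returned by `get_account_summary`.

```python
def root_account_findings(summary_map: dict) -> list:
    """Return human-readable findings about root user hygiene."""
    findings = []
    if summary_map.get("AccountMFAEnabled", 0) != 1:
        findings.append("Root user has no MFA device enabled")
    if summary_map.get("AccountAccessKeysPresent", 0) != 0:
        findings.append("Root user has access keys; delete them")
    return findings


def fetch_summary_map() -> dict:
    """Fetch the live summary; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the check above works without AWS
    return boto3.client("iam").get_account_summary()["SummaryMap"]


# Example with a hand-written summary instead of a live AWS call.
sample = {"AccountMFAEnabled": 0, "AccountAccessKeysPresent": 1}
for finding in root_account_findings(sample):
    print(finding)
```

Against a real account, `root_account_findings(fetch_summary_map())` should return an empty list when the root user follows the guidance above.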

Related Topics