IAM & RBAC
Identity and Access Management with Role-Based Access Control ensures the right people have the right access to the right resources at the right time. This guide covers AWS IAM, Kubernetes RBAC, multi-tenant SaaS architectures, and just-in-time access patterns.
RBAC Fundamentals
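Role-Based Access Control (RBAC) maps permissions to roles, and roles to users. This indirection simplifies administration and makes the principle of least privilege practical at scale: changing what a job function may do means editing one role rather than every user who holds it.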
RBAC Core Model
+------+      +------+      +------------+      +----------+
| User |----->| Role |----->| Permission |----->| Resource |
+------+      +------+      +------------+      +----------+
   |             |                |
Attributes:   Attributes:   Attributes:
- identity    - name        - action (read/write/delete)
- department  - scope       - resource type
- manager     - permissions - conditions
- tenure      - time-bound
Relationships:
User -> Role: Many-to-Many (user can have multiple roles)
Role -> Permission: Many-to-Many (role has multiple permissions)
User -> Permission: Indirect (only via role)
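The indirection in the model above can be sketched in a few lines of Python. This is a minimal illustration, not a production design: the role names, permission strings, and users are hypothetical.

```python
from typing import Dict, Set

# Hypothetical role and assignment tables, for illustration only.
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "viewer": {"report:read"},
    "editor": {"report:read", "report:write"},
}
USER_ROLES: Dict[str, Set[str]] = {
    "alice": {"viewer", "editor"},
    "bob": {"viewer"},
}


def effective_permissions(user: str) -> Set[str]:
    """Union of the permissions of every role assigned to the user."""
    perms: Set[str] = set()
    for role in USER_ROLES.get(user, set()):
        perms |= ROLE_PERMISSIONS.get(role, set())
    return perms


def is_allowed(user: str, permission: str) -> bool:
    # Users never hold permissions directly; access is resolved via roles.
    return permission in effective_permissions(user)


print(is_allowed("alice", "report:write"))  # True
print(is_allowed("bob", "report:write"))    # False
```

Revoking write access from every editor means editing one entry in `ROLE_PERMISSIONS`, which is the administrative benefit of the role layer.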
RBAC vs ABAC Comparison
| Dimension | RBAC | ABAC |
|---|---|---|
| Access Decision | Based on role membership | Based on attributes of user, resource, environment |
| Granularity | Coarse (role-level) | Fine-grained (attribute-level) |
| Scalability | Role explosion at large scale | Scales better with complex environments |
| Management | Simpler to understand and audit | Complex policy evaluation engine required |
| Dynamic Access | Static role assignments | Dynamic based on real-time attributes |
| Best For | Structured organizations, clear job functions | Complex, multi-dimensional access requirements |
| Hybrid (RBAC + ABAC) | Start here: roles grant baseline access | Layer on top: attribute conditions handle edge cases |
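A hybrid check might look like the following sketch, where a role grants the action and attribute conditions then narrow it. The roles, attributes, and business-hours window are assumptions chosen for illustration.

```python
from dataclasses import dataclass
from datetime import time

# Hypothetical role table: which actions each role grants.
ROLE_ACTIONS = {"analyst": {"read"}, "manager": {"read", "approve"}}


@dataclass(frozen=True)
class Request:
    role: str
    user_department: str
    resource_department: str
    local_time: time


def authorize(req: Request, action: str) -> bool:
    # RBAC step: the role must grant the action at all.
    if action not in ROLE_ACTIONS.get(req.role, set()):
        return False
    # ABAC step: attribute conditions narrow the role-based grant.
    same_department = req.user_department == req.resource_department
    business_hours = time(8, 0) <= req.local_time <= time(18, 0)
    return same_department and business_hours


req = Request("manager", "finance", "finance", time(10, 30))
print(authorize(req, "approve"))  # True
```

The role table stays small because conditions such as department or time of day do not need their own roles.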
AWS IAM
Least Privilege Policy Examples
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3ReadOnlyApplicationBucket",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObjectVersion",
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::company-app-data-prod",
"arn:aws:s3:::company-app-data-prod/*"
],
"Condition": {
"StringEquals": {
"aws:RequestedRegion": ["us-east-1", "us-west-2"]
},
"Bool": {
"aws:MultiFactorAuthPresent": "true"
},
"DateGreaterThan": {
"aws:CurrentTime": "2024-01-01T00:00:00Z"
}
}
},
{
"Sid": "EC2LimitedAccess",
"Effect": "Allow",
"Action": [
"ec2:DescribeInstances",
"ec2:DescribeVolumes",
"ec2:DescribeSnapshots",
"ec2:CreateTags"
],
"Resource": "*",
"Condition": {
"StringEquals": {
"aws:PrincipalTag/Environment": "${aws:ResourceTag/Environment}"
},
"ForAllValues:StringEquals": {
"aws:TagKeys": ["Environment", "Application", "Owner"]
}
}
},
{
"Sid": "DenyNonVPCDatabases",
"Effect": "Deny",
"Action": [
"rds:CreateDBInstance"
],
"Resource": "*",
"Condition": {
"Bool": {
"rds:Vpc": "false"
}
}
},
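{
"Sid": "DenyUnencryptedS3Uploads",
"Effect": "Deny",
"Action": "s3:PutObject",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}
},
{
"Sid": "DenyUnencryptedEBSVolumes",
"Effect": "Deny",
"Action": "ec2:CreateVolume",
"Resource": "*",
"Condition": {
"Bool": {
"ec2:Encrypted": "false"
}
}
},
{
"Sid": "DenyUnencryptedRDSInstances",
"Effect": "Deny",
"Action": "rds:CreateDBInstance",
"Resource": "*",
"Condition": {
"Bool": {
"rds:StorageEncrypted": "false"
}
}
}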
]
}
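The policy above mixes Allow and Deny statements, so it helps to recall the evaluation order: an explicit Deny always wins, then any matching Allow, and otherwise the request is implicitly denied. The sketch below is a deliberately simplified model of that order. It ignores conditions, principals, and case-insensitive action matching, and the sample statements and the `secret/` prefix are hypothetical.

```python
from fnmatch import fnmatchcase
from typing import Dict, List, Union


def _as_list(value: Union[str, List[str]]) -> List[str]:
    return [value] if isinstance(value, str) else value


def _matches(stmt: Dict, action: str, resource: str) -> bool:
    # Approximates IAM's "*" wildcard with shell-style pattern matching.
    return (any(fnmatchcase(action, a) for a in _as_list(stmt["Action"]))
            and any(fnmatchcase(resource, r) for r in _as_list(stmt["Resource"])))


def evaluate(statements: List[Dict], action: str, resource: str) -> str:
    matching = [s for s in statements if _matches(s, action, resource)]
    if any(s["Effect"] == "Deny" for s in matching):
        return "ExplicitDeny"   # a Deny overrides every Allow
    if any(s["Effect"] == "Allow" for s in matching):
        return "Allow"
    return "ImplicitDeny"       # nothing matched


policy = [
    {"Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"],
     "Resource": "arn:aws:s3:::company-app-data-prod*"},
    {"Effect": "Deny", "Action": "s3:*",
     "Resource": "arn:aws:s3:::company-app-data-prod/secret/*"},
]

bucket = "arn:aws:s3:::company-app-data-prod"
print(evaluate(policy, "s3:GetObject", f"{bucket}/reports/q1.csv"))  # Allow
print(evaluate(policy, "s3:GetObject", f"{bucket}/secret/keys.txt"))  # ExplicitDeny
print(evaluate(policy, "s3:PutObject", f"{bucket}/reports/q1.csv"))  # ImplicitDeny
```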
IAM Permission Boundaries
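Permission boundaries cap the maximum permissions an IAM user or role can have, regardless of what its identity-based policies grant. A boundary grants nothing by itself: the effective permissions are the intersection of the boundary and the identity-based policies.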
# terraform/iam-permission-boundary.tf
# Permission boundary for developer roles
resource "aws_iam_policy" "developer_boundary" {
name = "DeveloperPermissionBoundary"
description = "Maximum permissions for developer roles"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowDevelopmentServices"
Effect = "Allow"
Action = [
"ec2:*",
"s3:*",
"rds:*",
"elasticache:*",
"sqs:*",
"sns:*",
"lambda:*",
"logs:*",
"cloudwatch:*"
]
Resource = "*"
Condition = {
StringEquals = {
"aws:RequestedRegion" = ["us-east-1", "us-west-2"]
}
}
},
{
Sid = "DenyProductionResources"
Effect = "Deny"
Action = "*"
Resource = "*"
Condition = {
StringEquals = {
"aws:ResourceTag/Environment" = "production"
}
}
},
{
Sid = "DenyDangerousActions"
Effect = "Deny"
Action = [
"iam:CreateUser",
"iam:DeleteUser",
"iam:CreateAccessKey",
"iam:DeleteAccountPasswordPolicy",
"iam:AttachUserPolicy",
"iam:DetachUserPolicy",
"organizations:LeaveOrganization",
"account:CloseAccount"
]
Resource = "*"
},
{
Sid = "DenyUnencrypted"
Effect = "Deny"
Action = [
"s3:PutObject"
]
Resource = "*"
Condition = {
StringNotEquals = {
"s3:x-amz-server-side-encryption" = "AES256"
}
}
}
]
})
}
# Apply boundary to developer role
resource "aws_iam_role" "developer" {
name = "DeveloperRole"
path = "/"
assume_role_policy = data.aws_iam_policy_document.trust_policy.json
permissions_boundary = aws_iam_policy.developer_boundary.arn
managed_policy_arns = [
aws_iam_policy.developer_policy.arn
]
}
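How a boundary combines with an identity policy can be sketched with plain sets. This is a simplified model that uses exact action names instead of wildcards and ignores conditions; the function name and the sample actions are illustrative.

```python
from typing import AbstractSet, Set


def effective_actions(
    identity_allow: AbstractSet[str],
    boundary_allow: AbstractSet[str],
    explicit_deny: AbstractSet[str] = frozenset(),
) -> Set[str]:
    """Allowed = (identity policy AND boundary) minus any explicit deny."""
    return set(identity_allow & boundary_allow) - set(explicit_deny)


identity_policy = {"s3:GetObject", "lambda:InvokeFunction", "iam:CreateUser"}
boundary = {"s3:GetObject", "lambda:InvokeFunction", "sqs:SendMessage"}

# iam:CreateUser is granted by the identity policy but falls outside the
# boundary; sqs:SendMessage is inside the boundary but was never granted.
print(sorted(effective_actions(identity_policy, boundary)))
# ['lambda:InvokeFunction', 's3:GetObject']
```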
Service Control Policies (SCPs) for Organizations
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyLeavingOrganization",
"Effect": "Deny",
"Action": [
"organizations:LeaveOrganization"
],
"Resource": "*"
},
{
"Sid": "DenyAccountClosure",
"Effect": "Deny",
"Action": [
"account:CloseAccount",
"account:DeleteAlternateContact"
],
"Resource": "*"
},
{
"Sid": "RequireMFAForIAMChanges",
"Effect": "Deny",
"Action": [
"iam:CreateAccessKey",
"iam:CreateUser",
"iam:PutUserPolicy"
],
"Resource": "*",
"Condition": {
"BoolIfExists": {
"aws:MultiFactorAuthPresent": "false"
}
}
},
{
"Sid": "DenyUnusedRegions",
"Effect": "Deny",
"NotAction": [
"iam:*",
"organizations:*",
"account:*",
"support:*",
"billing:*",
"ce:*"
],
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:RequestedRegion": [
"us-east-1",
"us-west-2",
"eu-west-1"
]
}
}
},
{
"Sid": "RequireEncryptedEBS",
"Effect": "Deny",
"Action": [
"ec2:CreateVolume",
"ec2:RunInstances"
],
"Resource": [
"arn:aws:ec2:*:*:volume/*"
],
"Condition": {
"Bool": {
"ec2:Encrypted": "false"
}
}
},
{
"Sid": "DenyPublicS3Buckets",
"Effect": "Deny",
"Action": [
"s3:PutBucketAcl",
"s3:PutObjectAcl"
],
"Resource": "*",
"Condition": {
"StringEquals": {
"s3:x-amz-acl": ["public-read", "public-read-write"]
}
}
}
]
}
Multi-Tenant RBAC Architecture
Tenant Isolation Patterns
| Pattern | Isolation Level | Cost | Complexity | Use Case |
|---|---|---|---|---|
| Shared Everything | Row-level in shared DB | Lowest | Low | Startups, small SaaS |
| Shared DB, Separate Schema | Schema-level isolation | Low | Medium | Mid-market SaaS |
| Database-per-Tenant | Full DB isolation | Medium | High | Enterprise SaaS, compliance |
| Account-per-Tenant | Full AWS account isolation | Highest | Highest | GovCloud, regulated industries |
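For the "Shared Everything" row, isolation depends on every query being scoped by tenant. The sketch below uses an in-memory SQLite database to show the idea; the table, column names, and tenant IDs are hypothetical.

```python
import sqlite3
from typing import List

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE documents ("
    "id INTEGER PRIMARY KEY, tenant_id TEXT NOT NULL, title TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO documents (tenant_id, title) VALUES (?, ?)",
    [("tenant_a", "Roadmap"), ("tenant_a", "Budget"), ("tenant_b", "Contracts")],
)


def list_documents(db: sqlite3.Connection, tenant_id: str) -> List[str]:
    """Every query carries the tenant filter as a bound parameter."""
    rows = db.execute(
        "SELECT title FROM documents WHERE tenant_id = ? ORDER BY title",
        (tenant_id,),
    )
    return [row[0] for row in rows]


print(list_documents(conn, "tenant_a"))  # ['Budget', 'Roadmap']
print(list_documents(conn, "tenant_b"))  # ['Contracts']
```

Routing all data access through helpers like this keeps a forgotten `WHERE tenant_id = ?` from becoming a cross-tenant leak, which is the main risk of the cheapest pattern.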
Complete RBAC Model for SaaS Platform
# rbac_model.py: Multi-tenant RBAC implementation
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import List, Set, Optional, Dict
from functools import wraps
import time
class Permission(Enum):
"""Fine-grained permissions for SaaS platform."""
# User management
USER_READ = auto()
USER_CREATE = auto()
USER_UPDATE = auto()
USER_DELETE = auto()
USER_INVITE = auto()
# Resource management
RESOURCE_READ = auto()
RESOURCE_CREATE = auto()
RESOURCE_UPDATE = auto()
RESOURCE_DELETE = auto()
RESOURCE_SHARE = auto()
# Billing
BILLING_READ = auto()
BILLING_UPDATE = auto()
BILLING_MANAGE = auto()
# Administration
TENANT_SETTINGS_READ = auto()
TENANT_SETTINGS_UPDATE = auto()
MEMBER_MANAGE = auto()
ROLE_MANAGE = auto()
AUDIT_LOG_READ = auto()
# API keys
API_KEY_READ = auto()
API_KEY_CREATE = auto()
API_KEY_REVOKE = auto()
# Pre-defined roles with permission sets
ROLE_DEFINITIONS = {
"owner": {
"permissions": list(Permission),
"is_admin": True,
"max_count_per_tenant": 2
},
"admin": {
"permissions": [
p for p in Permission
if p not in [Permission.ROLE_MANAGE, Permission.BILLING_MANAGE]
],
"is_admin": True,
"max_count_per_tenant": 5
},
"editor": {
"permissions": [
Permission.RESOURCE_READ, Permission.RESOURCE_CREATE,
Permission.RESOURCE_UPDATE, Permission.RESOURCE_SHARE,
Permission.USER_READ
],
"is_admin": False,
"max_count_per_tenant": None
},
"viewer": {
"permissions": [
Permission.RESOURCE_READ, Permission.USER_READ
],
"is_admin": False,
"max_count_per_tenant": None
},
"billing_admin": {
"permissions": [
Permission.BILLING_READ, Permission.BILLING_UPDATE,
Permission.BILLING_MANAGE, Permission.AUDIT_LOG_READ
],
"is_admin": False,
"max_count_per_tenant": 2
}
}
@dataclass
class TenantContext:
"""Request context with tenant isolation."""
tenant_id: str
user_id: str
user_email: str
roles: List[str]
permissions: Set[Permission]
is_admin: bool
issued_at: float = field(default_factory=time.time)
@property
def is_expired(self) -> bool:
return time.time() - self.issued_at > 3600 # 1 hour
    def has_permission(self, permission: Permission) -> bool:
        # Check explicit grants only. An is_admin shortcut here would bypass
        # the exclusions in the "admin" role (ROLE_MANAGE, BILLING_MANAGE).
        return permission in self.permissions
    def has_any_permission(self, permissions: List[Permission]) -> bool:
        return any(p in self.permissions for p in permissions)
class RBACService:
"""Multi-tenant RBAC service with caching."""
def __init__(self):
self._tenant_roles: Dict[str, Dict[str, List[str]]] = {}
self._role_cache: Dict[str, Set[Permission]] = {}
def create_tenant_context(
self, tenant_id: str, user_id: str,
user_email: str, assigned_roles: List[str]
) -> TenantContext:
"""Build tenant context from user's roles."""
all_permissions: Set[Permission] = set()
is_admin = False
for role_name in assigned_roles:
role_def = ROLE_DEFINITIONS.get(role_name)
if role_def:
all_permissions.update(role_def["permissions"])
if role_def["is_admin"]:
is_admin = True
return TenantContext(
tenant_id=tenant_id,
user_id=user_id,
user_email=user_email,
roles=assigned_roles,
permissions=all_permissions,
is_admin=is_admin
)
def check_permission(self, ctx: TenantContext, permission: Permission) -> None:
"""Raise if user lacks permission."""
if ctx.is_expired:
raise PermissionError("Session expired")
if not ctx.has_permission(permission):
raise PermissionError(
f"User {ctx.user_id} lacks permission {permission.name} "
f"in tenant {ctx.tenant_id}"
)
    def can_access_resource(
        self, ctx: TenantContext,
        resource_tenant_id: str,
        resource_owner_id: Optional[str] = None
    ) -> bool:
        """Check tenant isolation, then admin, ownership, or read access."""
        # Tenant isolation: never allow cross-tenant access
        if ctx.tenant_id != resource_tenant_id:
            return False
        # Admins can access every resource in their own tenant
        if ctx.is_admin:
            return True
        # Owners can access their own resources
        if resource_owner_id and ctx.user_id == resource_owner_id:
            return True
        # Everyone else in the tenant needs an explicit read permission
        return Permission.RESOURCE_READ in ctx.permissions
def require_permission(permission: Permission):
"""Decorator to enforce permission checks on API endpoints."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
ctx = kwargs.get('ctx') or kwargs.get('tenant_context')
if not ctx:
raise PermissionError("Tenant context required")
RBACService().check_permission(ctx, permission)
return func(*args, **kwargs)
return wrapper
return decorator
# Usage example
rbac = RBACService()
# Create context for a user
ctx = rbac.create_tenant_context(
tenant_id="tenant_abc123",
user_id="user_def456",
user_email="admin@client.com",
assigned_roles=["admin"]
)
# Enforce permission check
@require_permission(Permission.RESOURCE_DELETE)
def delete_resource(resource_id: str, *, ctx: TenantContext):
print(f"Deleting {resource_id} in tenant {ctx.tenant_id}")
# This will succeed
delete_resource("res_123", ctx=ctx)
# Create viewer context: this would fail for delete
viewer_ctx = rbac.create_tenant_context(
tenant_id="tenant_abc123",
user_id="user_xyz789",
user_email="viewer@client.com",
assigned_roles=["viewer"]
)
# This will raise PermissionError
try:
delete_resource("res_123", ctx=viewer_ctx)
except PermissionError as e:
print(f"Access denied: {e}")
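The role definitions above declare `max_count_per_tenant`, but nothing in the model enforces it. One way to apply the limit at assignment time is sketched below. The `ROLE_LIMITS` table mirrors the limits above, while `assign_role`, `RoleLimitExceeded`, and the per-tenant `assignments` dictionary are hypothetical names introduced for this example.

```python
from typing import Dict, List, Optional

# Mirrors max_count_per_tenant from ROLE_DEFINITIONS; None means unlimited.
ROLE_LIMITS: Dict[str, Optional[int]] = {
    "owner": 2, "admin": 5, "editor": None, "viewer": None, "billing_admin": 2,
}


class RoleLimitExceeded(Exception):
    """Raised when a tenant already has the maximum holders of a role."""


def assign_role(assignments: Dict[str, List[str]], user_id: str, role: str) -> None:
    """Add user_id to role; assignments maps role -> user IDs for ONE tenant."""
    if role not in ROLE_LIMITS:
        raise ValueError(f"Unknown role: {role}")
    holders = assignments.setdefault(role, [])
    if user_id in holders:
        return  # already assigned, nothing to do
    limit = ROLE_LIMITS[role]
    if limit is not None and len(holders) >= limit:
        raise RoleLimitExceeded(f"Role {role} is limited to {limit} per tenant")
    holders.append(user_id)


tenant_assignments: Dict[str, List[str]] = {}
assign_role(tenant_assignments, "user_1", "owner")
assign_role(tenant_assignments, "user_2", "owner")
try:
    assign_role(tenant_assignments, "user_3", "owner")
except RoleLimitExceeded as exc:
    print(f"Rejected: {exc}")
```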
Kubernetes RBAC
Roles and ClusterRoles
# k8s/rbac/developer-role.yaml
# Namespace-scoped role for developers
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: production
name: developer
rules:
# Pods: read and logs only
- apiGroups: [""]
resources: ["pods", "pods/log"]
verbs: ["get", "list", "watch"]
# Deployments: read only
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get", "list", "watch"]
# Services: read only
- apiGroups: [""]
resources: ["services", "endpoints"]
verbs: ["get", "list", "watch"]
# ConfigMaps: read only (secrets explicitly excluded)
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "list"]
# Jobs: create for one-off tasks
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["create", "get", "list", "watch"]
# No access to: secrets, persistentvolumes, nodes, clusterroles, serviceaccounts
---
# Cluster-scoped read-only role
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: cluster-reader
rules:
- apiGroups: [""]
resources: ["nodes", "namespaces", "persistentvolumes"]
verbs: ["get", "list", "watch"]
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["clusterroles", "clusterrolebindings", "roles", "rolebindings"]
verbs: ["get", "list", "watch"]
- apiGroups: ["metrics.k8s.io"]
resources: ["nodes", "pods"]
verbs: ["get", "list"]
# k8s/rbac/bindings.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: developer-binding
namespace: production
subjects:
- kind: Group
name: engineering
apiGroup: rbac.authorization.k8s.io
- kind: User
name: alice@company.com
apiGroup: rbac.authorization.k8s.io
roleRef:
kind: Role
name: developer
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: cluster-reader-binding
subjects:
- kind: Group
name: sre-team
apiGroup: rbac.authorization.k8s.io
roleRef:
kind: ClusterRole
name: cluster-reader
apiGroup: rbac.authorization.k8s.io
---
# Service account for CI/CD with minimal permissions
apiVersion: v1
kind: ServiceAccount
metadata:
name: cicd-deployer
namespace: production
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::123456789:role/CICDDeployerRole
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: cicd-deployer
namespace: production
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list", "watch", "patch"]
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: cicd-deployer-binding
namespace: production
subjects:
- kind: ServiceAccount
name: cicd-deployer
namespace: production
roleRef:
kind: Role
name: cicd-deployer
apiGroup: rbac.authorization.k8s.io
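Kubernetes RBAC is purely additive: a request is allowed if any rule in any bound role matches its API group, resource, and verb, and there are no deny rules. The following sketch models that matching for the developer Role above. It is a simplified illustration that ignores `resourceNames` and non-resource URLs, and the helper names are assumptions.

```python
from typing import Dict, List

# Rules copied from the "developer" Role above.
DEVELOPER_RULES: List[Dict[str, List[str]]] = [
    {"apiGroups": [""], "resources": ["pods", "pods/log"], "verbs": ["get", "list", "watch"]},
    {"apiGroups": ["apps"], "resources": ["deployments", "replicasets"], "verbs": ["get", "list", "watch"]},
    {"apiGroups": [""], "resources": ["services", "endpoints"], "verbs": ["get", "list", "watch"]},
    {"apiGroups": [""], "resources": ["configmaps"], "verbs": ["get", "list"]},
    {"apiGroups": ["batch"], "resources": ["jobs"], "verbs": ["create", "get", "list", "watch"]},
]


def _covers(allowed: List[str], value: str) -> bool:
    return "*" in allowed or value in allowed


def role_allows(rules: List[Dict[str, List[str]]],
                api_group: str, resource: str, verb: str) -> bool:
    """True if at least one rule matches; anything unmatched is denied."""
    return any(
        _covers(rule["apiGroups"], api_group)
        and _covers(rule["resources"], resource)
        and _covers(rule["verbs"], verb)
        for rule in rules
    )


print(role_allows(DEVELOPER_RULES, "", "pods", "get"))         # True
print(role_allows(DEVELOPER_RULES, "", "secrets", "get"))      # False
print(role_allows(DEVELOPER_RULES, "batch", "jobs", "create"))  # True
```

Because secrets appear in no rule, they are denied by omission, which is why the Role's closing comment lists them as having no access.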
Just-in-Time (JIT) Access
JIT access grants temporary elevated permissions on demand and removes them automatically when the access window closes, which reduces standing privileged access.
#!/usr/bin/env python3
"""
jit_access.py: Just-in-Time access request and approval system.
Integrates with AWS IAM and Slack for approvals.
"""
import boto3
import json
import time
from datetime import datetime, timedelta
from enum import Enum
from dataclasses import dataclass
from typing import Optional
import requests
class JITStatus(Enum):
PENDING = "pending"
APPROVED = "approved"
DENIED = "denied"
EXPIRED = "expired"
REVOKED = "revoked"
@dataclass
class JITRequest:
request_id: str
requester_email: str
role_arn: str
reason: str
requested_duration_minutes: int
status: JITStatus
approver_email: Optional[str] = None
approved_at: Optional[float] = None
expires_at: Optional[float] = None
ticket_reference: Optional[str] = None
class JITAccessManager:
"""Manages just-in-time elevated access."""
JIT_ROLES = {
"production-read": {
"arn": "arn:aws:iam::123456789:role/JIT-ProductionRead",
"max_duration": 240, # 4 hours
"requires_approval": True,
"auto_approvers": ["sre-lead@company.com"]
},
"production-write": {
"arn": "arn:aws:iam::123456789:role/JIT-ProductionWrite",
"max_duration": 120, # 2 hours
"requires_approval": True,
"auto_approvers": []
},
"database-admin": {
"arn": "arn:aws:iam::123456789:role/JIT-DBAdmin",
"max_duration": 60, # 1 hour
"requires_approval": True,
"auto_approvers": []
},
"security-audit": {
"arn": "arn:aws:iam::123456789:role/JIT-SecurityAudit",
"max_duration": 480, # 8 hours
"requires_approval": False, # Auto-approved for security team
"auto_approvers": ["*@security.company.com"]
}
}
def __init__(self):
self.sts = boto3.client('sts')
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table('jit-access-requests')
self.slack_webhook = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
def request_access(self, requester_email: str, role_name: str,
reason: str, duration: int,
ticket_ref: Optional[str] = None) -> JITRequest:
"""Request JIT access to a role."""
if role_name not in self.JIT_ROLES:
raise ValueError(f"Unknown JIT role: {role_name}")
role_config = self.JIT_ROLES[role_name]
if duration > role_config["max_duration"]:
raise ValueError(
f"Max duration for {role_name} is {role_config['max_duration']} minutes"
)
request_id = f"jit-{int(time.time())}-{hash(requester_email) % 10000:04d}"
request = JITRequest(
request_id=request_id,
requester_email=requester_email,
role_arn=role_config["arn"],
reason=reason,
requested_duration_minutes=duration,
status=JITStatus.PENDING,
ticket_reference=ticket_ref
)
# Store request
        # Store request (role_arn is persisted because approve_request reads it)
        self.table.put_item(Item={
            'request_id': request_id,
            'requester': requester_email,
            'role': role_name,
            'role_arn': role_config["arn"],
            'reason': reason,
            'duration': duration,
            'status': 'pending',
            'created_at': int(time.time()),
            'ticket_ref': ticket_ref
        })
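        # Send approval request
        if role_config["requires_approval"]:
            self._send_approval_request(request, role_name)
        else:
            # Auto-approve. approve_request() returns the temporary credentials;
            # they are discarded here, so callers that need them should call
            # approve_request() directly.
            self.approve_request(request_id, "auto-approver@system")
            request.status = JITStatus.APPROVED
            request.approver_email = "auto-approver@system"
        return request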
def approve_request(self, request_id: str,
approver_email: str) -> dict:
"""Approve a JIT request and provision temporary credentials."""
# Get request
response = self.table.get_item(Key={'request_id': request_id})
item = response.get('Item')
if not item:
raise ValueError(f"Request {request_id} not found")
if item['status'] != 'pending':
raise ValueError(f"Request is not pending: {item['status']}")
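        # Assume the JIT role. DynamoDB returns numbers as Decimal, so convert
        # to int before passing the value to STS. The session is capped at
        # 3600 seconds here, which is shorter than most max_duration values in
        # JIT_ROLES; raising the cap requires a matching MaxSessionDuration on
        # the role.
        duration_seconds = int(item['duration']) * 60
        assumed = self.sts.assume_role(
            RoleArn=item['role_arn'],
            # RoleSessionName is limited to 64 characters
            RoleSessionName=f"jit-{item['requester']}-{request_id}"[:64],
            DurationSeconds=min(duration_seconds, 3600)
        )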
credentials = assumed['Credentials']
# Update request
self.table.update_item(
Key={'request_id': request_id},
UpdateExpression="SET #status = :status, approver = :approver, "
"approved_at = :approved_at, expires_at = :expires_at",
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={
':status': 'approved',
':approver': approver_email,
':approved_at': int(time.time()),
':expires_at': int(credentials['Expiration'].timestamp())
}
)
return {
'request_id': request_id,
'access_key_id': credentials['AccessKeyId'],
'secret_access_key': credentials['SecretAccessKey'],
'session_token': credentials['SessionToken'],
'expires_at': credentials['Expiration'].isoformat()
}
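    def revoke_request(self, request_id: str) -> None:
        """Mark a JIT request as revoked.

        This only updates the tracking record. Credentials already issued by
        STS stay valid until they expire; invalidating them early requires a
        deny policy on the role (for example, conditioned on aws:TokenIssueTime).
        """
        self.table.update_item(
            Key={'request_id': request_id},
            UpdateExpression="SET #status = :status",
            ExpressionAttributeNames={'#status': 'status'},
            ExpressionAttributeValues={':status': 'revoked'}
        )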
def _send_approval_request(self, request: JITRequest, role_name: str) -> None:
"""Send Slack approval request."""
message = {
"text": "JIT Access Request",
"blocks": [
{
"type": "header",
"text": {"type": "plain_text", "text": "JIT Access Request"}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Requester:*\n{request.requester_email}"},
{"type": "mrkdwn", "text": f"*Role:*\n{role_name}"},
{"type": "mrkdwn", "text": f"*Duration:*\n{request.requested_duration_minutes} minutes"},
{"type": "mrkdwn", "text": f"*Ticket:*\n{request.ticket_reference or 'N/A'}"}
]
},
{"type": "section", "text": {"type": "mrkdwn", "text": f"*Reason:* {request.reason}"}},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "Approve"},
"style": "primary",
"value": f"approve:{request.request_id}",
"action_id": "approve_jit"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "Deny"},
"style": "danger",
"value": f"deny:{request.request_id}",
"action_id": "deny_jit"
}
]
}
]
}
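        # Fail loudly if Slack is unreachable so the request is not silently lost
        response = requests.post(self.slack_webhook, json=message, timeout=10)
        response.raise_for_status()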
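The manager above records `expires_at` but never moves a request to the `expired` status. A periodic sweep could do that, as in the sketch below. It works on in-memory records rather than DynamoDB, and the `Grant` class and `expire_grants` function are hypothetical names for this example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Grant:
    request_id: str
    status: str                      # "pending", "approved", "expired", ...
    expires_at: Optional[float] = None


def expire_grants(grants: List[Grant], now: float) -> List[str]:
    """Mark approved grants whose window has closed; return their IDs."""
    expired_ids = []
    for grant in grants:
        if (grant.status == "approved"
                and grant.expires_at is not None
                and grant.expires_at <= now):
            grant.status = "expired"
            expired_ids.append(grant.request_id)
    return expired_ids


grants = [
    Grant("jit-1", "approved", expires_at=1_000.0),
    Grant("jit-2", "approved", expires_at=5_000.0),
    Grant("jit-3", "pending"),
]
print(expire_grants(grants, now=2_000.0))  # ['jit-1']
```

In a real deployment the sweep would run on a schedule and write the status change back to the request table, so audit records match what the credentials could actually do.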
Access Review and Recertification
# access-review-process.yaml
access_review_process:
frequency:
privileged_accounts: "quarterly"
standard_accounts: "semi-annually"
third_party_access: "quarterly"
service_accounts: "annually"
workflow:
step_1_notification:
action: "Email manager with list of direct reports and their access"
timing: "15 days before review deadline"
step_2_manager_review:
action: "Manager validates each access assignment"
requires: "Justification for each role/permission"
outcomes: ["certify", "revoke", "modify"]
step_3_automation:
action: "Automated removal of un-certified access"
timing: "7 days after deadline"
exceptions: "CISO approval required to delay"
step_4_audit:
action: "Compliance team reviews completion rates"
evidence: "Signed attestations stored for 7 years"
escalation:
level_1: "Reminder at T-7 days"
level_2: "Manager's manager notified at T-3 days"
level_3: "HR and security team at T-1 day"
level_4: "Auto-revocation at T+7 days"
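The timings in the process above can be turned into concrete dates for a given review deadline. The sketch below is one way to do that; the milestone names and the example deadline are assumptions, while the day offsets come from the workflow and escalation sections.

```python
from datetime import date, timedelta
from typing import Dict

# Day offsets relative to the review deadline (T), taken from the YAML above.
MILESTONE_OFFSETS: Dict[str, int] = {
    "manager_notification": -15,
    "reminder": -7,
    "managers_manager_notified": -3,
    "hr_and_security_notified": -1,
    "auto_revocation": 7,
}


def review_schedule(deadline: date) -> Dict[str, date]:
    """Return the calendar date of each milestone for one review cycle."""
    return {
        name: deadline + timedelta(days=offset)
        for name, offset in MILESTONE_OFFSETS.items()
    }


schedule = review_schedule(date(2025, 3, 31))
for name, when in schedule.items():
    print(f"{when.isoformat()}  {name}")
```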
Automated User Lifecycle Management
#!/usr/bin/env python3
"""
user_lifecycle_manager.py: Automated user provisioning/deprovisioning.
Integrates with Okta/Entra ID, AWS IAM, and internal systems.
"""
import boto3
import json
import requests
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Dict
from enum import Enum
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LifecycleAction(Enum):
PROVISION = "provision"
UPDATE = "update"
DISABLE = "disable"
DELETE = "delete"
TRANSFER = "transfer"
@dataclass
class UserRecord:
email: str
first_name: str
last_name: str
department: str
job_title: str
manager_email: str
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
aws_username: Optional[str] = None
    groups: Optional[List[str]] = None
class UserLifecycleManager:
"""Automates user provisioning and deprovisioning across systems."""
def __init__(self):
self.iam = boto3.client('iam')
self.identitystore = boto3.client('identitystore')
self.ssoadmin = boto3.client('sso-admin')
self.okta_client = None # Initialize with Okta SDK
self.IDENTITY_STORE_ID = "d-1234567890"
self.SSO_INSTANCE_ARN = "arn:aws:sso:::instance/ssoins-1234567890"
def provision_user(self, user: UserRecord) -> Dict:
"""Provision a new user across all systems."""
logger.info(f"Provisioning user: {user.email}")
results = {}
# 1. Create in AWS IAM Identity Center (SSO)
results['sso'] = self._create_sso_user(user)
# 2. Assign to groups based on role
results['groups'] = self._assign_sso_groups(
results['sso']['user_id'], user
)
# 3. Assign AWS permission sets
results['permissions'] = self._assign_permission_sets(
results['sso']['user_id'], user
)
# 4. Create AWS IAM user (for CLI/programmatic access)
results['iam'] = self._create_iam_user(user)
# 5. Add to security groups
results['security_groups'] = self._configure_security_groups(user)
# 6. Send welcome email
self._send_welcome_email(user, results)
logger.info(f"Provisioning complete for {user.email}")
return results
def _create_sso_user(self, user: UserRecord) -> Dict:
"""Create user in AWS IAM Identity Center."""
response = self.identitystore.create_user(
IdentityStoreId=self.IDENTITY_STORE_ID,
UserName=user.email,
DisplayName=f"{user.first_name} {user.last_name}",
Name={
'FamilyName': user.last_name,
'GivenName': user.first_name
},
Emails=[{
'Value': user.email,
'Type': 'Work',
'Primary': True
}]
)
return {
'user_id': response['UserId'],
'status': 'created'
}
def _assign_sso_groups(self, user_id: str, user: UserRecord) -> List[str]:
"""Assign user to SSO groups based on department and role."""
group_mapping = {
'engineering': ['Engineering-All', 'AWS-Developer-Access'],
'platform': ['Platform-Team', 'AWS-Admin-Access'],
'security': ['Security-Team', 'AWS-Security-Access', 'Wazuh-Admin'],
'finance': ['Finance-Team', 'AWS-ReadOnly-Access'],
'sales': ['Sales-Team', 'AWS-ReadOnly-Access'],
}
groups = group_mapping.get(user.department.lower(), ['AWS-ReadOnly-Access'])
assigned = []
for group_name in groups:
try:
# Get group ID (would be cached in production)
group_id = self._get_group_id(group_name)
self.identitystore.create_group_membership(
IdentityStoreId=self.IDENTITY_STORE_ID,
GroupId=group_id,
MemberId={'UserId': user_id}
)
assigned.append(group_name)
except Exception as e:
logger.warning(f"Failed to assign group {group_name}: {e}")
return assigned
def _assign_permission_sets(self, user_id: str, user: UserRecord) -> List[str]:
"""Assign AWS permission sets based on role."""
permission_set_mapping = {
'engineer': ['ViewOnlyAccess', 'DeveloperAccess'],
'senior_engineer': ['ViewOnlyAccess', 'PowerUserAccess'],
'platform_engineer': ['ViewOnlyAccess', 'AdministratorAccess'],
'security_engineer': ['ViewOnlyAccess', 'SecurityAudit', 'ReadOnlyAccess'],
}
role_key = user.job_title.lower().replace(' ', '_')
permission_sets = permission_set_mapping.get(
role_key, ['ViewOnlyAccess']
)
assigned = []
for ps_name in permission_sets:
try:
ps_arn = self._get_permission_set_arn(ps_name)
self.ssoadmin.create_account_assignment(
InstanceArn=self.SSO_INSTANCE_ARN,
PermissionSetArn=ps_arn,
PrincipalId=user_id,
PrincipalType='USER',
TargetId='123456789012', # AWS Account ID
TargetType='AWS_ACCOUNT'
)
assigned.append(ps_name)
except Exception as e:
logger.warning(f"Failed to assign permission set {ps_name}: {e}")
return assigned
def _create_iam_user(self, user: UserRecord) -> Dict:
"""Create IAM user for programmatic access."""
username = user.email.split('@')[0]
try:
self.iam.create_user(
UserName=username,
Tags=[
{'Key': 'Department', 'Value': user.department},
{'Key': 'Manager', 'Value': user.manager_email},
{'Key': 'Email', 'Value': user.email},
{'Key': 'CreatedBy', 'Value': 'automation'}
]
)
# Attach policies based on role
policy_arns = self._get_policies_for_role(user.job_title)
for policy_arn in policy_arns:
self.iam.attach_user_policy(
UserName=username,
PolicyArn=policy_arn
)
return {'username': username, 'status': 'created'}
except self.iam.exceptions.EntityAlreadyExistsException:
return {'username': username, 'status': 'already_exists'}
def deprovision_user(self, email: str, transfer_to: Optional[str] = None) -> Dict:
"""Deprovision a user: the critical offboarding process."""
logger.info(f"Deprovisioning user: {email}")
results = {}
username = email.split('@')[0]
        # 1. Remove the user from SSO first (fastest protection).
        # Note: delete_user removes the identity outright rather than disabling it.
        try:
            user_id = self._get_sso_user_id(email)
            self.identitystore.delete_user(
                IdentityStoreId=self.IDENTITY_STORE_ID,
                UserId=user_id
            )
            results['sso'] = 'deleted'
            logger.info(f"SSO user deleted: {email}")
        except Exception as e:
            logger.error(f"SSO deprovisioning failed: {e}")
            results['sso'] = f'error: {str(e)}'
        # 2. Deactivate IAM access keys (they are set to Inactive, not deleted)
try:
keys = self.iam.list_access_keys(UserName=username)
for key in keys['AccessKeyMetadata']:
self.iam.update_access_key(
UserName=username,
AccessKeyId=key['AccessKeyId'],
Status='Inactive'
)
logger.info(f"Disabled access key: {key['AccessKeyId']}")
results['access_keys'] = 'disabled'
except Exception as e:
logger.error(f"Access key disable failed: {e}")
# 3. Remove from all IAM groups
try:
groups = self.iam.list_groups_for_user(UserName=username)
for group in groups['Groups']:
self.iam.remove_user_from_group(
GroupName=group['GroupName'],
UserName=username
)
results['iam_groups'] = 'removed'
except Exception as e:
logger.error(f"IAM group removal failed: {e}")
# 4. Delete IAM login profile (console access)
try:
self.iam.delete_login_profile(UserName=username)
results['console_access'] = 'disabled'
except self.iam.exceptions.NoSuchEntityException:
results['console_access'] = 'already_disabled'
except Exception as e:
logger.error(f"Console access disable failed: {e}")
# 5. Transfer ownership of resources if specified
if transfer_to:
results['transfer'] = self._transfer_resources(email, transfer_to)
# 6. Schedule IAM user deletion (after 30-day grace period)
# In production: publish to SQS for delayed deletion
results['iam_user'] = 'scheduled_for_deletion'
# 7. Revoke all active sessions
try:
# Create explicit deny policy to block any existing sessions
deny_policy = {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Action": "*",
"Resource": "*"
}]
}
self.iam.put_user_policy(
UserName=username,
PolicyName='ExplicitDenyAll',
PolicyDocument=json.dumps(deny_policy)
)
results['sessions'] = 'revoked'
except Exception as e:
logger.error(f"Session revocation failed: {e}")
# 8. Audit log
self._log_deprovisioning(email, results, transfer_to)
logger.info(f"Deprovisioning complete for {email}")
return results
def _get_group_id(self, group_name: str) -> str:
"""Look up SSO group ID by name."""
# In production: use cached lookup
response = self.identitystore.list_groups(
IdentityStoreId=self.IDENTITY_STORE_ID,
Filters=[{'AttributePath': 'DisplayName', 'AttributeValue': group_name}]
)
        groups = response['Groups']
        if not groups:
            raise ValueError(f"SSO group not found: {group_name}")
        return groups[0]['GroupId']
def _get_permission_set_arn(self, name: str) -> str:
"""Look up permission set ARN by name."""
response = self.ssoadmin.list_permission_sets(
InstanceArn=self.SSO_INSTANCE_ARN
)
for ps_arn in response['PermissionSets']:
desc = self.ssoadmin.describe_permission_set(
InstanceArn=self.SSO_INSTANCE_ARN,
PermissionSetArn=ps_arn
)
if desc['PermissionSet']['Name'] == name:
return ps_arn
raise ValueError(f"Permission set not found: {name}")
def _get_sso_user_id(self, email: str) -> str:
"""Look up SSO user ID by email."""
response = self.identitystore.list_users(
IdentityStoreId=self.IDENTITY_STORE_ID,
Filters=[{'AttributePath': 'UserName', 'AttributeValue': email}]
)
        users = response['Users']
        if not users:
            raise ValueError(f"SSO user not found: {email}")
        return users[0]['UserId']
def _get_policies_for_role(self, job_title: str) -> List[str]:
"""Map job title to IAM policies."""
policy_map = {
'engineer': [
'arn:aws:iam::aws:policy/ReadOnlyAccess',
'arn:aws:iam::123456789:policy/DeveloperAccess'
],
'security_engineer': [
'arn:aws:iam::aws:policy/ReadOnlyAccess',
'arn:aws:iam::aws:policy/SecurityAudit'
],
}
return policy_map.get(job_title.lower().replace(' ', '_'),
['arn:aws:iam::aws:policy/ReadOnlyAccess'])
def _transfer_resources(self, from_email: str, to_email: str) -> Dict:
"""Transfer resource ownership during offboarding."""
# Implementation: Update S3 bucket ownership, reassign EC2 tags, etc.
return {'status': 'completed', 'resources_updated': 0}
def _configure_security_groups(self, user: UserRecord) -> List[str]:
"""Add user to security/monitoring groups."""
return []
def _send_welcome_email(self, user: UserRecord, results: Dict) -> None:
"""Send welcome email with access instructions."""
pass
def _log_deprovisioning(self, email: str, results: Dict,
transfer_to: Optional[str]) -> None:
"""Log deprovisioning for audit trail."""
audit_entry = {
'timestamp': datetime.utcnow().isoformat(),
'action': 'user_deprovision',
'user_email': email,
'transfer_to': transfer_to,
'results': results,
'source': 'automation'
}
logger.info(f"AUDIT: {json.dumps(audit_entry)}")
# Usage
if __name__ == '__main__':
manager = UserLifecycleManager()
# Provision new hire
new_hire = UserRecord(
email="john.doe@company.com",
first_name="John",
last_name="Doe",
department="engineering",
job_title="engineer",
manager_email="jane.smith@company.com"
)
# manager.provision_user(new_hire)
# Deprovision terminated employee
# manager.deprovision_user("former.employee@company.com")
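`LifecycleAction.TRANSFER` is defined above but no transfer logic is shown. For a department change, the core step is computing which groups to add and which to remove so that old access does not accumulate. The sketch below shows that diff. The group names are taken from `_assign_sso_groups` (only three departments are copied here), and `plan_transfer` is a hypothetical helper.

```python
from typing import Dict, List, Set, Tuple

# Subset of the department-to-group mapping used in _assign_sso_groups.
GROUPS_BY_DEPARTMENT: Dict[str, Set[str]] = {
    "engineering": {"Engineering-All", "AWS-Developer-Access"},
    "finance": {"Finance-Team", "AWS-ReadOnly-Access"},
    "sales": {"Sales-Team", "AWS-ReadOnly-Access"},
}
DEFAULT_GROUPS: Set[str] = {"AWS-ReadOnly-Access"}


def plan_transfer(old_department: str, new_department: str) -> Tuple[List[str], List[str]]:
    """Return (groups_to_add, groups_to_remove) for a department move."""
    current = GROUPS_BY_DEPARTMENT.get(old_department.lower(), DEFAULT_GROUPS)
    target = GROUPS_BY_DEPARTMENT.get(new_department.lower(), DEFAULT_GROUPS)
    return sorted(target - current), sorted(current - target)


to_add, to_remove = plan_transfer("engineering", "finance")
print("add:", to_add)        # add: ['AWS-ReadOnly-Access', 'Finance-Team']
print("remove:", to_remove)  # remove: ['AWS-Developer-Access', 'Engineering-All']
```

Applying the removals in the same change as the additions is what prevents the privilege creep that access reviews would otherwise have to clean up.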
AWS IAM Identity Center (SSO)
# terraform/aws-sso.tf
# Permission sets
resource "aws_ssoadmin_permission_set" "developer" {
name = "DeveloperAccess"
description = "Standard developer access to AWS"
instance_arn = local.sso_instance_arn
session_duration = "PT8H"
tags = {
Environment = "all"
}
}
resource "aws_ssoadmin_managed_policy_attachment" "developer_readonly" {
instance_arn = local.sso_instance_arn
managed_policy_arn = "arn:aws:iam::aws:policy/ReadOnlyAccess"
permission_set_arn = aws_ssoadmin_permission_set.developer.arn
}
resource "aws_ssoadmin_permission_set_inline_policy" "developer_custom" {
instance_arn = local.sso_instance_arn
permission_set_arn = aws_ssoadmin_permission_set.developer.arn
inline_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "DeveloperEC2"
Effect = "Allow"
Action = [
"ec2:StartInstances",
"ec2:StopInstances",
"ec2:RebootInstances",
"ec2:CreateTags"
]
Resource = "*"
Condition = {
StringEquals = {
"ec2:ResourceTag/Team" = "$${aws:PrincipalTag/Team}"
}
}
},
{
Sid = "DeveloperLambda"
Effect = "Allow"
Action = [
"lambda:UpdateFunctionCode",
"lambda:UpdateFunctionConfiguration",
"lambda:InvokeFunction"
]
Resource = "arn:aws:lambda:*:*:function:team-*"
}
]
})
}
# Account assignments
resource "aws_ssoadmin_account_assignment" "developer_prod" {
instance_arn = local.sso_instance_arn
permission_set_arn = aws_ssoadmin_permission_set.developer.arn
principal_id = aws_identitystore_group.engineers.group_id
principal_type = "GROUP"
target_id = "123456789012" # Production account
target_type = "AWS_ACCOUNT"
}
Privileged Access Management (PAM) Principles
| Principle | Implementation |
|---|---|
| Least Privilege | Grant minimum access needed for task duration |
| Need-to-Know | Access only data required for specific function |
| Separation of Duties | Require two people for critical operations |
| Just-in-Time Access | Elevated permissions expire automatically |
| Complete Audit Trail | Log every privileged action with session recording |
| Regular Recertification | Re-validate all privileged access quarterly |
| Emergency Access | Break-glass procedures with immediate notification |
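Separation of duties from the table above reduces to one rule: the person requesting a critical operation cannot be the one who approves it. A minimal sketch follows, assuming identities are compared as case-insensitive email addresses; the function name and parameters are illustrative.

```python
from typing import Iterable


def is_authorized(requester: str, approvers: Iterable[str],
                  min_independent_approvers: int = 1) -> bool:
    """True if enough approvers other than the requester have signed off."""
    independent = {a.lower() for a in approvers} - {requester.lower()}
    return len(independent) >= min_independent_approvers


# Self-approval does not count toward the two-person rule.
print(is_authorized("alice@company.com", ["alice@company.com"]))  # False
print(is_authorized("alice@company.com", ["bob@company.com"]))    # True
```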
Never Use Root Account
Lock down the AWS root user immediately after account creation: enable MFA, delete any root access keys, and store the credentials in a physical safe. Use root only for account recovery and the few tasks that require it. All daily operations must use IAM roles with proper RBAC.
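Root usage should also be monitored so that any sign-in or API call triggers an alert. CloudTrail records root activity with `userIdentity.type` set to `Root`, so a detector can be as small as the sketch below. The sample events are trimmed, hypothetical records, and delivery of events to this function is left out.

```python
from typing import Dict, List


def is_root_activity(event: Dict) -> bool:
    """True if a CloudTrail event was performed by the account root user."""
    return event.get("userIdentity", {}).get("type") == "Root"


# Trimmed, hypothetical CloudTrail records for illustration.
events: List[Dict] = [
    {"eventName": "ConsoleLogin", "userIdentity": {"type": "Root"}},
    {"eventName": "DescribeInstances", "userIdentity": {"type": "AssumedRole"}},
    {"eventName": "CreateAccessKey", "userIdentity": {"type": "Root"}},
]

root_events = [e["eventName"] for e in events if is_root_activity(e)]
print(root_events)  # ['ConsoleLogin', 'CreateAccessKey']
```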
Related Topics
- HashiCorp Vault: Dynamic credentials and secrets management
- SecOps Overview: Security frameworks and RBAC fundamentals
- Compliance Frameworks: Access control requirements by framework