Skip to content
Open
209 changes: 209 additions & 0 deletions src/workos/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from workos.types.authorization.resource_identifier import ResourceIdentifier
from workos.types.authorization.authorization_resource import AuthorizationResource
from workos.types.authorization.role import Role, RoleList
from workos.types.authorization.role_assignment import RoleAssignment
from workos.types.list_resource import (
ListArgs,
ListMetadata,
Expand Down Expand Up @@ -42,6 +43,7 @@ class _Unset(Enum):
AUTHORIZATION_PERMISSIONS_PATH = "authorization/permissions"
AUTHORIZATION_RESOURCES_PATH = "authorization/resources"
AUTHORIZATION_ORGANIZATIONS_PATH = "authorization/organizations"
AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH = "authorization/organization_memberships"


class ResourceListFilters(ListArgs, total=False):
Expand Down Expand Up @@ -72,6 +74,15 @@ class ParentResourceByExternalId(TypedDict):
_role_adapter: TypeAdapter[Role] = TypeAdapter(Role)


class RoleAssignmentListFilters(ListArgs, total=False):
organization_membership_id: str


RoleAssignmentsListResource = WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
]


class PermissionListFilters(ListArgs, total=False):
pass

Expand Down Expand Up @@ -280,6 +291,38 @@ def check(
resource: ResourceIdentifier,
) -> SyncOrAsync[AccessCheckResponse]: ...

def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> SyncOrAsync[RoleAssignment]: ...

def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> SyncOrAsync[None]: ...

def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> SyncOrAsync[None]: ...

def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> SyncOrAsync[RoleAssignmentsListResource]: ...


class Authorization(AuthorizationModule):
_http_client: SyncHTTPClient
Expand Down Expand Up @@ -754,6 +797,89 @@ def check(

return AccessCheckResponse.model_validate(response)

# Role Assignments

def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> RoleAssignment:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

response = self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_POST,
json=json,
)

return RoleAssignment.model_validate(response)

def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> None:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

self._http_client.delete_with_body(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
json=json,
)

def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> None:
self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}",
method=REQUEST_METHOD_DELETE,
)

def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> RoleAssignmentsListResource:
list_params: RoleAssignmentListFilters = {
"organization_membership_id": organization_membership_id,
"limit": limit,
"before": before,
"after": after,
"order": order,
}

query_params: ListArgs = {
"limit": limit,
"before": before,
"after": after,
"order": order,
}

response = self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_GET,
params=query_params,
)

return WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
](
list_method=self.list_role_assignments,
list_args=list_params,
**ListPage[RoleAssignment](**response).model_dump(),
)


class AsyncAuthorization(AuthorizationModule):
_http_client: AsyncHTTPClient
Expand Down Expand Up @@ -1229,3 +1355,86 @@ async def check(
)

return AccessCheckResponse.model_validate(response)

# Role Assignments

async def assign_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> RoleAssignment:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

response = await self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_POST,
json=json,
)

return RoleAssignment.model_validate(response)

async def remove_role(
self,
organization_membership_id: str,
*,
role_slug: str,
resource_identifier: ResourceIdentifier,
) -> None:
json: Dict[str, Any] = {"role_slug": role_slug}
json.update(resource_identifier)

await self._http_client.delete_with_body(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
json=json,
)

async def remove_role_assignment(
self,
organization_membership_id: str,
role_assignment_id: str,
) -> None:
await self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}",
method=REQUEST_METHOD_DELETE,
)

async def list_role_assignments(
self,
*,
organization_membership_id: str,
limit: int = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: PaginationOrder = "desc",
) -> RoleAssignmentsListResource:
list_params: RoleAssignmentListFilters = {
"organization_membership_id": organization_membership_id,
"limit": limit,
"before": before,
"after": after,
"order": order,
}

query_params: ListArgs = {
"limit": limit,
"before": before,
"after": after,
"order": order,
}

response = await self._http_client.request(
f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments",
method=REQUEST_METHOD_GET,
params=query_params,
)

return WorkOSListResource[
RoleAssignment, RoleAssignmentListFilters, ListMetadata
](
list_method=self.list_role_assignments,
list_args=list_params,
**ListPage[RoleAssignment](**response).model_dump(),
)
Loading