diff --git a/src/workos/authorization.py b/src/workos/authorization.py index 5c3a220f..c32e4454 100644 --- a/src/workos/authorization.py +++ b/src/workos/authorization.py @@ -14,6 +14,7 @@ from workos.types.authorization.resource_identifier import ResourceIdentifier from workos.types.authorization.authorization_resource import AuthorizationResource from workos.types.authorization.role import Role, RoleList +from workos.types.authorization.role_assignment import RoleAssignment from workos.types.list_resource import ( ListArgs, ListMetadata, @@ -42,6 +43,7 @@ class _Unset(Enum): AUTHORIZATION_PERMISSIONS_PATH = "authorization/permissions" AUTHORIZATION_RESOURCES_PATH = "authorization/resources" AUTHORIZATION_ORGANIZATIONS_PATH = "authorization/organizations" +AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH = "authorization/organization_memberships" class ResourceListFilters(ListArgs, total=False): @@ -72,6 +74,15 @@ class ParentResourceByExternalId(TypedDict): _role_adapter: TypeAdapter[Role] = TypeAdapter(Role) +class RoleAssignmentListFilters(ListArgs, total=False): + organization_membership_id: str + + +RoleAssignmentsListResource = WorkOSListResource[ + RoleAssignment, RoleAssignmentListFilters, ListMetadata +] + + class PermissionListFilters(ListArgs, total=False): pass @@ -280,6 +291,38 @@ def check( resource: ResourceIdentifier, ) -> SyncOrAsync[AccessCheckResponse]: ... + def assign_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> SyncOrAsync[RoleAssignment]: ... + + def remove_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> SyncOrAsync[None]: ... + + def remove_role_assignment( + self, + organization_membership_id: str, + role_assignment_id: str, + ) -> SyncOrAsync[None]: ... + + def list_role_assignments( + self, + *, + organization_membership_id: str, + limit: int = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: PaginationOrder = "desc", + ) -> SyncOrAsync[RoleAssignmentsListResource]: ... + class Authorization(AuthorizationModule): _http_client: SyncHTTPClient @@ -754,6 +797,89 @@ def check( return AccessCheckResponse.model_validate(response) + # Role Assignments + + def assign_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> RoleAssignment: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + + response = self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_POST, + json=json, + ) + + return RoleAssignment.model_validate(response) + + def remove_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> None: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + + self._http_client.delete_with_body( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + json=json, + ) + + def remove_role_assignment( + self, + organization_membership_id: str, + role_assignment_id: str, + ) -> None: + self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}", + method=REQUEST_METHOD_DELETE, + ) + + def list_role_assignments( + self, + *, + organization_membership_id: str, + limit: int = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: PaginationOrder = "desc", + ) -> RoleAssignmentsListResource: + list_params: RoleAssignmentListFilters = { + "organization_membership_id": organization_membership_id, + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + query_params: ListArgs = { + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + response = self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_GET, + params=query_params, + ) + + return WorkOSListResource[ + RoleAssignment, RoleAssignmentListFilters, ListMetadata + ]( + list_method=self.list_role_assignments, + list_args=list_params, + **ListPage[RoleAssignment](**response).model_dump(), + ) + class AsyncAuthorization(AuthorizationModule): _http_client: AsyncHTTPClient @@ -1229,3 +1355,86 @@ async def check( ) return AccessCheckResponse.model_validate(response) + + # Role Assignments + + async def assign_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> RoleAssignment: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + + response = await self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_POST, + json=json, + ) + + return RoleAssignment.model_validate(response) + + async def remove_role( + self, + organization_membership_id: str, + *, + role_slug: str, + resource_identifier: ResourceIdentifier, + ) -> None: + json: Dict[str, Any] = {"role_slug": role_slug} + json.update(resource_identifier) + + await self._http_client.delete_with_body( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + json=json, + ) + + async def remove_role_assignment( + self, + organization_membership_id: str, + role_assignment_id: str, + ) -> None: + await self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments/{role_assignment_id}", + method=REQUEST_METHOD_DELETE, + ) + + async def list_role_assignments( + self, + *, + organization_membership_id: str, + limit: int = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: PaginationOrder = "desc", + ) -> RoleAssignmentsListResource: + list_params: RoleAssignmentListFilters = { + "organization_membership_id": organization_membership_id, + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + query_params: ListArgs = { + "limit": limit, + "before": before, + "after": after, + "order": order, + } + + response = await self._http_client.request( + f"{AUTHORIZATION_ORGANIZATION_MEMBERSHIPS_PATH}/{organization_membership_id}/role_assignments", + method=REQUEST_METHOD_GET, + params=query_params, + ) + + return WorkOSListResource[ + RoleAssignment, RoleAssignmentListFilters, ListMetadata + ]( + list_method=self.list_role_assignments, + list_args=list_params, + **ListPage[RoleAssignment](**response).model_dump(), + ) diff --git a/tests/test_authorization_role_assignments.py b/tests/test_authorization_role_assignments.py new file mode 100644 index 00000000..ed040ece --- /dev/null +++ b/tests/test_authorization_role_assignments.py @@ -0,0 +1,358 @@ +from typing import Union + +import pytest +from tests.utils.fixtures.mock_role_assignment import ( + MockRoleAssignment, + MockRoleAssignmentsList, +) +from tests.utils.list_resource import list_response_of +from tests.utils.syncify import syncify +from workos.authorization import AsyncAuthorization, Authorization + + +@pytest.mark.sync_and_async(Authorization, AsyncAuthorization) +class TestAuthorizationRoleAssignments: + @pytest.fixture(autouse=True) + def setup(self, module_instance: Union[Authorization, AsyncAuthorization]): + self.http_client = module_instance._http_client + self.authorization = module_instance + + @pytest.fixture + def mock_role_assignments_list(self): + return MockRoleAssignmentsList().dict() + + @pytest.fixture + def mock_role_assignments_empty_list(self): + return list_response_of(data=[]) + + def test_assign_role_by_resource_id(self, capture_and_mock_http_client_request): + mock_role_assignment = MockRoleAssignment().dict() + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignment, 201 + ) + + response = syncify( + self.authorization.assign_role( + "om_01ABC", + role_slug="admin", + resource_identifier={"resource_id": "res_01XYZ"}, + ) + ) + + assert request_kwargs["method"] == "post" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_id": "res_01XYZ", + } + assert "resource_external_id" not in request_kwargs["json"] + assert "resource_type_slug" not in request_kwargs["json"] + + assert response.dict() == mock_role_assignment + + def test_assign_role_by_external_id_and_resource_type_slug( + self, capture_and_mock_http_client_request + ): + mock_role_assignment = MockRoleAssignment().dict() + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignment, 201 + ) + + response = syncify( + self.authorization.assign_role( + "om_01ABC", + role_slug="editor", + resource_identifier={ + "resource_external_id": "ext_doc_456", + "resource_type_slug": "document", + }, + ) + ) + + assert request_kwargs["method"] == "post" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == { + "role_slug": "editor", + "resource_external_id": "ext_doc_456", + "resource_type_slug": "document", + } + assert "resource_id" not in request_kwargs["json"] + + assert response.dict() == mock_role_assignment + + def test_remove_role_by_resource_id(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, status_code=204 + ) + + syncify( + self.authorization.remove_role( + "om_01ABC", + role_slug="admin", + resource_identifier={"resource_id": "res_01XYZ"}, + ) + ) + + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == { + "role_slug": "admin", + "resource_id": "res_01XYZ", + } + assert "resource_external_id" not in request_kwargs["json"] + assert "resource_type_slug" not in request_kwargs["json"] + + def test_remove_role_by_external_id_and_resource_type_slug( + self, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, status_code=204 + ) + + syncify( + self.authorization.remove_role( + "om_01ABC", + role_slug="editor", + resource_identifier={ + "resource_external_id": "ext_doc_456", + "resource_type_slug": "document", + }, + ) + ) + + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["json"] == { + "role_slug": "editor", + "resource_external_id": "ext_doc_456", + "resource_type_slug": "document", + } + assert "resource_id" not in request_kwargs["json"] + + def test_remove_role_assignment(self, capture_and_mock_http_client_request): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, status_code=204 + ) + + syncify( + self.authorization.remove_role_assignment( + "om_01ABC", + role_assignment_id="ra_01XYZ", + ) + ) + + assert request_kwargs["method"] == "delete" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments/ra_01XYZ" + ) + + def test_list_role_assignments_returns_paginated_list( + self, + mock_role_assignments_list, + capture_and_mock_http_client_request, + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments_list, 200 + ) + + response = syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC" + ) + ) + + assert request_kwargs["method"] == "get" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["params"] == {"limit": 10, "order": "desc"} + + assert response.object == "list" + assert len(response.data) == 2 + + assert response.data[0].object == "role_assignment" + assert response.data[0].id == "ra_01ABC" + assert response.data[0].role.slug == "admin" + assert response.data[0].resource.id == "res_01ABC" + assert response.data[0].resource.external_id == "ext_123" + assert response.data[0].resource.resource_type_slug == "document" + assert response.data[0].created_at == "2024-01-15T09:30:00.000Z" + assert response.data[0].updated_at == "2024-01-15T09:30:00.000Z" + + assert response.data[1].object == "role_assignment" + assert response.data[1].id == "ra_01DEF" + assert response.data[1].role.slug == "editor" + assert response.data[1].resource.id == "res_01XYZ" + assert response.data[1].resource.external_id == "ext_456" + assert response.data[1].resource.resource_type_slug == "folder" + assert response.data[1].created_at == "2024-01-14T08:00:00.000Z" + assert response.data[1].updated_at == "2024-01-14T08:00:00.000Z" + + assert response.list_metadata.before is None + assert response.list_metadata.after == "ra_01DEF" + + def test_list_role_assignments_returns_empty_list( + self, + mock_role_assignments_empty_list, + capture_and_mock_http_client_request, + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments_empty_list, 200 + ) + + response = syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC" + ) + ) + + assert request_kwargs["method"] == "get" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["params"] == {"limit": 10, "order": "desc"} + + assert len(response.data) == 0 + assert response.list_metadata.before is None + assert response.list_metadata.after is None + + def test_list_role_assignments_with_limit( + self, + mock_role_assignments_list, + capture_and_mock_http_client_request, + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments_list, 200 + ) + + syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC", + limit=25, + ) + ) + + assert request_kwargs["params"]["limit"] == 25 + assert request_kwargs["params"]["order"] == "desc" + assert "before" not in request_kwargs["params"] + assert "after" not in request_kwargs["params"] + + def test_list_role_assignments_with_before( + self, + mock_role_assignments_list, + capture_and_mock_http_client_request, + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments_list, 200 + ) + + syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC", + before="cursor_before", + ) + ) + + assert request_kwargs["params"]["before"] == "cursor_before" + assert request_kwargs["params"]["limit"] == 10 + assert request_kwargs["params"]["order"] == "desc" + assert "after" not in request_kwargs["params"] + + def test_list_role_assignments_with_after( + self, + mock_role_assignments_list, + capture_and_mock_http_client_request, + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments_list, 200 + ) + + syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC", + after="cursor_after", + ) + ) + + assert request_kwargs["params"]["after"] == "cursor_after" + assert request_kwargs["params"]["limit"] == 10 + assert request_kwargs["params"]["order"] == "desc" + assert "before" not in request_kwargs["params"] + + def test_list_role_assignments_with_order_desc( + self, + mock_role_assignments_list, + capture_and_mock_http_client_request, + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments_list, 200 + ) + + syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC", + order="desc", + ) + ) + + assert request_kwargs["params"]["order"] == "desc" + assert request_kwargs["params"]["limit"] == 10 + assert "before" not in request_kwargs["params"] + assert "after" not in request_kwargs["params"] + + def test_list_role_assignments_with_order_asc( + self, + mock_role_assignments_list, + capture_and_mock_http_client_request, + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments_list, 200 + ) + + syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC", + order="asc", + ) + ) + + assert request_kwargs["params"]["order"] == "asc" + assert request_kwargs["params"]["limit"] == 10 + assert "before" not in request_kwargs["params"] + assert "after" not in request_kwargs["params"] + + def test_list_role_assignments_with_all_parameters( + self, + mock_role_assignments_list, + capture_and_mock_http_client_request, + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_role_assignments_list, 200 + ) + + syncify( + self.authorization.list_role_assignments( + organization_membership_id="om_01ABC", + limit=5, + before="cursor_before", + after="cursor_after", + order="asc", + ) + ) + + assert request_kwargs["method"] == "get" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/role_assignments" + ) + assert request_kwargs["params"]["limit"] == 5 + assert request_kwargs["params"]["before"] == "cursor_before" + assert request_kwargs["params"]["after"] == "cursor_after" + assert request_kwargs["params"]["order"] == "asc" diff --git a/tests/utils/fixtures/mock_role_assignment.py b/tests/utils/fixtures/mock_role_assignment.py new file mode 100644 index 00000000..e9c789f0 --- /dev/null +++ b/tests/utils/fixtures/mock_role_assignment.py @@ -0,0 +1,68 @@ +from typing import Optional, Sequence + +from workos.types.authorization.role_assignment import ( + RoleAssignment, + RoleAssignmentResource, + RoleAssignmentRole, +) +from workos.types.list_resource import ListMetadata, ListPage + + +class MockRoleAssignment(RoleAssignment): + def __init__( + self, + id: str = "ra_01ABC", + role_slug: str = "admin", + resource_id: str = "res_01ABC", + resource_external_id: str = "ext_123", + resource_type_slug: str = "document", + created_at: str = "2024-01-01T00:00:00Z", + updated_at: str = "2024-01-01T00:00:00Z", + ): + super().__init__( + object="role_assignment", + id=id, + role=RoleAssignmentRole(slug=role_slug), + resource=RoleAssignmentResource( + id=resource_id, + external_id=resource_external_id, + resource_type_slug=resource_type_slug, + ), + created_at=created_at, + updated_at=updated_at, + ) + + +class MockRoleAssignmentsList(ListPage[RoleAssignment]): + def __init__( + self, + data: Optional[Sequence[RoleAssignment]] = None, + before: Optional[str] = None, + after: Optional[str] = "ra_01DEF", + ): + if data is None: + data = [ + MockRoleAssignment( + id="ra_01ABC", + role_slug="admin", + resource_id="res_01ABC", + resource_external_id="ext_123", + resource_type_slug="document", + created_at="2024-01-15T09:30:00.000Z", + updated_at="2024-01-15T09:30:00.000Z", + ), + MockRoleAssignment( + id="ra_01DEF", + role_slug="editor", + resource_id="res_01XYZ", + resource_external_id="ext_456", + resource_type_slug="folder", + created_at="2024-01-14T08:00:00.000Z", + updated_at="2024-01-14T08:00:00.000Z", + ), + ] + super().__init__( + object="list", + data=data, + list_metadata=ListMetadata(before=before, after=after), + )