Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions src/apify_client/_internal_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

from __future__ import annotations

from pydantic import BaseModel, ConfigDict
import json
from base64 import b64encode
from typing import Annotated, overload

from apify_client._models import ActorJobStatus # noqa: TC001
from pydantic import BaseModel, ConfigDict, Field, RootModel

from apify_client._models import ActorJobStatus, WebhookCreate


class ActorJob(BaseModel):
Expand All @@ -27,3 +31,58 @@ class ActorJobResponse(BaseModel):
model_config = ConfigDict(extra='allow')

data: ActorJob


class WebhookRepresentation(BaseModel):
    """Reduced webhook shape used when webhooks are sent base64-encoded in a query parameter.

    Only the four fields declared below are kept; any other keys present in the input are
    ignored during validation. Each field can be populated by its Python name or by its
    camelCase alias.
    """

    model_config = ConfigDict(extra='ignore', populate_by_name=True)

    # Required fields.
    event_types: Annotated[list[str], Field(alias='eventTypes')]
    request_url: Annotated[str, Field(alias='requestUrl')]

    # Optional fields; they default to None when not supplied.
    payload_template: Annotated[str | None, Field(alias='payloadTemplate')] = None
    headers_template: Annotated[str | None, Field(alias='headersTemplate')] = None


class WebhookRepresentationList(RootModel[list[WebhookRepresentation]]):
    """Root model wrapping a list of `WebhookRepresentation` items, with base64 encoding helpers."""

    @classmethod
    def from_webhooks(cls, webhooks: list[dict | WebhookCreate]) -> WebhookRepresentationList:
        """Build the list from webhook dictionaries and/or `WebhookCreate` models.

        Args:
            webhooks: Webhooks given as plain dictionaries or as `WebhookCreate` instances.

        Returns:
            A new instance holding one validated `WebhookRepresentation` per input webhook.
        """
        return cls(
            [
                WebhookRepresentation.model_validate(
                    # Models are dumped to a dict (None-valued fields omitted); dicts are validated as given.
                    item.model_dump(exclude_none=True) if isinstance(item, WebhookCreate) else item
                )
                for item in webhooks
            ]
        )

    def to_base64(self) -> str:
        """Serialize the wrapped webhooks to JSON and return the result base64-encoded.

        Field aliases are used as JSON keys and None-valued fields are left out.

        Returns:
            The base64 text of the UTF-8 encoded JSON array, as an ASCII string.
        """
        dumped = [item.model_dump(by_alias=True, exclude_none=True) for item in self.root]
        json_bytes = json.dumps(dumped).encode('utf-8')
        return b64encode(json_bytes).decode('ascii')

    @overload
    @classmethod
    def encode_to_base64(cls, webhooks: None) -> None: ...

    @overload
    @classmethod
    def encode_to_base64(cls, webhooks: list[dict | WebhookCreate]) -> str: ...

    @classmethod
    def encode_to_base64(cls, webhooks: list[dict | WebhookCreate] | None) -> str | None:
        """Convert webhooks straight to their base64 form, passing None through unchanged.

        Args:
            webhooks: Webhooks as dictionaries or `WebhookCreate` instances, or None.

        Returns:
            The base64-encoded JSON string produced by `from_webhooks(...).to_base64()`,
            or None when `webhooks` is None.
        """
        return None if webhooks is None else cls.from_webhooks(webhooks).to_base64()
15 changes: 8 additions & 7 deletions src/apify_client/_resource_clients/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pydantic import TypeAdapter

from apify_client._docs import docs_group
from apify_client._internal_models import WebhookRepresentationList
from apify_client._models import (
Actor,
ActorPermissionLevel,
Expand All @@ -23,11 +24,11 @@
RunOrigin,
RunResponse,
UpdateActorRequest,
WebhookCreate,
)
from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync
from apify_client._utils import (
encode_key_value_store_record_value,
encode_webhook_list_to_base64,
response_to_dict,
to_seconds,
)
Expand Down Expand Up @@ -224,7 +225,7 @@ def start(
timeout: timedelta | None = None,
force_permission_level: ActorPermissionLevel | None = None,
wait_for_finish: int | None = None,
webhooks: list[dict] | None = None,
webhooks: list[dict | WebhookCreate] | None = None,
) -> Run:
"""Start the Actor and immediately return the Run object.

Expand Down Expand Up @@ -270,7 +271,7 @@ def start(
timeout=to_seconds(timeout, as_int=True),
waitForFinish=wait_for_finish,
forcePermissionLevel=force_permission_level.value if force_permission_level is not None else None,
webhooks=encode_webhook_list_to_base64(webhooks) if webhooks is not None else None,
webhooks=WebhookRepresentationList.encode_to_base64(webhooks),
)

response = self._http_client.call(
Expand All @@ -295,7 +296,7 @@ def call(
restart_on_error: bool | None = None,
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
webhooks: list[dict] | None = None,
webhooks: list[dict | WebhookCreate] | None = None,
force_permission_level: ActorPermissionLevel | None = None,
wait_duration: timedelta | None = None,
logger: Logger | None | Literal['default'] = 'default',
Expand Down Expand Up @@ -689,7 +690,7 @@ async def start(
timeout: timedelta | None = None,
force_permission_level: ActorPermissionLevel | None = None,
wait_for_finish: int | None = None,
webhooks: list[dict] | None = None,
webhooks: list[dict | WebhookCreate] | None = None,
) -> Run:
"""Start the Actor and immediately return the Run object.

Expand Down Expand Up @@ -735,7 +736,7 @@ async def start(
timeout=to_seconds(timeout, as_int=True),
waitForFinish=wait_for_finish,
forcePermissionLevel=force_permission_level.value if force_permission_level is not None else None,
webhooks=encode_webhook_list_to_base64(webhooks) if webhooks is not None else None,
webhooks=WebhookRepresentationList.encode_to_base64(webhooks),
)

response = await self._http_client.call(
Expand All @@ -760,7 +761,7 @@ async def call(
restart_on_error: bool | None = None,
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
webhooks: list[dict] | None = None,
webhooks: list[dict | WebhookCreate] | None = None,
force_permission_level: ActorPermissionLevel | None = None,
wait_duration: timedelta | None = None,
logger: Logger | None | Literal['default'] = 'default',
Expand Down
68 changes: 50 additions & 18 deletions src/apify_client/_resource_clients/request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def list_and_lock_head(self, *, lock_duration: timedelta, limit: int | None = No
result = response_to_dict(response)
return HeadAndLockResponse.model_validate(result).data

def add_request(self, request: dict, *, forefront: bool | None = None) -> RequestRegistration:
def add_request(self, request: dict | RequestDraft, *, forefront: bool | None = None) -> RequestRegistration:
"""Add a request to the queue.

https://docs.apify.com/api/v2#/reference/request-queues/request-collection/add-request
Expand All @@ -178,12 +178,15 @@ def add_request(self, request: dict, *, forefront: bool | None = None) -> Reques
Returns:
The added request.
"""
if isinstance(request, dict):
request = RequestDraft.model_validate(request)

request_params = self._build_params(forefront=forefront, clientKey=self.client_key)

response = self._http_client.call(
url=self._build_url('requests'),
method='POST',
json=request,
json=request.model_dump(by_alias=True, exclude_none=True),
params=request_params,
timeout=FAST_OPERATION_TIMEOUT,
)
Expand Down Expand Up @@ -217,7 +220,7 @@ def get_request(self, request_id: str) -> Request | None:

return None

def update_request(self, request: dict, *, forefront: bool | None = None) -> RequestRegistration:
def update_request(self, request: dict | Request, *, forefront: bool | None = None) -> RequestRegistration:
"""Update a request in the queue.

https://docs.apify.com/api/v2#/reference/request-queues/request/update-request
Expand All @@ -229,14 +232,17 @@ def update_request(self, request: dict, *, forefront: bool | None = None) -> Req
Returns:
The updated request.
"""
request_id = request['id']
if isinstance(request, dict):
request = Request.model_validate(request)

request_id = request.id

request_params = self._build_params(forefront=forefront, clientKey=self.client_key)

response = self._http_client.call(
url=self._build_url(f'requests/{request_id}'),
method='PUT',
json=request,
json=request.model_dump(by_alias=True, exclude_none=True),
params=request_params,
timeout=STANDARD_OPERATION_TIMEOUT,
)
Expand Down Expand Up @@ -315,7 +321,7 @@ def delete_request_lock(self, request_id: str, *, forefront: bool | None = None)

def batch_add_requests(
self,
requests: list[dict],
requests: list[dict | RequestDraft],
*,
forefront: bool = False,
max_parallel: int = 1,
Expand Down Expand Up @@ -348,14 +354,19 @@ def batch_add_requests(
if max_parallel != 1:
raise NotImplementedError('max_parallel is only supported in async client')

requests_as_dicts: list[dict] = [
(RequestDraft.model_validate(r) if isinstance(r, dict) else r).model_dump(by_alias=True, exclude_none=True)
for r in requests
]

request_params = self._build_params(clientKey=self.client_key, forefront=forefront)

# Compute the payload size limit to ensure it doesn't exceed the maximum allowed size.
payload_size_limit_bytes = _MAX_PAYLOAD_SIZE_BYTES - math.ceil(_MAX_PAYLOAD_SIZE_BYTES * _SAFETY_BUFFER_PERCENT)

# Split the requests into batches, constrained by the max payload size and max requests per batch.
batches = constrained_batches(
requests,
requests_as_dicts,
max_size=payload_size_limit_bytes,
max_count=_RQ_MAX_REQUESTS_PER_BATCH,
)
Expand Down Expand Up @@ -394,21 +405,26 @@ def batch_add_requests(
)
).data

def batch_delete_requests(self, requests: list[dict]) -> BatchDeleteResult:
def batch_delete_requests(self, requests: list[dict | RequestDraft]) -> BatchDeleteResult:
"""Delete given requests from the queue.

https://docs.apify.com/api/v2#/reference/request-queues/batch-request-operations/delete-requests

Args:
requests: List of the requests to delete.
"""
requests_as_dicts: list[dict] = [
(RequestDraft.model_validate(r) if isinstance(r, dict) else r).model_dump(by_alias=True, exclude_none=True)
for r in requests
]

request_params = self._build_params(clientKey=self.client_key)

response = self._http_client.call(
url=self._build_url('requests/batch'),
method='DELETE',
params=request_params,
json=requests,
json=requests_as_dicts,
timeout=FAST_OPERATION_TIMEOUT,
)

Expand Down Expand Up @@ -580,7 +596,7 @@ async def list_and_lock_head(self, *, lock_duration: timedelta, limit: int | Non
result = response_to_dict(response)
return HeadAndLockResponse.model_validate(result).data

async def add_request(self, request: dict, *, forefront: bool | None = None) -> RequestRegistration:
async def add_request(self, request: dict | RequestDraft, *, forefront: bool | None = None) -> RequestRegistration:
"""Add a request to the queue.

https://docs.apify.com/api/v2#/reference/request-queues/request-collection/add-request
Expand All @@ -592,12 +608,15 @@ async def add_request(self, request: dict, *, forefront: bool | None = None) ->
Returns:
The added request.
"""
if isinstance(request, dict):
request = RequestDraft.model_validate(request)

request_params = self._build_params(forefront=forefront, clientKey=self.client_key)

response = await self._http_client.call(
url=self._build_url('requests'),
method='POST',
json=request,
json=request.model_dump(by_alias=True, exclude_none=True),
params=request_params,
timeout=FAST_OPERATION_TIMEOUT,
)
Expand Down Expand Up @@ -629,7 +648,7 @@ async def get_request(self, request_id: str) -> Request | None:
catch_not_found_or_throw(exc)
return None

async def update_request(self, request: dict, *, forefront: bool | None = None) -> RequestRegistration:
async def update_request(self, request: dict | Request, *, forefront: bool | None = None) -> RequestRegistration:
"""Update a request in the queue.

https://docs.apify.com/api/v2#/reference/request-queues/request/update-request
Expand All @@ -641,14 +660,17 @@ async def update_request(self, request: dict, *, forefront: bool | None = None)
Returns:
The updated request.
"""
request_id = request['id']
if isinstance(request, dict):
request = Request.model_validate(request)

request_id = request.id

request_params = self._build_params(forefront=forefront, clientKey=self.client_key)

response = await self._http_client.call(
url=self._build_url(f'requests/{request_id}'),
method='PUT',
json=request,
json=request.model_dump(by_alias=True, exclude_none=True),
params=request_params,
timeout=STANDARD_OPERATION_TIMEOUT,
)
Expand Down Expand Up @@ -777,7 +799,7 @@ async def _batch_add_requests_worker(

async def batch_add_requests(
self,
requests: list[dict],
requests: list[dict | RequestDraft],
*,
forefront: bool = False,
max_parallel: int = 5,
Expand Down Expand Up @@ -807,6 +829,11 @@ async def batch_add_requests(
if min_delay_between_unprocessed_requests_retries:
logger.warning('`min_delay_between_unprocessed_requests_retries` is deprecated and not used anymore.')

requests_as_dicts: list[dict] = [
(RequestDraft.model_validate(r) if isinstance(r, dict) else r).model_dump(by_alias=True, exclude_none=True)
for r in requests
]

asyncio_queue: asyncio.Queue[Iterable[dict]] = asyncio.Queue()
request_params = self._build_params(clientKey=self.client_key, forefront=forefront)

Expand All @@ -815,7 +842,7 @@ async def batch_add_requests(

# Split the requests into batches, constrained by the max payload size and max requests per batch.
batches = constrained_batches(
requests,
requests_as_dicts,
max_size=payload_size_limit_bytes,
max_count=_RQ_MAX_REQUESTS_PER_BATCH,
)
Expand Down Expand Up @@ -858,21 +885,26 @@ async def batch_add_requests(
)
).data

async def batch_delete_requests(self, requests: list[dict]) -> BatchDeleteResult:
async def batch_delete_requests(self, requests: list[dict | RequestDraft]) -> BatchDeleteResult:
"""Delete given requests from the queue.

https://docs.apify.com/api/v2#/reference/request-queues/batch-request-operations/delete-requests

Args:
requests: List of the requests to delete.
"""
requests_as_dicts: list[dict] = [
(RequestDraft.model_validate(r) if isinstance(r, dict) else r).model_dump(by_alias=True, exclude_none=True)
for r in requests
]

request_params = self._build_params(clientKey=self.client_key)

response = await self._http_client.call(
url=self._build_url('requests/batch'),
method='DELETE',
params=request_params,
json=requests,
json=requests_as_dicts,
timeout=FAST_OPERATION_TIMEOUT,
)
result = response_to_dict(response)
Expand Down
Loading
Loading