diff --git a/src/apify/_actor.py b/src/apify/_actor.py index ba41c115..9fc7ff1a 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -26,7 +26,7 @@ EventSystemInfoData, ) -from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation +from apify._charging import DEFAULT_DATASET_ITEM_EVENT, ChargeResult, ChargingManager, ChargingManagerImplementation from apify._configuration import Configuration from apify._consts import EVENT_LISTENERS_TIMEOUT from apify._crypto import decrypt_input_secrets, load_private_key @@ -380,14 +380,6 @@ def event_manager(self) -> EventManager: def _charging_manager_implementation(self) -> ChargingManagerImplementation: return ChargingManagerImplementation(self.configuration, self.apify_client) - @cached_property - def _charge_lock(self) -> asyncio.Lock: - """Lock to synchronize charge operations. - - Prevents race conditions between Actor.charge and Actor.push_data calls. - """ - return asyncio.Lock() - @cached_property def _storage_client(self) -> SmartApifyStorageClient: """Storage client used by the Actor. @@ -621,12 +613,8 @@ async def open_request_queue( storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud), ) - @overload - async def push_data(self, data: dict | list[dict]) -> None: ... - @overload - async def push_data(self, data: dict | list[dict], charged_event_name: str) -> ChargeResult: ... @_ensure_context - async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult | None: + async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult: """Store an object or a list of objects to the default dataset of the current Actor run. Args: @@ -634,35 +622,54 @@ async def push_data(self, data: dict | list[dict], charged_event_name: str | Non charged_event_name: If provided and if the Actor uses the pay-per-event pricing model, the method will attempt to charge for the event for each pushed item. """ + if charged_event_name and charged_event_name.startswith('apify-'): + raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually') + + charging_manager = self.get_charging_manager() + if not data: - return None + charged_event_name = charged_event_name or DEFAULT_DATASET_ITEM_EVENT + charge_limit_reached = charging_manager.is_event_charge_limit_reached(charged_event_name) + + return ChargeResult( + event_charge_limit_reached=charge_limit_reached, + charged_count=0, + chargeable_within_limit=charging_manager.compute_chargeable(), + ) data = data if isinstance(data, list) else [data] - # No charging, just push the data without locking. - if charged_event_name is None: - dataset = await self.open_dataset() - await dataset.push_data(data) - return None + dataset = await self.open_dataset() - # If charging is requested, acquire the charge lock to prevent race conditions between concurrent + # Acquire the charge lock to prevent race conditions between concurrent # push_data calls. We need to hold the lock for the entire push_data + charge sequence. - async with self._charge_lock: - max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit( - charged_event_name - ) - - # Push as many items as we can charge for. - pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data) + async with charging_manager.charge_lock(): + # Synthetic events are handled within dataset.push_data, only get data for `ChargeResult`. + if charged_event_name is None: + before = charging_manager.get_charged_event_count(DEFAULT_DATASET_ITEM_EVENT) + await dataset.push_data(data) + after = charging_manager.get_charged_event_count(DEFAULT_DATASET_ITEM_EVENT) + return ChargeResult( + event_charge_limit_reached=charging_manager.is_event_charge_limit_reached( + DEFAULT_DATASET_ITEM_EVENT + ), + charged_count=after - before, + chargeable_within_limit=charging_manager.compute_chargeable(), + ) - dataset = await self.open_dataset() + pushed_items_count = charging_manager.compute_push_data_limit( + items_count=len(data), + event_name=charged_event_name, + is_default_dataset=True, + ) if pushed_items_count < len(data): await dataset.push_data(data[:pushed_items_count]) elif pushed_items_count > 0: await dataset.push_data(data) - return await self.get_charging_manager().charge( + # Only charge explicit events; synthetic events will be processed within the client. + return await charging_manager.charge( event_name=charged_event_name, count=pushed_items_count, ) @@ -726,9 +733,9 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult: event_name: Name of the event to be charged for. count: Number of events to charge for. """ - # Acquire lock to prevent race conditions with concurrent charge/push_data calls. - async with self._charge_lock: - return await self.get_charging_manager().charge(event_name, count) + # charging_manager.charge() acquires charge_lock internally. + charging_manager = self.get_charging_manager() + return await charging_manager.charge(event_name, count) @overload def on( diff --git a/src/apify/_charging.py b/src/apify/_charging.py index c5ed7826..9efa700d 100644 --- a/src/apify/_charging.py +++ b/src/apify/_charging.py @@ -1,6 +1,7 @@ from __future__ import annotations import math +from contextvars import ContextVar from dataclasses import dataclass from datetime import datetime, timezone from decimal import Decimal @@ -16,7 +17,7 @@ PricePerDatasetItemActorPricingInfo, PricingModel, ) -from apify._utils import docs_group, ensure_context +from apify._utils import ReentrantLock, docs_group, ensure_context from apify.log import logger from apify.storages import Dataset @@ -29,6 +30,12 @@ run_validator = TypeAdapter[ActorRun | None](ActorRun | None) +DEFAULT_DATASET_ITEM_EVENT = 'apify-default-dataset-item' + +# Context variable to hold the current `ChargingManager` instance, if any. This allows PPE-aware dataset clients to +# access the charging manager without needing to pass it explicitly. +charging_manager_ctx: ContextVar[ChargingManager | None] = ContextVar('charging_manager_ctx', default=None) + _ensure_context = ensure_context('active') @@ -45,6 +52,9 @@ class ChargingManager(Protocol): - Apify platform documentation: https://docs.apify.com/platform/actors/publishing/monetize """ + charge_lock: ReentrantLock + """Lock to synchronize charge operations. Prevents race conditions between `charge` and `push_data` calls.""" + async def charge(self, event_name: str, count: int = 1) -> ChargeResult: """Charge for a specified number of events - sub-operations of the Actor. @@ -81,6 +91,34 @@ def get_charged_event_count(self, event_name: str) -> int: def get_max_total_charge_usd(self) -> Decimal: """Get the configured maximum total charge for this Actor run.""" + def compute_push_data_limit( + self, + items_count: int, + event_name: str, + *, + is_default_dataset: bool, + ) -> int: + """Compute how many items can be pushed and charged within the current budget. + + Accounts for both the explicit event and the synthetic `DEFAULT_DATASET_ITEM_EVENT` event, + so that the combined cost per item does not exceed the remaining budget. + + Args: + items_count: The number of items to be pushed. + event_name: The explicit event name to charge for each item. + is_default_dataset: Whether the data is pushed to the default dataset. + If True, the synthetic event cost is included in the combined price. + + Returns: + Max number of items that can be pushed within the budget. + """ + + def is_event_charge_limit_reached(self, event_name: str) -> bool: + """Return True if the remaining budget is insufficient to charge even a single event of the given type.""" + + def compute_chargeable(self) -> dict[str, int | None]: + """Compute the maximum number of events of each type that can be charged within the current budget.""" + @docs_group('Charging') @dataclass(frozen=True) @@ -137,6 +175,8 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No self._not_ppe_warning_printed = False self.active = False + self.charge_lock = ReentrantLock() + async def __aenter__(self) -> None: """Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually.""" # Validate config @@ -190,6 +230,11 @@ async def __aenter__(self) -> None: self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME) + # if the Actor runs with the pay-per-event pricing model, set the context variable so that PPE-aware dataset + # clients can access the charging manager and charge for synthetic events. + if self._pricing_model == 'PAY_PER_EVENT': + charging_manager_ctx.set(self) + async def __aexit__( self, exc_type: type[BaseException] | None, @@ -199,17 +244,11 @@ async def __aexit__( if not self.active: raise RuntimeError('Exiting an uninitialized ChargingManager') + charging_manager_ctx.set(None) self.active = False @_ensure_context async def charge(self, event_name: str, count: int = 1) -> ChargeResult: - def calculate_chargeable() -> dict[str, int | None]: - """Calculate the maximum number of events of each type that can be charged within the current budget.""" - return { - event_name: self.calculate_max_event_charge_count_within_limit(event_name) - for event_name in self._pricing_info - } - # For runs that do not use the pay-per-event pricing model, just print a warning and return if self._pricing_model != 'PAY_PER_EVENT': if not self._not_ppe_warning_printed: @@ -221,75 +260,81 @@ def calculate_chargeable() -> dict[str, int | None]: return ChargeResult( event_charge_limit_reached=False, charged_count=0, - chargeable_within_limit=calculate_chargeable(), + chargeable_within_limit=self.compute_chargeable(), ) - # START OF CRITICAL SECTION - no awaits here - - # Determine the maximum amount of events that can be charged within the budget - max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name) - charged_count = min(count, max_chargeable if max_chargeable is not None else count) - - if charged_count == 0: + if count <= 0: return ChargeResult( - event_charge_limit_reached=True, + event_charge_limit_reached=self.is_event_charge_limit_reached(event_name), charged_count=0, - chargeable_within_limit=calculate_chargeable(), + chargeable_within_limit=self.compute_chargeable(), ) - pricing_info = self._pricing_info.get( - event_name, - PricingInfoItem( - # Use a nonzero price for local development so that the maximum budget can be reached. - price=Decimal() if self._is_at_home else Decimal(1), - title=f"Unknown event '{event_name}'", - ), - ) - - # Update the charging state - self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) - self._charging_state[event_name].charge_count += charged_count - self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price + async with self.charge_lock(): + # Determine the maximum amount of events that can be charged within the budget + max_chargeable = self.calculate_max_event_charge_count_within_limit(event_name) + charged_count = min(count, max_chargeable if max_chargeable is not None else count) - # END OF CRITICAL SECTION + if charged_count == 0: + return ChargeResult( + event_charge_limit_reached=True, + charged_count=0, + chargeable_within_limit=self.compute_chargeable(), + ) - # If running on the platform, call the charge endpoint - if self._is_at_home: - if self._actor_run_id is None: - raise RuntimeError('Actor run ID not configured') - - if event_name in self._pricing_info: - await self._client.run(self._actor_run_id).charge(event_name, charged_count) - else: - logger.warning(f"Attempting to charge for an unknown event '{event_name}'") - - # Log the charged operation (if enabled) - if self._charging_log_dataset: - await self._charging_log_dataset.push_data( - { - 'event_name': event_name, - 'event_title': pricing_info.title, - 'event_price_usd': round(pricing_info.price, 3), - 'charged_count': charged_count, - 'timestamp': datetime.now(timezone.utc).isoformat(), - } + pricing_info = self._pricing_info.get( + event_name, + PricingInfoItem( + # Use a nonzero price for local development so that the maximum budget can be reached. + price=Decimal() if self._is_at_home else Decimal(1), + title=f"Unknown event '{event_name}'", + ), ) - # If it is not possible to charge the full amount, log that fact - if charged_count < count: - subject = 'instance' if count == 1 else 'instances' - logger.info( - f"Charging {count} {subject} of '{event_name}' event would exceed max_total_charge_usd " - f'- only {charged_count} events were charged' - ) + # Update the charging state + self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal())) + self._charging_state[event_name].charge_count += charged_count + self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price + + # If running on the platform, call the charge endpoint + if self._is_at_home: + if self._actor_run_id is None: + raise RuntimeError('Actor run ID not configured') + + if event_name.startswith('apify-'): + # Synthetic events (e.g. apify-default-dataset-item) are tracked internally only, + # the platform handles them automatically based on dataset writes. + pass + elif event_name in self._pricing_info: + await self._client.run(self._actor_run_id).charge(event_name, charged_count) + else: + logger.warning(f"Attempting to charge for an unknown event '{event_name}'") + + # Log the charged operation (if enabled) + if self._charging_log_dataset: + await self._charging_log_dataset.push_data( + { + 'event_name': event_name, + 'event_title': pricing_info.title, + 'event_price_usd': round(pricing_info.price, 3), + 'charged_count': charged_count, + 'timestamp': datetime.now(timezone.utc).isoformat(), + } + ) - max_charge_count = self.calculate_max_event_charge_count_within_limit(event_name) + # If it is not possible to charge the full amount, log that fact + if charged_count < count: + subject = 'instance' if count == 1 else 'instances' + logger.info( + f"Charging {count} {subject} of '{event_name}' event would exceed max_total_charge_usd " + f'- only {charged_count} events were charged' + ) - return ChargeResult( - event_charge_limit_reached=max_charge_count is not None and max_charge_count <= 0, - charged_count=charged_count, - chargeable_within_limit=calculate_chargeable(), - ) + return ChargeResult( + event_charge_limit_reached=self.is_event_charge_limit_reached(event_name), + charged_count=charged_count, + chargeable_within_limit=self.compute_chargeable(), + ) @_ensure_context def calculate_total_charged_amount(self) -> Decimal: @@ -300,14 +345,7 @@ def calculate_total_charged_amount(self) -> Decimal: @_ensure_context def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None: - pricing_info = self._pricing_info.get(event_name) - - if pricing_info is not None: - price = pricing_info.price - elif not self._is_at_home: - price = Decimal(1) # Use a nonzero price for local development so that the maximum budget can be reached - else: - price = Decimal() + price = self._get_event_price(event_name) if not price: return None @@ -337,6 +375,37 @@ def get_charged_event_count(self, event_name: str) -> int: def get_max_total_charge_usd(self) -> Decimal: return self._max_total_charge_usd + @_ensure_context + def compute_push_data_limit( + self, + items_count: int, + event_name: str, + *, + is_default_dataset: bool, + ) -> int: + explicit_price = self._get_event_price(event_name) + synthetic_price = self._get_event_price(DEFAULT_DATASET_ITEM_EVENT) if is_default_dataset else Decimal(0) + combined_price = explicit_price + synthetic_price + + if not combined_price: + return items_count + + result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / combined_price + max_count = max(0, math.floor(result)) if result.is_finite() else items_count + return min(items_count, max_count) + + @_ensure_context + def is_event_charge_limit_reached(self, event_name: str) -> bool: + max_charge_count = self.calculate_max_event_charge_count_within_limit(event_name) + return max_charge_count is not None and max_charge_count <= 0 + + @_ensure_context + def compute_chargeable(self) -> dict[str, int | None]: + return { + event_name: self.calculate_max_event_charge_count_within_limit(event_name) + for event_name in self._pricing_info + } + async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict: """Fetch pricing information from environment variables or API.""" # Check if pricing info is available via environment variables @@ -370,6 +439,12 @@ async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict: max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'), ) + def _get_event_price(self, event_name: str) -> Decimal: + pricing_info = self._pricing_info.get(event_name) + if pricing_info is not None: + return pricing_info.price + return Decimal(0) if self._is_at_home else Decimal(1) + @dataclass class ChargingStateItem: diff --git a/src/apify/_utils.py b/src/apify/_utils.py index 33cb3d54..38e9a66e 100644 --- a/src/apify/_utils.py +++ b/src/apify/_utils.py @@ -1,13 +1,18 @@ from __future__ import annotations +import asyncio import builtins import inspect import sys from collections.abc import Callable +from contextlib import asynccontextmanager from enum import Enum from functools import wraps from importlib import metadata -from typing import Any, Literal, TypeVar, cast +from typing import TYPE_CHECKING, Any, Literal, TypeVar, cast + +if TYPE_CHECKING: + from collections.abc import AsyncIterator T = TypeVar('T', bound=Callable[..., Any]) @@ -123,3 +128,25 @@ def maybe_extract_enum_member_value(maybe_enum_member: Any) -> Any: if isinstance(maybe_enum_member, Enum): return maybe_enum_member.value return maybe_enum_member + + +class ReentrantLock: + """A reentrant lock implementation for asyncio using asyncio.Lock.""" + + def __init__(self) -> None: + self._lock = asyncio.Lock() + self._owner: asyncio.Task | None = None + + @asynccontextmanager + async def __call__(self) -> AsyncIterator[None]: + """Acquire the lock if it's not already owned by the current task, otherwise proceed without acquiring.""" + me = asyncio.current_task() + if self._owner is me: + yield + return + async with self._lock: + self._owner = me + try: + yield + finally: + self._owner = None diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index a918bddd..2287ec66 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -13,6 +13,7 @@ from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata from ._api_client_creation import create_storage_api_client +from apify.storage_clients._ppe_dataset_mixin import DatasetClientPpeMixin if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -25,7 +26,7 @@ logger = getLogger(__name__) -class ApifyDatasetClient(DatasetClient): +class ApifyDatasetClient(DatasetClient, DatasetClientPpeMixin): """An Apify platform implementation of the dataset client.""" _MAX_PAYLOAD_SIZE = ByteSize.from_mb(9) @@ -48,6 +49,9 @@ def __init__( Preferably use the `ApifyDatasetClient.open` class method to create a new instance. """ + DatasetClient.__init__(self) + DatasetClientPpeMixin.__init__(self) + self._api_client = api_client """The Apify dataset client for API operations.""" @@ -108,12 +112,18 @@ async def open( id=id, ) - return cls( + dataset_client = cls( api_client=api_client, api_public_base_url='', # Remove in version 4.0, https://github.com/apify/apify-sdk-python/issues/635 lock=asyncio.Lock(), ) + dataset_client.is_default_dataset = ( + alias is None and name is None and (id is None or id == configuration.default_dataset_id) + ) + + return dataset_client + @override async def purge(self) -> None: raise NotImplementedError( @@ -128,21 +138,19 @@ async def drop(self) -> None: @override async def push_data(self, data: list[Any] | dict[str, Any]) -> None: - async def payloads_generator() -> AsyncIterator[str]: - for index, item in enumerate(data): + async def payloads_generator(items: list[Any]) -> AsyncIterator[str]: + for index, item in enumerate(items): yield await self._check_and_serialize(item, index) - async with self._lock: - # Handle lists - if isinstance(data, list): - # Invoke client in series to preserve the order of data - async for items in self._chunk_by_size(payloads_generator()): - await self._api_client.push_items(items=items) + async with self._charge_lock(), self._lock: + items = data if isinstance(data, list) else [data] + limit = self._compute_limit_for_push(len(items)) + items = items[:limit] - # Handle singular items - else: - items = await self._check_and_serialize(data) - await self._api_client.push_items(items=items) + async for chunk in self._chunk_by_size(payloads_generator(items)): + await self._api_client.push_items(items=chunk) + + await self._charge_for_items(count_items=limit) @override async def get_data( diff --git a/src/apify/storage_clients/_file_system/_dataset_client.py b/src/apify/storage_clients/_file_system/_dataset_client.py new file mode 100644 index 00000000..04af9ae2 --- /dev/null +++ b/src/apify/storage_clients/_file_system/_dataset_client.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from typing_extensions import Self, override + +from crawlee.storage_clients._file_system import FileSystemDatasetClient + +from apify.storage_clients._ppe_dataset_mixin import DatasetClientPpeMixin + +if TYPE_CHECKING: + from crawlee.configuration import Configuration + + +class ApifyFileSystemDatasetClient(FileSystemDatasetClient, DatasetClientPpeMixin): + """Apify-specific implementation of the `FileSystemDatasetClient`. + + It extends the functionality of `FileSystemDatasetClient` using `DatasetClientPpeMixin` and updates `push_data` to + limit and charge for the synthetic `apify-default-dataset-item` event. This is necessary for consistent behavior + when locally testing the `PAY_PER_EVENT` pricing model. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + FileSystemDatasetClient.__init__(self, *args, **kwargs) + DatasetClientPpeMixin.__init__(self) + + @override + @classmethod + async def open( + cls, + *, + id: str | None, + name: str | None, + alias: str | None, + configuration: Configuration, + ) -> Self: + + dataset_client = await super().open( + id=id, + name=name, + alias=alias, + configuration=configuration, + ) + + dataset_client.is_default_dataset = all(v is None for v in (id, name, alias)) + + return dataset_client + + @override + async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: + async with self._charge_lock(): + items = data if isinstance(data, list) else [data] + limit = self._compute_limit_for_push(len(items)) + + await super().push_data(items[:limit]) + + await self._charge_for_items(limit) diff --git a/src/apify/storage_clients/_file_system/_storage_client.py b/src/apify/storage_clients/_file_system/_storage_client.py index 2b7134c7..8332f0fd 100644 --- a/src/apify/storage_clients/_file_system/_storage_client.py +++ b/src/apify/storage_clients/_file_system/_storage_client.py @@ -7,6 +7,7 @@ from crawlee.configuration import Configuration from crawlee.storage_clients import FileSystemStorageClient +from ._dataset_client import ApifyFileSystemDatasetClient from ._key_value_store_client import ApifyFileSystemKeyValueStoreClient if TYPE_CHECKING: @@ -48,3 +49,22 @@ async def create_kvs_client( ) await self._purge_if_needed(client, configuration) return client + + @override + async def create_dataset_client( + self, + *, + id: str | None = None, + name: str | None = None, + alias: str | None = None, + configuration: Configuration | None = None, + ) -> ApifyFileSystemDatasetClient: + configuration = configuration or Configuration.get_global_configuration() + client = await ApifyFileSystemDatasetClient.open( + id=id, + name=name, + alias=alias, + configuration=configuration, + ) + await self._purge_if_needed(client, configuration) + return client diff --git a/src/apify/storage_clients/_ppe_dataset_mixin.py b/src/apify/storage_clients/_ppe_dataset_mixin.py new file mode 100644 index 00000000..f68361ad --- /dev/null +++ b/src/apify/storage_clients/_ppe_dataset_mixin.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from contextlib import asynccontextmanager +from typing import TYPE_CHECKING + +from apify._charging import DEFAULT_DATASET_ITEM_EVENT, charging_manager_ctx + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + +class DatasetClientPpeMixin: + """A mixin for dataset clients to add support for PPE pricing model and tracking synthetic events.""" + + def __init__(self) -> None: + self.is_default_dataset = False + + def _compute_limit_for_push(self, items_count: int) -> int: + if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): + max_charged_count = charging_manager.calculate_max_event_charge_count_within_limit( + event_name=DEFAULT_DATASET_ITEM_EVENT + ) + return min(max_charged_count, items_count) if max_charged_count is not None else items_count + return items_count + + async def _charge_for_items(self, count_items: int) -> None: + if self.is_default_dataset and (charging_manager := charging_manager_ctx.get()): + await charging_manager.charge( + event_name=DEFAULT_DATASET_ITEM_EVENT, + count=count_items, + ) + + @asynccontextmanager + async def _charge_lock(self) -> AsyncIterator[None]: + """Context manager to acquire the charge lock if PPE charging manager is active.""" + charging_manager = charging_manager_ctx.get() + if charging_manager: + async with charging_manager.charge_lock(): + yield + else: + yield diff --git a/tests/e2e/test_actor_charge.py b/tests/e2e/test_actor_charge.py index d72062bc..0e2e98a0 100644 --- a/tests/e2e/test_actor_charge.py +++ b/tests/e2e/test_actor_charge.py @@ -20,6 +20,53 @@ from .conftest import MakeActorFunction, RunActorFunction +@pytest_asyncio.fixture(scope='module', loop_scope='module') +async def ppe_push_data_actor_build(make_actor: MakeActorFunction) -> str: + async def main() -> None: + async with Actor: + await Actor.push_data( + [{'id': i} for i in range(5)], + 'push-item', + ) + + actor_client = await make_actor('ppe-push-data', main_func=main) + + await actor_client.update( + pricing_infos=[ + { + 'pricingModel': 'PAY_PER_EVENT', + 'pricingPerEvent': { + 'actorChargeEvents': { + 'push-item': { + 'eventTitle': 'Push item', + 'eventPriceUsd': 0.05, + 'eventDescription': 'One pushed item', + }, + 'apify-default-dataset-item': { + 'eventTitle': 'Default dataset item', + 'eventPriceUsd': 0.05, + 'eventDescription': 'One item written to the default dataset', + }, + }, + }, + }, + ] + ) + + actor = await actor_client.get() + + assert actor is not None + return str(actor['id']) + + +@pytest_asyncio.fixture(scope='function', loop_scope='module') +async def ppe_push_data_actor( + ppe_push_data_actor_build: str, + apify_client_async: ApifyClientAsync, +) -> ActorClientAsync: + return apify_client_async.actor(ppe_push_data_actor_build) + + @pytest_asyncio.fixture(scope='module', loop_scope='module') async def ppe_actor_build(make_actor: MakeActorFunction) -> str: async def main() -> None: @@ -114,3 +161,58 @@ async def test_actor_charge_limit( except AssertionError: if is_last_attempt: raise + + +async def test_actor_push_data_charges_both_events( + ppe_push_data_actor: ActorClientAsync, + run_actor: RunActorFunction, + apify_client_async: ApifyClientAsync, +) -> None: + """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" + run = await run_actor(ppe_push_data_actor) + + # Refetch until the platform gets its act together + for is_last_attempt, _ in retry_counter(30): + await asyncio.sleep(1) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) + + try: + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == { + 'push-item': 5, + 'apify-default-dataset-item': 5, + } + break + except AssertionError: + if is_last_attempt: + raise + + +async def test_actor_push_data_combined_budget_limit( + ppe_push_data_actor: ActorClientAsync, + run_actor: RunActorFunction, + apify_client_async: ApifyClientAsync, +) -> None: + """Test that push_data respects combined budget: explicit ($0.05) + synthetic ($0.05) = $0.10/item. + + With max_total_charge_usd=$0.20, only 2 of 5 items fit in the budget. + """ + run = await run_actor(ppe_push_data_actor, max_total_charge_usd=Decimal('0.20')) + + # Refetch until the platform gets its act together + for is_last_attempt, _ in retry_counter(30): + await asyncio.sleep(1) + updated_run = await apify_client_async.run(run.id).get() + run = ActorRun.model_validate(updated_run) + + try: + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == { + 'push-item': 2, + 'apify-default-dataset-item': 2, + } + break + except AssertionError: + if is_last_attempt: + raise diff --git a/tests/unit/actor/test_actor_charge.py b/tests/unit/actor/test_actor_charge.py index c9b14a88..4e452e78 100644 --- a/tests/unit/actor/test_actor_charge.py +++ b/tests/unit/actor/test_actor_charge.py @@ -1,3 +1,4 @@ +import asyncio from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from decimal import Decimal @@ -18,11 +19,15 @@ class MockedChargingSetup(NamedTuple): @asynccontextmanager -async def setup_mocked_charging(configuration: Configuration) -> AsyncGenerator[MockedChargingSetup]: +async def setup_mocked_charging( + configuration: Configuration, pricing_info: dict[str, Decimal] +) -> AsyncGenerator[MockedChargingSetup]: """Context manager that sets up an Actor with mocked charging on Apify platform. Usage: - async with setup_mocked_charging(Decimal('10.0')) as setup: + configuration = Configuration( max_total_charge_usd=Decimal('1.5'), test_pay_per_event=True) + pricing_info = {'event': Decimal('1.0')} + async with setup_mocked_charging(configuration, pricing_info) as setup: # Add pricing info for events setup.charging_mgr._pricing_info['event'] = PricingInfoItem(Decimal('1.0'), 'Event') @@ -46,12 +51,17 @@ async def setup_mocked_charging(configuration: Configuration) -> AsyncGenerator[ patch.object(charging_mgr_impl, '_actor_run_id', 'test-run-id'), patch.object(charging_mgr_impl, '_client', mock_client), ): - yield MockedChargingSetup( + setup = MockedChargingSetup( charging_mgr=charging_mgr_impl, # ty: ignore[invalid-argument-type] mock_charge=mock_charge, mock_client=mock_client, ) + for event_name, price in pricing_info.items(): + setup.charging_mgr._pricing_info[event_name] = PricingInfoItem(price, title=event_name.title()) + + yield setup + async def test_actor_charge_push_data_with_no_remaining_budget() -> None: """Test that the API client is NOT called when budget is exhausted during push_data. @@ -59,12 +69,9 @@ async def test_actor_charge_push_data_with_no_remaining_budget() -> None: When push_data can't afford to charge for any items, it correctly avoids calling the API. """ async with setup_mocked_charging( - Configuration(max_total_charge_usd=Decimal('1.5'), test_pay_per_event=True) + Configuration(max_total_charge_usd=Decimal('1.5'), test_pay_per_event=True), + {'some-event': Decimal('1.0'), 'another-event': Decimal('1.0')}, ) as setup: - # Add pricing info for the events - setup.charging_mgr._pricing_info['some-event'] = PricingInfoItem(Decimal('1.0'), 'Some Event') - setup.charging_mgr._pricing_info['another-event'] = PricingInfoItem(Decimal('1.0'), 'Another Event') - # Exhaust most of the budget (events cost $1 each) result1 = await Actor.charge('some-event', count=1) # Costs $1, leaving $0.5 @@ -95,11 +102,8 @@ async def test_actor_charge_push_data_with_no_remaining_budget() -> None: async def test_actor_charge_api_call_verification() -> None: """Verify that charge() makes API calls correctly.""" async with setup_mocked_charging( - Configuration(max_total_charge_usd=Decimal('10.0'), test_pay_per_event=True) + Configuration(max_total_charge_usd=Decimal('10.0'), test_pay_per_event=True), {'test-event': Decimal('1.0')} ) as setup: - # Add pricing info for the event - setup.charging_mgr._pricing_info['test-event'] = PricingInfoItem(Decimal('1.0'), 'Test Event') - # Call charge directly with count=0 - this should NOT call the API result1 = await Actor.charge('test-event', count=0) setup.mock_charge.assert_not_called() @@ -138,11 +142,93 @@ async def test_max_event_charge_count_within_limit_tolerates_overdraw() -> None: test_pay_per_event=True, ) - async with setup_mocked_charging(configuration) as setup: + async with setup_mocked_charging(configuration, {}) as setup: max_count = setup.charging_mgr.calculate_max_event_charge_count_within_limit('event') assert max_count == 0 +async def test_push_data_combined_price_limits_items() -> None: + """Test that push_data limits items when the combined explicit + synthetic event price exceeds the budget.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('3.00'), test_pay_per_event=True), + {'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')}, + ): + data = [{'id': i} for i in range(5)] + result = await Actor.push_data(data, 'scrape') + + assert result is not None + assert result.charged_count == 1 + + dataset = await Actor.open_dataset() + items = await dataset.get_data() + assert len(items.items) == 1 + assert items.items[0] == {'id': 0} + + +async def test_push_data_charges_synthetic_event_for_default_dataset() -> None: + """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('10.00'), test_pay_per_event=True), + {'test': Decimal('0.10'), 'apify-default-dataset-item': Decimal('0.05')}, + ) as setup: + data = [{'id': i} for i in range(3)] + result = await Actor.push_data(data, 'test') + + assert result is not None + assert result.charged_count == 3 + + # Both explicit and synthetic events should be charged + assert setup.charging_mgr.get_charged_event_count('test') == 3 + assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 3 + + +async def test_charge_lock_concurrent_actor_and_dataset_push() -> None: + """Test that charge_lock properly synchronizes concurrent Actor.push_data and dataset.push_data calls.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('10.00'), test_pay_per_event=True), + {'event': Decimal('0.10'), 'apify-default-dataset-item': Decimal('0.10')}, + ) as setup: + dataset = await Actor.open_dataset() + + # Run concurrent pushes - Actor.push_data and direct dataset.push_data + await asyncio.gather( + Actor.push_data([{'source': 'actor', 'id': i} for i in range(5)], 'event'), + dataset.push_data([{'source': 'dataset', 'id': i} for i in range(5)]), + ) + + # Verify all items were pushed + items = await dataset.get_data() + assert len(items.items) == 10 + + # Verify charging was tracked correctly: + # - Actor.push_data charged 'event' (5) + 'apify-default-dataset-item' (5) + # - dataset.push_data charged 'apify-default-dataset-item' (5) + assert setup.charging_mgr.get_charged_event_count('event') == 5 + assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 10 + + +async def test_charge_lock_concurrent_with_limited_budget() -> None: + """Test that charge_lock correctly limits items when concurrent pushes compete for limited budget.""" + async with setup_mocked_charging( + Configuration(max_total_charge_usd=Decimal('0.50'), test_pay_per_event=True), + {'apify-default-dataset-item': Decimal('0.10')}, + ) as setup: + dataset = await Actor.open_dataset() + + # Both try to push 5 items, but budget only allows 5 total + await asyncio.gather( + dataset.push_data([{'source': 'a', 'id': i} for i in range(5)]), + dataset.push_data([{'source': 'b', 'id': i} for i in range(5)]), + ) + + # Verify total items pushed does not exceed budget limit + items = await dataset.get_data() + assert len(items.items) == 5 # Budget allows max 5 items at $0.10 each + + # Verify total charged events matches items pushed + assert setup.charging_mgr.get_charged_event_count('apify-default-dataset-item') == 5 + + async def test_charge_with_overdrawn_budget() -> None: configuration = Configuration( max_total_charge_usd=Decimal('0.00025'), @@ -167,7 +253,7 @@ async def test_charge_with_overdrawn_budget() -> None: test_pay_per_event=True, ) - async with setup_mocked_charging(configuration) as setup: + async with setup_mocked_charging(configuration, {}) as setup: charge_result = await Actor.charge('event', 1) assert charge_result.charged_count == 0 # The budget doesn't allow another event diff --git a/tests/unit/actor/test_actor_helpers.py b/tests/unit/actor/test_actor_helpers.py index 5b15ce82..de2ff920 100644 --- a/tests/unit/actor/test_actor_helpers.py +++ b/tests/unit/actor/test_actor_helpers.py @@ -184,13 +184,13 @@ async def test_set_terminal_status_message_locally(caplog: pytest.LogCaptureFixt async def test_push_data_with_empty_data() -> None: - """Test that push_data returns None when data is empty.""" + """Test that push_data returns result with `charged_count` 0 when data is empty.""" async with Actor: result = await Actor.push_data([]) - assert result is None + assert result.charged_count == 0 result = await Actor.push_data({}) - assert result is None + assert result.charged_count == 0 async def test_off_removes_event_listener(monkeypatch: pytest.MonkeyPatch) -> None: diff --git a/tests/unit/actor/test_charging_manager.py b/tests/unit/actor/test_charging_manager.py index 4a1ef480..10a8474b 100644 --- a/tests/unit/actor/test_charging_manager.py +++ b/tests/unit/actor/test_charging_manager.py @@ -247,6 +247,78 @@ async def test_get_max_total_charge_usd(mock_client: MagicMock) -> None: assert cm.get_max_total_charge_usd() == Decimal('42.50') +async def test_compute_push_data_limit_no_ppe(mock_client: MagicMock) -> None: + """Returns items_count when no PPE pricing is configured (prices are zero).""" + config = _make_config(actor_pricing_info=None, charged_event_counts={}) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + result = cm.compute_push_data_limit(10, 'some-event', is_default_dataset=True) + assert result == 10 + + +async def test_compute_push_data_limit_within_budget(mock_client: MagicMock) -> None: + """Returns full items_count when combined budget is sufficient for all items.""" + pricing_info = _make_ppe_pricing_info({'click': Decimal('0.01'), 'apify-default-dataset-item': Decimal('0.01')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('10.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + # combined price = 0.02/item, budget = 10.00, max = 500 + result = cm.compute_push_data_limit(5, 'click', is_default_dataset=True) + assert result == 5 + + +async def test_compute_push_data_limit_budget_exceeded(mock_client: MagicMock) -> None: + """Returns capped count when combined price (explicit + synthetic) exceeds budget.""" + pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('3.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + # combined price = 2.00/item, budget = 3.00, max = floor(3/2) = 1 + result = cm.compute_push_data_limit(5, 'scrape', is_default_dataset=True) + assert result == 1 + + +async def test_compute_push_data_limit_without_default_dataset(mock_client: MagicMock) -> None: + """When not pushing to the default dataset, only explicit event price is considered.""" + pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00'), 'apify-default-dataset-item': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('3.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + # explicit price only = 1.00/item, budget = 3.00, max = floor(3/1) = 3 + result = cm.compute_push_data_limit(5, 'scrape', is_default_dataset=False) + assert result == 3 + + +async def test_compute_push_data_limit_exhausted_budget(mock_client: MagicMock) -> None: + """Returns 0 when the budget is fully exhausted before the push.""" + pricing_info = _make_ppe_pricing_info({'scrape': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={'scrape': 3}, + max_total_charge_usd=Decimal('3.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + result = cm.compute_push_data_limit(5, 'scrape', is_default_dataset=False) + assert result == 0 + + async def test_charge_limit_reached(mock_client: MagicMock) -> None: """Test that event_charge_limit_reached is True when budget is exhausted.""" pricing_info = _make_ppe_pricing_info({'search': Decimal('5.00')}) @@ -265,3 +337,75 @@ async def test_charge_limit_reached(mock_client: MagicMock) -> None: result2 = await cm.charge('search', count=1) assert result2.charged_count == 0 assert result2.event_charge_limit_reached is True + + +async def test_is_event_charge_limit_reached_within_budget(mock_client: MagicMock) -> None: + """Test that is_event_charge_limit_reached returns False when budget allows at least one more charge.""" + pricing_info = _make_ppe_pricing_info({'search': Decimal('1.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('5.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + assert cm.is_event_charge_limit_reached('search') is False + await cm.charge('search', count=3) + assert cm.is_event_charge_limit_reached('search') is False + + +async def test_is_event_charge_limit_reached_budget_exhausted(mock_client: MagicMock) -> None: + """Test that is_event_charge_limit_reached returns True when budget can no longer cover a single event.""" + pricing_info = _make_ppe_pricing_info({'search': Decimal('5.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('5.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + assert cm.is_event_charge_limit_reached('search') is False + await cm.charge('search', count=1) + assert cm.is_event_charge_limit_reached('search') is True + + +async def test_compute_chargeable_returns_all_known_events(mock_client: MagicMock) -> None: + """Test that compute_chargeable returns entries for all known event types.""" + pricing_info = _make_ppe_pricing_info({'search': Decimal('1.00'), 'scrape': Decimal('2.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('10.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + chargeable = cm.compute_chargeable() + assert set(chargeable.keys()) == {'search', 'scrape'} + assert chargeable['search'] == 10 + assert chargeable['scrape'] == 5 + + +async def test_compute_chargeable_updates_after_charge(mock_client: MagicMock) -> None: + """Test that compute_chargeable reflects remaining budget after charging.""" + pricing_info = _make_ppe_pricing_info({'search': Decimal('1.00'), 'scrape': Decimal('2.00')}) + config = _make_config( + test_pay_per_event=True, + actor_pricing_info=pricing_info, + charged_event_counts={}, + max_total_charge_usd=Decimal('10.00'), + ) + cm = ChargingManagerImplementation(config, mock_client) + async with cm: + chargeable = cm.compute_chargeable() + assert chargeable['search'] == 10 + assert chargeable['scrape'] == 5 + + await cm.charge('search', count=4) + chargeable = cm.compute_chargeable() + + # $6.00 remaining: search=$1.00 → 6, scrape=$2.00 → 3 + assert chargeable['search'] == 6 + assert chargeable['scrape'] == 3