Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 40 additions & 33 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
EventSystemInfoData,
)

from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation
from apify._charging import DEFAULT_DATASET_ITEM_EVENT, ChargeResult, ChargingManager, ChargingManagerImplementation
from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
from apify._crypto import decrypt_input_secrets, load_private_key
Expand Down Expand Up @@ -380,14 +380,6 @@ def event_manager(self) -> EventManager:
def _charging_manager_implementation(self) -> ChargingManagerImplementation:
return ChargingManagerImplementation(self.configuration, self.apify_client)

@cached_property
def _charge_lock(self) -> asyncio.Lock:
"""Lock to synchronize charge operations.

Prevents race conditions between Actor.charge and Actor.push_data calls.
"""
return asyncio.Lock()

@cached_property
def _storage_client(self) -> SmartApifyStorageClient:
"""Storage client used by the Actor.
Expand Down Expand Up @@ -621,48 +613,63 @@ async def open_request_queue(
storage_client=self._storage_client.get_suitable_storage_client(force_cloud=force_cloud),
)

@overload
async def push_data(self, data: dict | list[dict]) -> None: ...
@overload
async def push_data(self, data: dict | list[dict], charged_event_name: str) -> ChargeResult: ...
@_ensure_context
async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult | None:
async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult:
"""Store an object or a list of objects to the default dataset of the current Actor run.

Args:
data: The data to push to the default dataset.
charged_event_name: If provided and if the Actor uses the pay-per-event pricing model,
the method will attempt to charge for the event for each pushed item.
"""
if charged_event_name and charged_event_name.startswith('apify-'):
raise ValueError(f'Cannot charge for synthetic event "{charged_event_name}" manually')

charging_manager = self.get_charging_manager()

if not data:
return None
charged_event_name = charged_event_name or DEFAULT_DATASET_ITEM_EVENT
charge_limit_reached = charging_manager.is_event_charge_limit_reached(charged_event_name)

return ChargeResult(
event_charge_limit_reached=charge_limit_reached,
charged_count=0,
chargeable_within_limit=charging_manager.compute_chargeable(),
)

data = data if isinstance(data, list) else [data]

# No charging, just push the data without locking.
if charged_event_name is None:
dataset = await self.open_dataset()
await dataset.push_data(data)
return None
dataset = await self.open_dataset()

# If charging is requested, acquire the charge lock to prevent race conditions between concurrent
# Acquire the charge lock to prevent race conditions between concurrent
# push_data calls. We need to hold the lock for the entire push_data + charge sequence.
async with self._charge_lock:
max_charged_count = self.get_charging_manager().calculate_max_event_charge_count_within_limit(
charged_event_name
)

# Push as many items as we can charge for.
pushed_items_count = min(max_charged_count, len(data)) if max_charged_count is not None else len(data)
async with charging_manager.charge_lock():
# Synthetic events are handled within dataset.push_data; here we only read the charged-event counts needed to build the `ChargeResult`.
if charged_event_name is None:
before = charging_manager.get_charged_event_count(DEFAULT_DATASET_ITEM_EVENT)
await dataset.push_data(data)
after = charging_manager.get_charged_event_count(DEFAULT_DATASET_ITEM_EVENT)
return ChargeResult(
event_charge_limit_reached=charging_manager.is_event_charge_limit_reached(
DEFAULT_DATASET_ITEM_EVENT
),
charged_count=after - before,
chargeable_within_limit=charging_manager.compute_chargeable(),
)

dataset = await self.open_dataset()
pushed_items_count = charging_manager.compute_push_data_limit(
items_count=len(data),
event_name=charged_event_name,
is_default_dataset=True,
)

if pushed_items_count < len(data):
await dataset.push_data(data[:pushed_items_count])
elif pushed_items_count > 0:
await dataset.push_data(data)

return await self.get_charging_manager().charge(
# Only charge explicit events; synthetic events will be processed within the client.
return await charging_manager.charge(
event_name=charged_event_name,
count=pushed_items_count,
)
Expand Down Expand Up @@ -726,9 +733,9 @@ async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
event_name: Name of the event to be charged for.
count: Number of events to charge for.
"""
# Acquire lock to prevent race conditions with concurrent charge/push_data calls.
async with self._charge_lock:
return await self.get_charging_manager().charge(event_name, count)
# No lock is taken here: charging_manager.charge() acquires charge_lock internally.
charging_manager = self.get_charging_manager()
return await charging_manager.charge(event_name, count)

@overload
def on(
Expand Down
Loading
Loading