Skip to content

feat: Support pay-per-event via Actor.charge #393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
558 changes: 261 additions & 297 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ keywords = [

[tool.poetry.dependencies]
python = "^3.9"
apify-client = ">=1.9.1"
apify-client = ">=1.9.2"
apify-shared = ">=1.2.1"
crawlee = "~0.5.1"
cryptography = ">=42.0.0"
Expand Down
58 changes: 55 additions & 3 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
EventSystemInfoData,
)

from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation
from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
from apify._crypto import decrypt_input_secrets, load_private_key
Expand Down Expand Up @@ -97,6 +98,8 @@ def __init__(
)
)

self._charging_manager = ChargingManagerImplementation(self._configuration, self._apify_client)

self._is_initialized = False

@ignore_docs
Expand Down Expand Up @@ -227,6 +230,10 @@ async def init(self) -> None:
# https://github.com/apify/apify-sdk-python/issues/146

await self._event_manager.__aenter__()
self.log.debug('Event manager initialized')

await self._charging_manager.__aenter__()
self.log.debug('Charging manager initialized')

self._is_initialized = True
_ActorType._is_any_instance_initialized = True
Expand Down Expand Up @@ -269,6 +276,7 @@ async def finalize() -> None:
await self._event_manager.wait_for_all_listeners_to_complete(timeout=event_listeners_timeout)

await self._event_manager.__aexit__(None, None, None)
await self._charging_manager.__aexit__(None, None, None)

await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds())
self._is_initialized = False
Expand Down Expand Up @@ -452,19 +460,46 @@ async def open_request_queue(
storage_client=storage_client,
)

async def push_data(self, data: dict | list[dict]) -> None:
@overload
async def push_data(self, data: dict | list[dict]) -> None: ...
@overload
async def push_data(self, data: dict | list[dict], charged_event_name: str) -> ChargeResult: ...
async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult | None:
"""Store an object or a list of objects to the default dataset of the current Actor run.

Args:
data: The data to push to the default dataset.
charged_event_name: If provided and if the Actor uses the pay-per-event pricing model,
the method will attempt to charge for the event for each pushed item.
"""
self._raise_if_not_initialized()

if not data:
return
return None

data = data if isinstance(data, list) else [data]

max_charged_count = (
self._charging_manager.calculate_max_event_charge_count_within_limit(charged_event_name)
if charged_event_name is not None
else None
)

dataset = await self.open_dataset()
await dataset.push_data(data)

if max_charged_count is not None and len(data) > max_charged_count:
# Push as many items as we can charge for
await dataset.push_data(data[:max_charged_count])
else:
await dataset.push_data(data)

if charged_event_name:
return await self._charging_manager.charge(
event_name=charged_event_name,
count=min(max_charged_count, len(data)) if max_charged_count is not None else len(data),
)

return None

async def get_input(self) -> Any:
"""Get the Actor input value from the default key-value store associated with the current Actor run."""
Expand Down Expand Up @@ -513,6 +548,23 @@ async def set_value(
key_value_store = await self.open_key_value_store()
return await key_value_store.set_value(key, value, content_type=content_type)

def get_charging_manager(self) -> ChargingManager:
"""Retrieve the charging manager to access granular pricing information."""
self._raise_if_not_initialized()
return self._charging_manager

async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
"""Charge for a specified number of events - sub-operations of the Actor.

This is relevant only for the pay-per-event pricing model.

Args:
event_name: Name of the event to be charged for.
count: Number of events to charge for.
"""
self._raise_if_not_initialized()
return await self._charging_manager.charge(event_name, count)

@overload
def on(
self, event_name: Literal[Event.PERSIST_STATE], listener: EventListener[EventPersistStateData]
Expand Down
Loading