Skip to content

Commit 78888c4

Browse files
authored
feat: Support pay-per-event via Actor.charge (#393)
- closes #374 - documentation will be added after apify/apify-sdk-js#356 is finished - the functionality should be used mainly via `Actor.charge` - directly accessing `ChargingManager` is needed only for advanced use cases - see [integration tests](https://github.com/apify/apify-sdk-python/pull/393/files#diff-c209584a944d571ec9bd7d62563827b91c80eed00e4c91170ea9e500d39c0477) for an illustration of how this works
1 parent b9a0cfd commit 78888c4

File tree

12 files changed

+954
-374
lines changed

12 files changed

+954
-374
lines changed

poetry.lock

Lines changed: 261 additions & 297 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ keywords = [
4343

4444
[tool.poetry.dependencies]
4545
python = "^3.9"
46-
apify-client = ">=1.9.1"
46+
apify-client = ">=1.9.2"
4747
apify-shared = ">=1.2.1"
4848
crawlee = "~0.5.1"
4949
cryptography = ">=42.0.0"

src/apify/_actor.py

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
EventSystemInfoData,
2525
)
2626

27+
from apify._charging import ChargeResult, ChargingManager, ChargingManagerImplementation
2728
from apify._configuration import Configuration
2829
from apify._consts import EVENT_LISTENERS_TIMEOUT
2930
from apify._crypto import decrypt_input_secrets, load_private_key
@@ -97,6 +98,8 @@ def __init__(
9798
)
9899
)
99100

101+
self._charging_manager = ChargingManagerImplementation(self._configuration, self._apify_client)
102+
100103
self._is_initialized = False
101104

102105
@ignore_docs
@@ -227,6 +230,10 @@ async def init(self) -> None:
227230
# https://github.com/apify/apify-sdk-python/issues/146
228231

229232
await self._event_manager.__aenter__()
233+
self.log.debug('Event manager initialized')
234+
235+
await self._charging_manager.__aenter__()
236+
self.log.debug('Charging manager initialized')
230237

231238
self._is_initialized = True
232239
_ActorType._is_any_instance_initialized = True
@@ -269,6 +276,7 @@ async def finalize() -> None:
269276
await self._event_manager.wait_for_all_listeners_to_complete(timeout=event_listeners_timeout)
270277

271278
await self._event_manager.__aexit__(None, None, None)
279+
await self._charging_manager.__aexit__(None, None, None)
272280

273281
await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds())
274282
self._is_initialized = False
@@ -452,19 +460,46 @@ async def open_request_queue(
452460
storage_client=storage_client,
453461
)
454462

455-
async def push_data(self, data: dict | list[dict]) -> None:
463+
@overload
464+
async def push_data(self, data: dict | list[dict]) -> None: ...
465+
@overload
466+
async def push_data(self, data: dict | list[dict], charged_event_name: str) -> ChargeResult: ...
467+
async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult | None:
456468
"""Store an object or a list of objects to the default dataset of the current Actor run.
457469
458470
Args:
459471
data: The data to push to the default dataset.
472+
charged_event_name: If provided and if the Actor uses the pay-per-event pricing model,
473+
the method will attempt to charge for the event for each pushed item.
460474
"""
461475
self._raise_if_not_initialized()
462476

463477
if not data:
464-
return
478+
return None
479+
480+
data = data if isinstance(data, list) else [data]
481+
482+
max_charged_count = (
483+
self._charging_manager.calculate_max_event_charge_count_within_limit(charged_event_name)
484+
if charged_event_name is not None
485+
else None
486+
)
465487

466488
dataset = await self.open_dataset()
467-
await dataset.push_data(data)
489+
490+
if max_charged_count is not None and len(data) > max_charged_count:
491+
# Push as many items as we can charge for
492+
await dataset.push_data(data[:max_charged_count])
493+
else:
494+
await dataset.push_data(data)
495+
496+
if charged_event_name:
497+
return await self._charging_manager.charge(
498+
event_name=charged_event_name,
499+
count=min(max_charged_count, len(data)) if max_charged_count is not None else len(data),
500+
)
501+
502+
return None
468503

469504
async def get_input(self) -> Any:
470505
"""Get the Actor input value from the default key-value store associated with the current Actor run."""
@@ -513,6 +548,23 @@ async def set_value(
513548
key_value_store = await self.open_key_value_store()
514549
return await key_value_store.set_value(key, value, content_type=content_type)
515550

551+
def get_charging_manager(self) -> ChargingManager:
552+
"""Retrieve the charging manager to access granular pricing information."""
553+
self._raise_if_not_initialized()
554+
return self._charging_manager
555+
556+
async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
557+
"""Charge for a specified number of events - sub-operations of the Actor.
558+
559+
This is relevant only for the pay-per-event pricing model.
560+
561+
Args:
562+
event_name: Name of the event to be charged for.
563+
count: Number of events to charge for.
564+
"""
565+
self._raise_if_not_initialized()
566+
return await self._charging_manager.charge(event_name, count)
567+
516568
@overload
517569
def on(
518570
self, event_name: Literal[Event.PERSIST_STATE], listener: EventListener[EventPersistStateData]

0 commit comments

Comments
 (0)