Skip to content

Commit c6e204a

Browse files
authored
Minor updates (#283)
Fixes #273 Fixes #282 Fixes #265 Fixes #266 Fixes #228
1 parent fbcba65 commit c6e204a

File tree

7 files changed

+120
-39
lines changed

7 files changed

+120
-39
lines changed

README.md

Lines changed: 73 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -125,18 +125,25 @@ The SDK is now ready for use. To build from source, see "Building" near the end
125125

126126
## Implementing a Workflow
127127

128-
Create the following script at `run_worker.py`:
128+
Create the following in `activities.py`:
129129

130130
```python
131-
import asyncio
132-
from datetime import datetime, timedelta
133-
from temporalio import workflow, activity
134-
from temporalio.client import Client
135-
from temporalio.worker import Worker
131+
from temporalio import activity
136132

137133
@activity.defn
138134
async def say_hello(name: str) -> str:
139135
return f"Hello, {name}!"
136+
```
137+
138+
Create the following in `workflows.py`:
139+
140+
```python
141+
from datetime import timedelta
142+
from temporalio import workflow
143+
144+
# Import our activity, passing it through the sandbox
145+
with workflow.unsafe.imports_passed_through():
146+
from .activities import say_hello
140147

141148
@workflow.defn
142149
class SayHello:
@@ -145,6 +152,18 @@ class SayHello:
145152
return await workflow.execute_activity(
146153
say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)
147154
)
155+
```
156+
157+
Create the following in `run_worker.py`:
158+
159+
```python
160+
import asyncio
161+
from temporalio.client import Client
162+
from temporalio.worker import Worker
163+
164+
# Import the activity and workflow from our other files
165+
from .activities import say_hello
166+
from .workflows import SayHello
148167

149168
async def main():
150169
# Create client connected to server at the given address
@@ -172,7 +191,7 @@ import asyncio
172191
from temporalio.client import Client
173192

174193
# Import the workflow from the previous code
175-
from run_worker import SayHello
194+
from .workflows import SayHello
176195

177196
async def main():
178197
# Create client connected to server at the given address
@@ -196,11 +215,16 @@ The output will be:
196215
Result: Hello, my-name!
197216

198217
## Next Steps
199-
Temporal can be implemented in your code in many different ways, to suit your application's needs. The links below will give you much more information about how Temporal works with Python:
200218

201-
* [Code Samples](https://github.com/temporalio/samples-python) - If you want to start with some code, we have provided some pre-built samples.
202-
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) Our Python specific Developer's Guide will give you much more information on how to build with Temporal in your Python applications than our SDK README ever could (or should).
203-
* [API Documentation](https://python.temporal.io) - Full Temporal Python SDK package documentation
219+
Temporal can be implemented in your code in many different ways, to suit your application's needs. The links below will
220+
give you much more information about how Temporal works with Python:
221+
222+
* [Code Samples](https://github.com/temporalio/samples-python) - If you want to start with some code, we have provided
223+
some pre-built samples.
224+
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) Our Python specific
225+
Developer's Guide will give you much more information on how to build with Temporal in your Python applications than
226+
our SDK README ever could (or should).
227+
* [API Documentation](https://python.temporal.io) - Full Temporal Python SDK package documentation.
204228

205229
---
206230

@@ -420,16 +444,12 @@ respectively. Here's an example of a workflow:
420444

421445
```python
422446
import asyncio
423-
from dataclasses import dataclass
424447
from datetime import timedelta
425-
from temporalio import activity, workflow
426-
from temporalio.client import Client
427-
from temporalio.worker import Worker
448+
from temporalio import workflow
428449

429-
@dataclass
430-
class GreetingInfo:
431-
salutation: str = "Hello"
432-
name: str = "<unknown>"
450+
# Pass the activities through the sandbox
451+
with workflow.unsafe.imports_passed_through():
452+
from .my_activities import GreetingInfo, create_greeting_activity
433453

434454
@workflow.defn
435455
class GreetingWorkflow:
@@ -477,16 +497,31 @@ class GreetingWorkflow:
477497
async def current_greeting(self) -> str:
478498
return self._current_greeting
479499

500+
```
501+
502+
This assumes there's an activity in `my_activities.py` like:
503+
504+
```python
505+
from dataclasses import dataclass
506+
from temporalio import workflow
507+
508+
@dataclass
509+
class GreetingInfo:
510+
salutation: str = "Hello"
511+
name: str = "<unknown>"
512+
480513
@activity.defn
481514
async def create_greeting_activity(info: GreetingInfo) -> str:
482515
return f"{info.salutation}, {info.name}!"
483516
```
484517

485-
Some things to note about the above code:
518+
Some things to note about the above workflow code:
486519

487-
* Workflows run in a sandbox by default. Users are encouraged to define workflows in files with no side effects or other
488-
complicated code or unnecessary imports to other third party libraries. See the [Workflow Sandbox](#workflow-sandbox)
489-
section for more details.
520+
* Workflows run in a sandbox by default.
521+
* Users are encouraged to define workflows in files with no side effects or other complicated code or unnecessary
522+
imports to other third party libraries.
523+
* Non-standard-library, non-`temporalio` imports should usually be "passed through" the sandbox. See the
524+
[Workflow Sandbox](#workflow-sandbox) section for more details.
490525
* This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
491526
a different signal
492527
* Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
@@ -791,7 +826,17 @@ early. Users are encouraged to define their workflows in files with no other sid
791826

792827
The sandbox offers a mechanism to pass through modules from outside the sandbox. By default this already includes all
793828
standard library modules and Temporal modules. **For performance and behavior reasons, users are encouraged to pass
794-
through all third party modules whose calls will be deterministic.** See "Passthrough Modules" below on how to do this.
829+
through all third party modules whose calls will be deterministic.** This includes modules containing the activities to
830+
be referenced in workflows. See "Passthrough Modules" below on how to do this.
831+
832+
If you are getting an error like:
833+
834+
> temporalio.worker.workflow_sandbox._restrictions.RestrictedWorkflowAccessError: Cannot access
835+
> http.client.IncompleteRead.\_\_mro_entries\_\_ from inside a workflow. If this is code from a module not used in a
836+
> workflow or known to only be used deterministically from a workflow, mark the import as pass through.
837+
838+
Then you are either using an invalid construct from the workflow, this is a known limitation of the sandbox, or most
839+
commonly this is from a module that is safe to pass through (see "Passthrough Modules" section below).
795840

796841
##### How the Sandbox Works
797842

@@ -1093,9 +1138,10 @@ occurs. Synchronous activities cannot call any of the `async` functions.
10931138

10941139
##### Heartbeating and Cancellation
10951140

1096-
In order for a non-local activity to be notified of cancellation requests, it must invoke
1097-
`temporalio.activity.heartbeat()`. It is strongly recommended that all but the fastest executing activities call this
1098-
function regularly. "Types of Activities" has specifics on cancellation for asynchronous and synchronous activities.
1141+
In order for a non-local activity to be notified of cancellation requests, it must be given a `heartbeat_timeout` at
1142+
invocation time and invoke `temporalio.activity.heartbeat()` inside the activity. It is strongly recommended that all
1143+
but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation
1144+
for asynchronous and synchronous activities.
10991145

11001146
In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server
11011147
for retrieval during activity retry. If an activity calls `temporalio.activity.heartbeat(123, 456)` and then fails and

temporalio/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1544,15 +1544,15 @@ async def heartbeat(
15441544

15451545
async def complete(
15461546
self,
1547-
result: Optional[Any] = None,
1547+
result: Optional[Any] = temporalio.common._arg_unset,
15481548
*,
15491549
rpc_metadata: Mapping[str, str] = {},
15501550
rpc_timeout: Optional[timedelta] = None,
15511551
) -> None:
15521552
"""Complete the activity.
15531553
15541554
Args:
1555-
result: Result of the activity.
1555+
result: Result of the activity if any.
15561556
rpc_metadata: Headers used on the RPC call. Keys here override
15571557
client-level RPC metadata keys.
15581558
rpc_timeout: Optional RPC deadline to set for the RPC call.
@@ -4421,7 +4421,7 @@ async def heartbeat_async_activity(
44214421
async def complete_async_activity(self, input: CompleteAsyncActivityInput) -> None:
44224422
result = (
44234423
None
4424-
if not input.result
4424+
if input.result is temporalio.common._arg_unset
44254425
else await self._client.data_converter.encode_wrapper([input.result])
44264426
)
44274427
if isinstance(input.id_or_token, AsyncActivityIDReference):

temporalio/worker/_activity.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ def __init__(
9292
if defn.name in self._activities:
9393
raise ValueError(f"More than one activity named {defn.name}")
9494

95+
# Do not allow classes, __call__ based activities must be instances
96+
if inspect.isclass(activity):
97+
raise TypeError(
98+
f"Activity named {defn.name} is a class instead of an instance"
99+
)
100+
95101
# Some extra requirements for sync functions
96102
if not defn.is_async:
97103
if not activity_executor:

temporalio/worker/_workflow_instance.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,7 +1460,7 @@ async def handle_signal(self, input: HandleSignalInput) -> None:
14601460
handler = self._instance.workflow_get_signal_handler(None)
14611461
dynamic = True
14621462
# Technically this is checked before the interceptor is invoked, but
1463-
# an # interceptor could have changed the name
1463+
# an interceptor could have changed the name
14641464
if not handler:
14651465
raise RuntimeError(
14661466
f"Signal handler for {input.signal} expected but not found"
@@ -1480,10 +1480,12 @@ async def handle_query(self, input: HandleQueryInput) -> Any:
14801480
handler = self._instance.workflow_get_query_handler(None)
14811481
dynamic = True
14821482
# Technically this is checked before the interceptor is invoked, but
1483-
# an # interceptor could have changed the name
1483+
# an interceptor could have changed the name
14841484
if not handler:
1485+
known_queries = sorted([k for k in self._instance._queries.keys() if k])
14851486
raise RuntimeError(
1486-
f"Query handler for '{input.query}' expected but not found"
1487+
f"Query handler for '{input.query}' expected but not found, "
1488+
f"known queries: [{' '.join(known_queries)}]"
14871489
)
14881490
# Put name first if dynamic
14891491
args = list(input.args) if not dynamic else [input.query] + list(input.args)

temporalio/worker/workflow_sandbox/_restrictions.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ class RestrictedWorkflowAccessError(temporalio.workflow.NondeterminismError):
5151

5252
def __init__(self, qualified_name: str) -> None:
5353
"""Create restricted workflow access error."""
54-
super().__init__(f"Cannot access {qualified_name} from inside a workflow.")
54+
super().__init__(
55+
f"Cannot access {qualified_name} from inside a workflow. "
56+
"If this is code from a module not used in a workflow or known to "
57+
"only be used deterministically from a workflow, mark the import "
58+
"as pass through."
59+
)
5560
self.qualified_name = qualified_name
5661

5762

tests/worker/test_activity.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -986,7 +986,7 @@ def __init__(self) -> None:
986986
self._info_set = asyncio.Event()
987987

988988
@activity.defn
989-
async def run(self) -> str:
989+
async def run(self) -> Optional[str]:
990990
self._info = activity.info()
991991
self._info_set.set()
992992
activity.raise_complete_async()
@@ -1012,15 +1012,24 @@ def async_handle(self, client: Client, use_task_token: bool) -> AsyncActivityHan
10121012
async def test_activity_async_success(
10131013
client: Client, worker: ExternalWorker, use_task_token: bool
10141014
):
1015-
wrapper = AsyncActivityWrapper()
10161015
# Start task, wait for info, complete with value, wait on workflow
1016+
wrapper = AsyncActivityWrapper()
10171017
task = asyncio.create_task(
10181018
_execute_workflow_with_activity(client, worker, wrapper.run)
10191019
)
10201020
await wrapper.wait_info()
10211021
await wrapper.async_handle(client, use_task_token).complete("some value")
10221022
assert "some value" == (await task).result
10231023

1024+
# Do again with a None value
1025+
wrapper = AsyncActivityWrapper()
1026+
task = asyncio.create_task(
1027+
_execute_workflow_with_activity(client, worker, wrapper.run)
1028+
)
1029+
await wrapper.wait_info()
1030+
await wrapper.async_handle(client, use_task_token).complete(None)
1031+
assert (await task).result is None
1032+
10241033

10251034
@pytest.mark.parametrize("use_task_token", [True, False])
10261035
async def test_activity_async_heartbeat_and_fail(

tests/worker/test_workflow.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,10 @@ def bad_signal(self) -> NoReturn:
352352
def bad_query(self) -> NoReturn:
353353
raise ApplicationError("query fail", 456)
354354

355+
@workflow.query
356+
def other_query(self) -> str:
357+
raise NotImplementedError
358+
355359

356360
async def test_workflow_signal_and_query_errors(client: Client):
357361
async with new_worker(client, SignalAndQueryErrorsWorkflow) as worker:
@@ -374,9 +378,9 @@ async def test_workflow_signal_and_query_errors(client: Client):
374378
# Unrecognized query
375379
with pytest.raises(WorkflowQueryFailedError) as rpc_err:
376380
await handle.query("non-existent query")
377-
assert (
378-
str(rpc_err.value)
379-
== "Query handler for 'non-existent query' expected but not found"
381+
assert str(rpc_err.value) == (
382+
"Query handler for 'non-existent query' expected but not found,"
383+
" known queries: [__stack_trace bad_query other_query]"
380384
)
381385

382386

@@ -2260,6 +2264,15 @@ async def test_workflow_activity_callable_class(client: Client):
22602264
assert result == MyDataClass(field1="in worker, workflow param")
22612265

22622266

2267+
async def test_workflow_activity_callable_class_bad_register(client: Client):
2268+
# Try to register the class instead of the instance
2269+
with pytest.raises(TypeError) as err:
2270+
new_worker(
2271+
client, ActivityCallableClassWorkflow, activities=[CallableClassActivity]
2272+
)
2273+
assert "is a class instead of an instance" in str(err.value)
2274+
2275+
22632276
class MethodActivity:
22642277
def __init__(self, orig_field1: str) -> None:
22652278
self.orig_field1 = orig_field1

0 commit comments

Comments
 (0)