Skip to content

Use minimal scope with pytest.raises #582

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 61 additions & 57 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1113,37 +1113,37 @@ async def cancel_child(self) -> None:
@pytest.mark.parametrize("use_execute", [True, False])
async def test_workflow_cancel_child_started(client: Client, use_execute: bool):
async with new_worker(client, CancelChildWorkflow, LongSleepWorkflow) as worker:
with pytest.raises(WorkflowFailureError) as err:
# Start workflow
handle = await client.start_workflow(
CancelChildWorkflow.run,
use_execute,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
# Start workflow
handle = await client.start_workflow(
CancelChildWorkflow.run,
use_execute,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

# Wait until child started
async def child_started() -> bool:
try:
return await handle.query(
CancelChildWorkflow.ready
) and await client.get_workflow_handle_for(
LongSleepWorkflow.run, # type: ignore[arg-type]
workflow_id=f"{handle.id}_child",
).query(LongSleepWorkflow.started)
except RPCError as err:
# Ignore not-found or failed precondition because child may
# not have started yet
if (
err.status == RPCStatusCode.NOT_FOUND
or err.status == RPCStatusCode.FAILED_PRECONDITION
):
return False
raise
# Wait until child started
async def child_started() -> bool:
try:
return await handle.query(
CancelChildWorkflow.ready
) and await client.get_workflow_handle_for(
LongSleepWorkflow.run, # type: ignore[arg-type]
workflow_id=f"{handle.id}_child",
).query(LongSleepWorkflow.started)
except RPCError as err:
# Ignore not-found or failed precondition because child may
# not have started yet
if (
err.status == RPCStatusCode.NOT_FOUND
or err.status == RPCStatusCode.FAILED_PRECONDITION
):
return False
raise

await assert_eq_eventually(True, child_started)
# Send cancel signal and wait on the handle
await handle.signal(CancelChildWorkflow.cancel_child)
await assert_eq_eventually(True, child_started)
# Send cancel signal and wait on the handle
await handle.signal(CancelChildWorkflow.cancel_child)
with pytest.raises(WorkflowFailureError) as err:
await handle.result()
assert isinstance(err.value.cause, ChildWorkflowError)
assert isinstance(err.value.cause.cause, CancelledError)
Expand Down Expand Up @@ -2374,17 +2374,17 @@ async def test_workflow_already_started(client: Client, env: WorkflowEnvironment
async with new_worker(client, LongSleepWorkflow) as worker:
id = f"workflow-{uuid.uuid4()}"
# Try to start it twice
await client.start_workflow(
LongSleepWorkflow.run,
id=id,
task_queue=worker.task_queue,
)
with pytest.raises(WorkflowAlreadyStartedError):
await client.start_workflow(
LongSleepWorkflow.run,
id=id,
task_queue=worker.task_queue,
)
await client.start_workflow(
LongSleepWorkflow.run,
id=id,
task_queue=worker.task_queue,
)


@workflow.defn
Expand Down Expand Up @@ -3263,15 +3263,15 @@ async def test_workflow_custom_failure_converter(client: Client):
client = Client(**config)

# Run workflow and confirm error
with pytest.raises(WorkflowFailureError) as err:
async with new_worker(
client, CustomErrorWorkflow, activities=[custom_error_activity]
) as worker:
handle = await client.start_workflow(
CustomErrorWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
async with new_worker(
client, CustomErrorWorkflow, activities=[custom_error_activity]
) as worker:
handle = await client.start_workflow(
CustomErrorWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
with pytest.raises(WorkflowFailureError) as err:
await handle.result()

# Check error is as expected
Expand Down Expand Up @@ -4606,13 +4606,13 @@ async def test_workflow_timeout_support(client: Client, approach: str):
client, TimeoutSupportWorkflow, activities=[wait_cancel]
) as worker:
# Run and confirm activity gets cancelled
handle = await client.start_workflow(
TimeoutSupportWorkflow.run,
approach,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
with pytest.raises(WorkflowFailureError) as err:
handle = await client.start_workflow(
TimeoutSupportWorkflow.run,
approach,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
await handle.result()
assert isinstance(err.value.cause, ActivityError)
assert isinstance(err.value.cause.cause, CancelledError)
Expand Down Expand Up @@ -4946,8 +4946,8 @@ async def run(self, param: str) -> None:


async def test_workflow_fail_on_bad_input(client: Client):
with pytest.raises(WorkflowFailureError) as err:
async with new_worker(client, FailOnBadInputWorkflow) as worker:
async with new_worker(client, FailOnBadInputWorkflow) as worker:
with pytest.raises(WorkflowFailureError) as err:
await client.execute_workflow(
"FailOnBadInputWorkflow",
123,
Expand Down Expand Up @@ -5484,15 +5484,19 @@ async def _run_workflow_and_get_warning(self) -> bool:
assert update_task
with pytest.raises(RPCError) as update_err:
await update_task
assert (
update_err.value.status == RPCStatusCode.NOT_FOUND
and "workflow execution already completed"
in str(update_err.value).lower()
)
assert update_err.value.status == RPCStatusCode.NOT_FOUND and (
str(update_err.value).lower()
== "workflow execution already completed"
)

with pytest.raises(WorkflowFailureError) as err:
await handle.result()
assert "workflow execution failed" in str(err.value).lower()
assert isinstance(
err.value.cause,
{"cancellation": CancelledError, "failure": ApplicationError}[
self.workflow_termination_type
],
)

unfinished_handler_warning_emitted = any(
issubclass(w.category, self._unfinished_handler_warning_cls)
Expand Down
Loading