Ensure extra data on task fail logs #502

Merged
2 commits merged on Apr 12, 2024
1 change: 1 addition & 0 deletions temporalio/worker/_workflow_instance.py
@@ -349,6 +349,7 @@ def activate(
logger.warning(
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
exc_info=activation_err,
extra={"temporal_workflow": self._info._logger_details()},
)
# Set completion failure
self._current_completion.failed.failure.SetInParent()
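
The production change above leans on standard `logging` behavior: keys in the `extra` mapping are merged into the emitted `LogRecord`'s `__dict__`, so handlers and tests can read `temporal_workflow` straight off the record. Below is a minimal stdlib-only sketch of that mechanism; the workflow name and details dict are illustrative stand-ins for `self._info._logger_details()`, not part of this PR.

import logging
from typing import List

captured: List[logging.LogRecord] = []


class CaptureHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        captured.append(record)


logger = logging.getLogger("extra_sketch")
logger.addHandler(CaptureHandler())
logger.propagate = False

# Illustrative details dict standing in for self._info._logger_details()
logger.warning(
    "Failed activation on workflow MyWorkflow",
    extra={"temporal_workflow": {"workflow_type": "MyWorkflow"}},
)

record = captured[0]
assert record.temporal_workflow["workflow_type"] == "MyWorkflow"  # attribute set via extra
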
121 changes: 98 additions & 23 deletions tests/worker/test_workflow.py
@@ -9,6 +9,7 @@
import typing
import uuid
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import (
@@ -30,6 +31,7 @@
from google.protobuf.timestamp_pb2 import Timestamp
from typing_extensions import Protocol, runtime_checkable

import temporalio.worker
from temporalio import activity, workflow
from temporalio.api.common.v1 import Payload, Payloads, WorkflowExecution
from temporalio.api.enums.v1 import EventType
@@ -1876,22 +1878,37 @@ def last_signal(self) -> str:
return self._last_signal


async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
# Use queue to capture log statements
log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
handler = logging.handlers.QueueHandler(log_queue)
workflow.logger.base_logger.addHandler(handler)
prev_level = workflow.logger.base_logger.level
workflow.logger.base_logger.setLevel(logging.INFO)
workflow.logger.full_workflow_info_on_extra = True
class LogCapturer:
def __init__(self) -> None:
self.log_queue: queue.Queue[logging.LogRecord] = queue.Queue()

def find_log(starts_with: str) -> Optional[logging.LogRecord]:
for record in cast(List[logging.LogRecord], log_queue.queue):
@contextmanager
def logs_captured(self, *loggers: logging.Logger):
handler = logging.handlers.QueueHandler(self.log_queue)

prev_levels = [l.level for l in loggers]
for l in loggers:
l.setLevel(logging.INFO)
l.addHandler(handler)
try:
yield self
finally:
for i, l in enumerate(loggers):
l.removeHandler(handler)
l.setLevel(prev_levels[i])

def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
for record in cast(List[logging.LogRecord], self.log_queue.queue):
if record.message.startswith(starts_with):
return record
return None

try:

async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
workflow.logger.full_workflow_info_on_extra = True
with LogCapturer().logs_captured(
workflow.logger.base_logger, activity.logger.base_logger
) as capturer:
# Log two signals and kill worker before completing. Need to disable
# workflow cache since we restart the worker and don't want to pay the
# sticky queue penalty.
@@ -1909,11 +1926,11 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)

# Confirm two logs happened
assert find_log("Signal: signal 1 ({'attempt':")
assert find_log("Signal: signal 2")
assert not find_log("Signal: signal 3")
assert capturer.find_log("Signal: signal 1 ({'attempt':")
assert capturer.find_log("Signal: signal 2")
assert not capturer.find_log("Signal: signal 3")
# Also make sure it has some workflow info and correct funcName
record = find_log("Signal: signal 1")
record = capturer.find_log("Signal: signal 1")
assert (
record
and record.__dict__["temporal_workflow"]["workflow_type"]
@@ -1924,7 +1941,7 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
assert isinstance(record.__dict__["workflow_info"], workflow.Info)

# Clear queue and start a new one with more signals
log_queue.queue.clear()
capturer.log_queue.queue.clear()
async with new_worker(
client,
LoggingWorkflow,
@@ -1937,13 +1954,71 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
await handle.result()

# Confirm replayed logs are not present but new ones are
assert not find_log("Signal: signal 1")
assert not find_log("Signal: signal 2")
assert find_log("Signal: signal 3")
assert find_log("Signal: finish")
finally:
workflow.logger.base_logger.removeHandler(handler)
workflow.logger.base_logger.setLevel(prev_level)
assert not capturer.find_log("Signal: signal 1")
assert not capturer.find_log("Signal: signal 2")
assert capturer.find_log("Signal: signal 3")
assert capturer.find_log("Signal: finish")


@activity.defn
async def task_fail_once_activity() -> None:
if activity.info().attempt == 1:
raise RuntimeError("Intentional activity task failure")


task_fail_once_workflow_has_failed = False


@workflow.defn(sandboxed=False)
class TaskFailOnceWorkflow:
@workflow.run
async def run(self) -> None:
# Fail on first attempt
global task_fail_once_workflow_has_failed
if not task_fail_once_workflow_has_failed:
task_fail_once_workflow_has_failed = True
raise RuntimeError("Intentional workflow task failure")

# Execute activity that will fail once
await workflow.execute_activity(
task_fail_once_activity,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(
initial_interval=timedelta(milliseconds=1),
backoff_coefficient=1.0,
maximum_attempts=2,
),
)


async def test_workflow_logging_task_fail(client: Client):
with LogCapturer().logs_captured(
activity.logger.base_logger, temporalio.worker._workflow_instance.logger
) as capturer:
async with new_worker(
client, TaskFailOnceWorkflow, activities=[task_fail_once_activity]
) as worker:
await client.execute_workflow(
TaskFailOnceWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

wf_task_record = capturer.find_log("Failed activation on workflow")
assert wf_task_record
assert "Intentional workflow task failure" in wf_task_record.message
assert (
getattr(wf_task_record, "temporal_workflow")["workflow_type"]
== "TaskFailOnceWorkflow"
)

act_task_record = capturer.find_log("Completing activity as failed")
assert act_task_record
assert "Intentional activity task failure" in act_task_record.message
assert (
getattr(act_task_record, "temporal_activity")["activity_type"]
== "task_fail_once_activity"
)


@workflow.defn
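
For log consumers, the extras guaranteed by this change make it easy to surface workflow and activity context in rendered output. The following is a hedged sketch, not part of this PR: the formatter class is hypothetical, and attaching it under the "temporalio" logger namespace is an assumption about where the SDK loggers live.

import logging


class TemporalExtraFormatter(logging.Formatter):
    """Append temporal_workflow/temporal_activity extras when present."""

    def format(self, record: logging.LogRecord) -> str:
        base = super().format(record)
        parts = [
            f"{key}={getattr(record, key)}"
            for key in ("temporal_workflow", "temporal_activity")
            if hasattr(record, key)
        ]
        return f"{base} [{', '.join(parts)}]" if parts else base


handler = logging.StreamHandler()
handler.setFormatter(TemporalExtraFormatter("%(levelname)s %(name)s %(message)s"))
# Assumption: SDK loggers are children of the "temporalio" logger
logging.getLogger("temporalio").addHandler(handler)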