Skip to content

Feat: Adds type annotations #43

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Oct 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:

strategy:
matrix:
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.8", "3.9", "3.10", "3.11"]

services:
redis:
Expand Down
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
include setup.py
include README.md
include LICENSE
include saq/py.typed
recursive-include saq/static *
7 changes: 7 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[mypy]
show_error_codes = True
disallow_untyped_defs = True
check_untyped_defs = True
warn_redundant_casts = True
warn_unreachable = True
warn_unused_ignores = True
1 change: 1 addition & 0 deletions run_checks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@

python -m pylint saq/ tests/ setup.py
python -m black --check saq/ tests/ setup.py
python -m mypy saq/ tests/ setup.py
python -m unittest
7 changes: 7 additions & 0 deletions saq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,12 @@
from saq.queue import Queue
from saq.worker import Worker

__all__ = [
"CronJob",
"Job",
"Queue",
"Status",
"Worker",
]

__version__ = "0.9.0"
2 changes: 1 addition & 1 deletion saq/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from saq.worker import check_health, start


def main():
def main() -> None:
parser = argparse.ArgumentParser(description="Start Simple Async Queue Worker")
parser.add_argument(
"settings",
Expand Down
125 changes: 68 additions & 57 deletions saq/job.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
from __future__ import annotations

import dataclasses
import enum
import typing
import typing as t

from saq.utils import now, seconds, uuid1, exponential_backoff
from saq.utils import exponential_backoff, now, seconds, uuid1

ABORT_ID_PREFIX = "saq:abort:"

if t.TYPE_CHECKING:
from saq.queue import Queue
from saq.types import DurationKind, Function


def get_default_job_key():
def get_default_job_key() -> str:
return uuid1()


Expand Down Expand Up @@ -37,13 +43,13 @@ class CronJob:
Remaining kwargs are passed through to Job
"""

function: typing.Callable
function: Function
cron: str
unique: bool = True
timeout: typing.Optional[int] = None
heartbeat: typing.Optional[int] = None
retries: typing.Optional[int] = None
ttl: typing.Optional[int] = None
timeout: int | None = None
heartbeat: int | None = None
retries: int | None = None
ttl: int | None = None


@dataclasses.dataclass
Expand All @@ -57,7 +63,7 @@ class Job:
queue: the saq.Queue object associated with the job
key: unique identifier of a job, defaults to uuid1, can be passed in to avoid duplicate jobs
timeout: the maximum amount of time a job can run for in seconds, defaults to 10 (0 means disabled)
heartbeat: the maximum amount of time a job can survive without a heartebat in seconds, defaults to 0 (disabled)
heartbeat: the maximum amount of time a job can survive without a heartbeat in seconds, defaults to 0 (disabled)
a heartbeat can be triggered manually within a job by calling await job.update()
retries: the maximum number of attempts to retry a job, defaults to 1
ttl: the maximum time in seconds to store information about a job including results, defaults to 600 (0 means indefinitely, -1 means disabled)
Expand All @@ -77,39 +83,39 @@ class Job:
started: job started time epoch seconds
touched: job touched/updated time epoch seconds
results: payload containing the results, this is the return of the function provided, must be serializable, defaults to json
error: stack trace if an runtime error occurs
status: Status Enum, defaulst to Status.New
error: stack trace if a runtime error occurs
status: Status Enum, defaults to Status.NEW
"""

function: str
kwargs: typing.Optional[dict] = None
queue: typing.Optional["Queue"] = None
kwargs: dict[str, t.Any] | None = None
queue: Queue | None = None
key: str = dataclasses.field(default_factory=get_default_job_key)
timeout: int = 10
heartbeat: int = 0
retries: int = 1
ttl: int = 600
retry_delay: float = 0.0
retry_backoff: typing.Union[bool, float] = False
retry_backoff: bool | float = False
scheduled: int = 0
progress: float = 0.0
attempts: int = 0
completed: int = 0
queued: int = 0
started: int = 0
touched: int = 0
result: typing.Any = None
error: typing.Optional[str] = None
result: t.Any = None
error: str | None = None
status: Status = Status.NEW
meta: dict = dataclasses.field(default_factory=dict)
meta: dict[t.Any, t.Any] = dataclasses.field(default_factory=dict)

def __repr__(self):
def __repr__(self) -> str:
kwargs = ", ".join(
f"{k}={v}"
for k, v in {
"function": self.function,
"kwargs": self.kwargs,
"queue": self.queue.name,
"queue": self.get_queue().name,
"id": self.id,
"scheduled": self.scheduled,
"progress": self.progress,
Expand All @@ -126,22 +132,22 @@ def __repr__(self):
)
return f"Job<{kwargs}>"

def __hash__(self):
def __hash__(self) -> int:
return hash(self.key)

@property
def id(self):
return self.queue.job_id(self.key)
def id(self) -> str:
return self.get_queue().job_id(self.key)

@classmethod
def key_from_id(cls, job_id):
def key_from_id(cls, job_id: str) -> str:
return job_id.split(":")[-1]

@property
def abort_id(self):
def abort_id(self) -> str:
return f"{ABORT_ID_PREFIX}{self.key}"

def to_dict(self):
def to_dict(self) -> dict[str, t.Any]:
result = {}
for field in dataclasses.fields(self):
key = field.name
Expand All @@ -155,7 +161,7 @@ def to_dict(self):
result[key] = value
return result

def duration(self, kind):
def duration(self, kind: DurationKind) -> int | None:
"""
Returns the duration of the job given kind.

Expand All @@ -172,23 +178,21 @@ def duration(self, kind):
return self._duration(now(), self.started)
raise ValueError(f"Unknown duration type: {kind}")

def _duration(self, a, b):
def _duration(self, a: int, b: int) -> int | None:
return a - b if a and b else None

@property
def stuck(self):
"""Checks if an active job is passed it's timeout or heartbeat."""
def stuck(self) -> bool:
"""Checks if an active job is passed its timeout or heartbeat."""
current = now()
return (self.status == Status.ACTIVE) and (
return (self.status == Status.ACTIVE) and bool(
seconds(current - self.started) > self.timeout
or (self.heartbeat and seconds(current - self.touched) > self.heartbeat)
)

def next_retry_delay(self):
if self.retry_backoff:
max_delay = self.retry_delay
if max_delay is True:
max_delay = None
def next_retry_delay(self) -> float:
if self.retry_backoff is not False:
max_delay = None if self.retry_backoff is True else self.retry_backoff
return exponential_backoff(
attempts=self.attempts,
base_delay=self.retry_delay,
Expand All @@ -197,48 +201,49 @@ def next_retry_delay(self):
)
return self.retry_delay

async def enqueue(self, queue=None):
async def enqueue(self, queue: Queue | None = None) -> None:
"""
Enqueues the job to its queue or a provided one.

A job that already has a queue cannot be re-enqueued. Job uniqueness is determined by its id.
If a job has already been queued, it will update it's properties to match what is stored in the db.
If a job has already been queued, it will update its properties to match what is stored in the db.
"""
queue = queue or self.queue
assert queue, "Queue unspecified"
queue = queue or self.get_queue()
if not await queue.enqueue(self):
await self.refresh()

async def abort(self, error, ttl=5):
async def abort(self, error: str, ttl: int = 5) -> None:
"""Tries to abort the job."""
await self.queue.abort(self, error, ttl=ttl)
await self.get_queue().abort(self, error, ttl=ttl)

async def finish(self, status, *, result=None, error=None):
async def finish(
self, status: Status, *, result: t.Any = None, error: str | None = None
) -> None:
"""Finishes the job with a Job.Status, result, and or error."""
await self.queue.finish(self, status, result=result, error=error)
await self.get_queue().finish(self, status, result=result, error=error)

async def retry(self, error):
"""Retries the job by removing it from active and requeueing it."""
await self.queue.retry(self, error)
async def retry(self, error: str | None) -> None:
"""Retries the job by removing it from active and enqueueing it again."""
await self.get_queue().retry(self, error)

async def update(self, **kwargs):
async def update(self, **kwargs: t.Any) -> None:
"""
Updates the stored job in redis.

Set properties with passed in kwargs.
"""
for k, v in kwargs.items():
setattr(self, k, v)
await self.queue.update(self)
await self.get_queue().update(self)

async def refresh(self, until_complete=None):
async def refresh(self, until_complete: float | None = None) -> None:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this syntax legal in 3.7?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just about to write an explanation.

The from __future__ import annotations means that all annotations are strings at runtime, so no runtime errors irrespective of version, and type checkers are able to use the current syntax irrespective of the version.

I've been running the tests locally on 3.8 without issue and that syntax was introduced in 3.10. Similarly, the builtin subscription syntax, e.g., list[...] is 3.9+ but the future annotations mean we don't need to worry about the runtime errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, if there is an issue I'm missing here, or you'd prefer to stay syntactically valid with the min version irrespective of the future annotations, I'll be happy to roll them back. Just that the current syntax is a lot nicer than what was available in 3.7.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think mypy will automatically add an Optional type when the default value is None? I could have that wrong, though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct, but reliance on it is discouraged: python/peps#689 and python/mypy#9091

"""
Refresh the current job with the latest data from the db.

until_complete: None or numeric seconds. If None (default), don't wait;
else wait until the job is complete or the given number of seconds has elapsed. 0 means wait forever.
"""
job = await self.queue.job(self.key)
job = await self.get_queue().job(self.key)

if not job:
raise RuntimeError(f"{self} doesn't exist")
Expand All @@ -247,14 +252,20 @@ async def refresh(self, until_complete=None):

if until_complete is not None and not self.completed:

async def callback(_id, status):
if status in TERMINAL_STATUSES:
return True
async def callback(_id: str, status: Status) -> bool:
return status in TERMINAL_STATUSES

await self.queue.listen([self.key], callback, until_complete)
await self.get_queue().listen([self.key], callback, until_complete)
await self.refresh()

def replace(self, job):
def replace(self, job: Job) -> None:
"""Replace current attributes with job attributes."""
for field in job.__dataclass_fields__:
setattr(self, field, getattr(job, field))
for field in dataclasses.fields(job):
setattr(self, field.name, getattr(job, field.name))

def get_queue(self) -> Queue:
if self.queue is None:
raise TypeError(
"`Job` must be associated with a `Queue` before this operation can proceed"
)
return self.queue
Empty file added saq/py.typed
Empty file.
Loading