Skip to content

Use exponentially increasing retry delays for pending runs #2519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions src/dstack/_internal/server/background/tasks/process_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)
RETRY_DELAY = datetime.timedelta(seconds=15)


async def process_runs(batch_size: int = 1):
Expand Down Expand Up @@ -130,11 +129,7 @@ async def _process_run(session: AsyncSession, run_model: RunModel):
async def _process_pending_run(session: AsyncSession, run_model: RunModel):
"""Jobs are not created yet"""
run = run_model_to_run(run_model)
if (
run.latest_job_submission is not None
and common.get_current_datetime() - run.latest_job_submission.last_processed_at
< RETRY_DELAY
):
if not _pending_run_ready_for_resubmission(run_model, run):
logger.debug("%s: pending run is not yet ready for resubmission", fmt(run_model))
return

Expand Down Expand Up @@ -183,6 +178,37 @@ async def _process_pending_run(session: AsyncSession, run_model: RunModel):
logger.info("%s: run status has changed PENDING -> SUBMITTED", fmt(run_model))


def _pending_run_ready_for_resubmission(run_model: RunModel, run: Run) -> bool:
if run.latest_job_submission is None:
# Should not be possible
return True
duration_since_processing = (
common.get_current_datetime() - run.latest_job_submission.last_processed_at
)
if duration_since_processing < _get_retry_delay(run_model.resubmission_attempt):
return False
return True


# We use exponentially increasing retry delays for pending runs.
# This prevents creation of too many job submissions for runs stuck in pending,
# e.g. when users set retry for a long period without capacity.
_PENDING_RETRY_DELAYS = [
datetime.timedelta(seconds=15),
datetime.timedelta(seconds=30),
datetime.timedelta(minutes=1),
datetime.timedelta(minutes=2),
datetime.timedelta(minutes=5),
datetime.timedelta(minutes=10),
]


def _get_retry_delay(resubmission_attempt: int) -> datetime.timedelta:
if resubmission_attempt - 1 < len(_PENDING_RETRY_DELAYS):
return _PENDING_RETRY_DELAYS[resubmission_attempt - 1]
return _PENDING_RETRY_DELAYS[-1]


async def _process_active_run(session: AsyncSession, run_model: RunModel):
"""
Run is submitted, provisioning, or running.
Expand Down Expand Up @@ -341,6 +367,11 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel):
)
run_model.status = new_status
run_model.termination_reason = termination_reason
# While a run goes to pending without provisioning, resubmission_attempt increases.
if new_status == RunStatus.PROVISIONING:
run_model.resubmission_attempt = 0
elif new_status == RunStatus.PENDING:
run_model.resubmission_attempt += 1


def _should_retry_job(run: Run, job: Job, job_model: JobModel) -> Optional[datetime.timedelta]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Add RunModel.resubmission_attempt

Revision ID: 7ba3b59d7ca6
Revises: 7bc2586e8b9e
Create Date: 2025-04-15 18:00:35.320906

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "7ba3b59d7ca6"
down_revision = "7bc2586e8b9e"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("runs", schema=None) as batch_op:
batch_op.add_column(sa.Column("resubmission_attempt", sa.Integer(), nullable=True))
batch_op.execute("UPDATE runs SET resubmission_attempt = 0")
with op.batch_alter_table("runs", schema=None) as batch_op:
batch_op.alter_column("resubmission_attempt", nullable=False)

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("runs", schema=None) as batch_op:
batch_op.drop_column("resubmission_attempt")

# ### end Alembic commands ###
3 changes: 3 additions & 0 deletions src/dstack/_internal/server/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ class RunModel(BaseModel):
termination_reason: Mapped[Optional[RunTerminationReason]] = mapped_column(
Enum(RunTerminationReason)
)
# resubmission_attempt counts consecutive transitions to pending without provisioning.
# Can be used to choose retry delay depending on the attempt number.
resubmission_attempt: Mapped[int] = mapped_column(Integer, default=0)
run_spec: Mapped[str] = mapped_column(Text)
service_spec: Mapped[Optional[str]] = mapped_column(Text)

Expand Down
Loading