diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index be8bb610e..910166376 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -40,7 +40,6 @@ from dstack._internal.utils.logging import get_logger logger = get_logger(__name__) -RETRY_DELAY = datetime.timedelta(seconds=15) async def process_runs(batch_size: int = 1): @@ -130,11 +129,7 @@ async def _process_run(session: AsyncSession, run_model: RunModel): async def _process_pending_run(session: AsyncSession, run_model: RunModel): """Jobs are not created yet""" run = run_model_to_run(run_model) - if ( - run.latest_job_submission is not None - and common.get_current_datetime() - run.latest_job_submission.last_processed_at - < RETRY_DELAY - ): + if not _pending_run_ready_for_resubmission(run_model, run): logger.debug("%s: pending run is not yet ready for resubmission", fmt(run_model)) return @@ -183,6 +178,37 @@ async def _process_pending_run(session: AsyncSession, run_model: RunModel): logger.info("%s: run status has changed PENDING -> SUBMITTED", fmt(run_model)) +def _pending_run_ready_for_resubmission(run_model: RunModel, run: Run) -> bool: + if run.latest_job_submission is None: + # Should not be possible + return True + duration_since_processing = ( + common.get_current_datetime() - run.latest_job_submission.last_processed_at + ) + if duration_since_processing < _get_retry_delay(run_model.resubmission_attempt): + return False + return True + + +# We use exponentially increasing retry delays for pending runs. +# This prevents creation of too many job submissions for runs stuck in pending, +# e.g. when users set retry for a long period without capacity. +_PENDING_RETRY_DELAYS = [ + datetime.timedelta(seconds=15), + datetime.timedelta(seconds=30), + datetime.timedelta(minutes=1), + datetime.timedelta(minutes=2), + datetime.timedelta(minutes=5), + datetime.timedelta(minutes=10), +] + + +def _get_retry_delay(resubmission_attempt: int) -> datetime.timedelta: + if resubmission_attempt - 1 < len(_PENDING_RETRY_DELAYS): + return _PENDING_RETRY_DELAYS[resubmission_attempt - 1] + return _PENDING_RETRY_DELAYS[-1] + + async def _process_active_run(session: AsyncSession, run_model: RunModel): """ Run is submitted, provisioning, or running. @@ -341,6 +367,11 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): ) run_model.status = new_status run_model.termination_reason = termination_reason + # While a run goes to pending without provisioning, resubmission_attempt increases. + if new_status == RunStatus.PROVISIONING: + run_model.resubmission_attempt = 0 + elif new_status == RunStatus.PENDING: + run_model.resubmission_attempt += 1 def _should_retry_job(run: Run, job: Job, job_model: JobModel) -> Optional[datetime.timedelta]: diff --git a/src/dstack/_internal/server/migrations/versions/7ba3b59d7ca6_add_runmodel_resubmission_attempt.py b/src/dstack/_internal/server/migrations/versions/7ba3b59d7ca6_add_runmodel_resubmission_attempt.py new file mode 100644 index 000000000..ddb7a30ed --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/7ba3b59d7ca6_add_runmodel_resubmission_attempt.py @@ -0,0 +1,35 @@ +"""Add RunModel.resubmission_attempt + +Revision ID: 7ba3b59d7ca6 +Revises: 7bc2586e8b9e +Create Date: 2025-04-15 18:00:35.320906 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "7ba3b59d7ca6" +down_revision = "7bc2586e8b9e" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("runs", schema=None) as batch_op: + batch_op.add_column(sa.Column("resubmission_attempt", sa.Integer(), nullable=True)) + batch_op.execute("UPDATE runs SET resubmission_attempt = 0") + with op.batch_alter_table("runs", schema=None) as batch_op: + batch_op.alter_column("resubmission_attempt", nullable=False) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("runs", schema=None) as batch_op: + batch_op.drop_column("resubmission_attempt") + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index 66b6fc910..b2fa65500 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -343,6 +343,9 @@ class RunModel(BaseModel): termination_reason: Mapped[Optional[RunTerminationReason]] = mapped_column( Enum(RunTerminationReason) ) + # resubmission_attempt counts consecutive transitions to pending without provisioning. + # Can be used to choose retry delay depending on the attempt number. + resubmission_attempt: Mapped[int] = mapped_column(Integer, default=0) run_spec: Mapped[str] = mapped_column(Text) service_spec: Mapped[Optional[str]] = mapped_column(Text)