|
40 | 40 | from dstack._internal.utils.logging import get_logger
|
41 | 41 |
|
42 | 42 | logger = get_logger(__name__)
|
43 |
| -RETRY_DELAY = datetime.timedelta(seconds=15) |
44 | 43 |
|
45 | 44 |
|
46 | 45 | async def process_runs(batch_size: int = 1):
|
@@ -130,11 +129,7 @@ async def _process_run(session: AsyncSession, run_model: RunModel):
|
130 | 129 | async def _process_pending_run(session: AsyncSession, run_model: RunModel):
|
131 | 130 | """Jobs are not created yet"""
|
132 | 131 | run = run_model_to_run(run_model)
|
133 |
| - if ( |
134 |
| - run.latest_job_submission is not None |
135 |
| - and common.get_current_datetime() - run.latest_job_submission.last_processed_at |
136 |
| - < RETRY_DELAY |
137 |
| - ): |
| 132 | + if not _pending_run_ready_for_resubmission(run_model, run): |
138 | 133 | logger.debug("%s: pending run is not yet ready for resubmission", fmt(run_model))
|
139 | 134 | return
|
140 | 135 |
|
@@ -183,6 +178,37 @@ async def _process_pending_run(session: AsyncSession, run_model: RunModel):
|
183 | 178 | logger.info("%s: run status has changed PENDING -> SUBMITTED", fmt(run_model))
|
184 | 179 |
|
185 | 180 |
|
| 181 | +def _pending_run_ready_for_resubmission(run_model: RunModel, run: Run) -> bool: |
| 182 | + if run.latest_job_submission is None: |
| 183 | + # Should not be possible |
| 184 | + return True |
| 185 | + duration_since_processing = ( |
| 186 | + common.get_current_datetime() - run.latest_job_submission.last_processed_at |
| 187 | + ) |
| 188 | + if duration_since_processing < _get_retry_delay(run_model.resubmission_attempt): |
| 189 | + return False |
| 190 | + return True |
| 191 | + |
| 192 | + |
| 193 | +# We use exponentially increasing retry delays for pending runs. |
| 194 | +# This prevents creation of too many job submissions for runs stuck in pending, |
| 195 | +# e.g. when users set retry for a long period without capacity. |
| 196 | +_PENDING_RETRY_DELAYS = [ |
| 197 | + datetime.timedelta(seconds=15), |
| 198 | + datetime.timedelta(seconds=30), |
| 199 | + datetime.timedelta(minutes=1), |
| 200 | + datetime.timedelta(minutes=2), |
| 201 | + datetime.timedelta(minutes=5), |
| 202 | + datetime.timedelta(minutes=10), |
| 203 | +] |
| 204 | + |
| 205 | + |
| 206 | +def _get_retry_delay(resubmission_attempt: int) -> datetime.timedelta: |
| 207 | + if resubmission_attempt - 1 < len(_PENDING_RETRY_DELAYS): |
| 208 | + return _PENDING_RETRY_DELAYS[resubmission_attempt - 1] |
| 209 | + return _PENDING_RETRY_DELAYS[-1] |
| 210 | + |
| 211 | + |
186 | 212 | async def _process_active_run(session: AsyncSession, run_model: RunModel):
|
187 | 213 | """
|
188 | 214 | Run is submitted, provisioning, or running.
|
@@ -341,6 +367,11 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel):
|
341 | 367 | )
|
342 | 368 | run_model.status = new_status
|
343 | 369 | run_model.termination_reason = termination_reason
|
| 370 | + # While a run goes to pending without provisioning, resubmission_attempt increases. |
| 371 | + if new_status == RunStatus.PROVISIONING: |
| 372 | + run_model.resubmission_attempt = 0 |
| 373 | + elif new_status == RunStatus.PENDING: |
| 374 | + run_model.resubmission_attempt += 1 |
344 | 375 |
|
345 | 376 |
|
346 | 377 | def _should_retry_job(run: Run, job: Job, job_model: JobModel) -> Optional[datetime.timedelta]:
|
|
0 commit comments