Commit 7cced2e

fix(app): guard against possible race conditions during enqueue
In #7724 we made a number of perf optimisations related to enqueuing. One of these optimisations included moving the enqueue logic - including expensive prep work and db writes - to a separate thread. At the same time manual DB locking was abandoned in favor of WAL mode. Finally, we set `check_same_thread=False` to allow multiple threads to access the connection at a given time.

I think this may be the cause of #7950:
- We start an enqueue in a thread (running in bg)
- We dequeue
- Dequeue pulls a partially-written queue item from DB and we get the errors in the linked issue

To be honest, I don't understand enough about SQLite to confidently say that this kind of race condition is actually possible. But:
- The error started popping up around the time we made this change.
- I have reviewed the logic from enqueue to dequeue very carefully _many_ times over the past month or so, and I am confident that the error is only possible if we are getting unexpectedly `NULL` values from the DB.
- The DB schema includes `NOT NULL` constraints for the column that is apparently returning `NULL`.
- Therefore, without some kind of race condition or schema issue, the error should not be possible.
- The `enqueue_batch` call is the only place I can find where we have the possibility of a race condition due to async logic. Everywhere else, all DB interaction for the queue is synchronous, as far as I can tell.

This change retains the perf benefits by running the heavy enqueue prep logic in a separate thread, but moves back to the main thread for the DB write. It also uses an explicit transaction for the write.

Will just have to wait and see if this fixes the issue.
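The approach described above (heavy prep in a worker thread, DB write back on the calling thread inside one transaction) can be sketched roughly as follows. This is a minimal illustration, not the InvokeAI code: `prepare_rows`, `enqueue`, `run_demo` and the `queue` table are hypothetical names, and an in-memory SQLite database stands in for the real one.

```python
import asyncio
import sqlite3


def prepare_rows(count: int) -> list[tuple[str, int]]:
    # Hypothetical stand-in for the expensive, DB-free prep work.
    return [(f"item-{i}", i) for i in range(count)]


async def enqueue(conn: sqlite3.Connection, count: int) -> int:
    # The prep runs in a worker thread so the event loop stays responsive.
    rows = await asyncio.to_thread(prepare_rows, count)
    # Back on the event-loop thread: write every row in a single transaction.
    # `with conn:` commits on success and rolls back if an exception escapes.
    with conn:
        conn.executemany("INSERT INTO queue (name, priority) VALUES (?, ?)", rows)
    return len(rows)


def run_demo() -> tuple[int, int]:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE queue (name TEXT NOT NULL, priority INTEGER NOT NULL)")
    conn.commit()
    inserted = asyncio.run(enqueue(conn, 3))
    stored = conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
    conn.close()
    return inserted, stored
```

Because only the pure prep function crosses into the worker thread, the connection is touched from a single thread in this sketch, so it does not rely on `check_same_thread=False`.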
1 parent 3b4d1b8 commit 7cced2e

1 file changed: +15 -18 lines changed
invokeai/app/services/session_queue/session_queue_sqlite.py

Lines changed: 15 additions & 18 deletions
@@ -104,11 +104,7 @@ def _get_highest_priority(self, queue_id: str) -> int:
         return cast(Union[int, None], cursor.fetchone()[0]) or 0
 
     async def enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueBatchResult:
-        return await asyncio.to_thread(self._enqueue_batch, queue_id, batch, prepend)
-
-    def _enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueBatchResult:
         try:
-            cursor = self._conn.cursor()
             # TODO: how does this work in a multi-user scenario?
             current_queue_size = self._get_current_queue_size(queue_id)
             max_queue_size = self.__invoker.services.configuration.max_queue_size
@@ -118,28 +114,29 @@ def _enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueB
             if prepend:
                 priority = self._get_highest_priority(queue_id) + 1
 
-            requested_count = calc_session_count(batch)
-            values_to_insert = prepare_values_to_insert(
+            requested_count = await asyncio.to_thread(
+                calc_session_count,
+                batch=batch,
+            )
+            values_to_insert = await asyncio.to_thread(
+                prepare_values_to_insert,
                 queue_id=queue_id,
                 batch=batch,
                 priority=priority,
                 max_new_queue_items=max_new_queue_items,
             )
             enqueued_count = len(values_to_insert)
 
-            if requested_count > enqueued_count:
-                values_to_insert = values_to_insert[:max_new_queue_items]
-
-            cursor.executemany(
-                """--sql
-                INSERT INTO session_queue (queue_id, session, session_id, batch_id, field_values, priority, workflow, origin, destination, retried_from_item_id)
-                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-                """,
-                values_to_insert,
-            )
-            self._conn.commit()
+            with self._conn:
+                cursor = self._conn.cursor()
+                cursor.executemany(
+                    """--sql
+                    INSERT INTO session_queue (queue_id, session, session_id, batch_id, field_values, priority, workflow, origin, destination, retried_from_item_id)
+                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                    """,
+                    values_to_insert,
+                )
         except Exception:
-            self._conn.rollback()
             raise
         enqueue_result = EnqueueBatchResult(
             queue_id=queue_id,
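
The diff drops the explicit `commit()` and `rollback()` calls in favour of `with self._conn:`. In Python's `sqlite3` module, using a connection as a context manager commits the open transaction when the block exits normally and rolls it back when an exception propagates out (it does not close the connection). A small sketch of that behaviour, using a hypothetical `items` table and helper names that are not part of the commit:

```python
import sqlite3


def insert_rows(conn: sqlite3.Connection, rows: list[tuple]) -> bool:
    # Returns True if the whole batch was committed, False if it was rolled back.
    try:
        with conn:
            conn.executemany("INSERT INTO items (name) VALUES (?)", rows)
    except sqlite3.IntegrityError:
        return False
    return True


def demo() -> tuple[bool, bool, int]:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE items (name TEXT NOT NULL)")
    ok = insert_rows(conn, [("a",), ("b",)])
    # The second row violates NOT NULL, so the whole batch (including "c") is rolled back.
    failed = insert_rows(conn, [("c",), (None,)])
    count = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
    conn.close()
    return ok, failed, count
```

Here the failed batch leaves no partial rows behind, which is the all-or-nothing behaviour the explicit transaction in the commit is after.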
