-
-
Notifications
You must be signed in to change notification settings - Fork 32.2k
gh-113884: Make queue.SimpleQueue thread-safe in --disable-gil builds #114161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 7 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
bc7a640
Use ParkingLot to manage waiting threads
mpage 65c0b90
Make SimpleQueue thread-safe with the GIL disabled
mpage d999dd6
📜🤖 Added by blurb_it.
blurb-it[bot] 511f59f
Update Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issu…
erlend-aasland ad5217a
Remove unnecessary overflow check
mpage d3b5547
Use a clearer name for field indicating whether or not threads are wa…
mpage 6fef770
Fix type
mpage 98dfcf6
Apply suggestions from code review
erlend-aasland 60fd0ec
Fix suggestion
erlend-aasland 80200f8
Address review comments
mpage File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
1 change: 1 addition & 0 deletions
1
Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Make :class:`queue.SimpleQueue` thread safe when the GIL is disabled. Existing semantics and behavior should be preserved, but the implementation is changed to ensure thread-safety when the GIL is disabled. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,8 +3,9 @@ | |
#endif | ||
|
||
#include "Python.h" | ||
#include "pycore_ceval.h" // _PyEval_MakePendingCalls() | ||
#include "pycore_ceval.h" // Py_MakePendingCalls() | ||
#include "pycore_moduleobject.h" // _PyModule_GetState() | ||
#include "pycore_parking_lot.h" | ||
#include "pycore_time.h" // _PyTime_t | ||
|
||
#include <stdbool.h> | ||
|
@@ -151,7 +152,9 @@ RingBuf_Get(RingBuf *buf) | |
return item; | ||
} | ||
|
||
// Returns 0 on success or -1 if the buffer failed to grow | ||
// Returns 0 on success or -1 if the buffer failed to grow. | ||
// | ||
// Steals a reference to item. | ||
static int | ||
RingBuf_Put(RingBuf *buf, PyObject *item) | ||
{ | ||
|
@@ -164,7 +167,7 @@ RingBuf_Put(RingBuf *buf, PyObject *item) | |
return -1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need to decref item here isn't it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I think so |
||
} | ||
} | ||
buf->items[buf->put_idx] = Py_NewRef(item); | ||
buf->items[buf->put_idx] = item; | ||
buf->put_idx = (buf->put_idx + 1) % buf->items_cap; | ||
buf->num_items++; | ||
return 0; | ||
|
@@ -184,9 +187,13 @@ RingBuf_IsEmpty(RingBuf *buf) | |
|
||
typedef struct { | ||
PyObject_HEAD | ||
PyThread_type_lock lock; | ||
int locked; | ||
|
||
// Are there threads waiting for items | ||
bool has_threads_waiting; | ||
|
||
// Items in the queue | ||
RingBuf buf; | ||
|
||
PyObject *weakreflist; | ||
} simplequeueobject; | ||
|
||
|
@@ -209,12 +216,6 @@ simplequeue_dealloc(simplequeueobject *self) | |
PyTypeObject *tp = Py_TYPE(self); | ||
|
||
PyObject_GC_UnTrack(self); | ||
if (self->lock != NULL) { | ||
/* Unlock the lock so it's safe to free it */ | ||
if (self->locked > 0) | ||
PyThread_release_lock(self->lock); | ||
PyThread_free_lock(self->lock); | ||
} | ||
(void)simplequeue_clear(self); | ||
if (self->weakreflist != NULL) | ||
PyObject_ClearWeakRefs((PyObject *) self); | ||
|
@@ -249,12 +250,6 @@ simplequeue_new_impl(PyTypeObject *type) | |
self = (simplequeueobject *) type->tp_alloc(type, 0); | ||
if (self != NULL) { | ||
self->weakreflist = NULL; | ||
self->lock = PyThread_allocate_lock(); | ||
if (self->lock == NULL) { | ||
Py_DECREF(self); | ||
PyErr_SetString(PyExc_MemoryError, "can't allocate lock"); | ||
return NULL; | ||
} | ||
if (RingBuf_Init(&self->buf) < 0) { | ||
Py_DECREF(self); | ||
return NULL; | ||
|
@@ -263,8 +258,29 @@ simplequeue_new_impl(PyTypeObject *type) | |
|
||
return (PyObject *) self; | ||
} | ||
typedef struct { | ||
erlend-aasland marked this conversation as resolved.
Show resolved
Hide resolved
|
||
int handed_off; | ||
simplequeueobject *queue; | ||
PyObject *item; | ||
} HandoffData; | ||
|
||
static void | ||
maybe_unparked_thread(HandoffData *data, PyObject **item, int has_more_waiters) | ||
{ | ||
if (item == NULL) { | ||
// Didn't unpark a thread | ||
data->handed_off = 0; | ||
} | ||
else { | ||
// Successfully unparked a thread | ||
*item = data->item; | ||
data->handed_off = 1; | ||
} | ||
data->queue->has_threads_waiting = has_more_waiters; | ||
} | ||
mpage marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/*[clinic input] | ||
@critical_section | ||
_queue.SimpleQueue.put | ||
item: object | ||
block: bool = True | ||
|
@@ -280,21 +296,28 @@ never blocks. They are provided for compatibility with the Queue class. | |
static PyObject * | ||
_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, | ||
int block, PyObject *timeout) | ||
/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/ | ||
/*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/ | ||
{ | ||
/* BEGIN GIL-protected critical section */ | ||
if (RingBuf_Put(&self->buf, item) < 0) | ||
return NULL; | ||
if (self->locked) { | ||
/* A get() may be waiting, wake it up */ | ||
self->locked = 0; | ||
PyThread_release_lock(self->lock); | ||
HandoffData data = { | ||
.handed_off = 0, | ||
.item = Py_NewRef(item), | ||
.queue = self, | ||
}; | ||
if (self->has_threads_waiting) { | ||
// Try to hand the item off directly if there are threads waiting | ||
_PyParkingLot_Unpark(&self->has_threads_waiting, | ||
(_Py_unpark_fn_t *)maybe_unparked_thread, &data); | ||
} | ||
if (!data.handed_off) { | ||
if (RingBuf_Put(&self->buf, item) < 0) { | ||
return NULL; | ||
} | ||
} | ||
/* END GIL-protected critical section */ | ||
Py_RETURN_NONE; | ||
} | ||
|
||
/*[clinic input] | ||
@critical_section | ||
_queue.SimpleQueue.put_nowait | ||
item: object | ||
|
||
|
@@ -307,12 +330,22 @@ for compatibility with the Queue class. | |
|
||
static PyObject * | ||
_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item) | ||
/*[clinic end generated code: output=0990536715efb1f1 input=36b1ea96756b2ece]*/ | ||
/*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/ | ||
{ | ||
return _queue_SimpleQueue_put_impl(self, item, 0, Py_None); | ||
} | ||
|
||
static PyObject * | ||
empty_error(PyTypeObject *cls) | ||
{ | ||
PyObject *module = PyType_GetModule(cls); | ||
erlend-aasland marked this conversation as resolved.
Show resolved
Hide resolved
|
||
simplequeue_state *state = simplequeue_get_state(module); | ||
PyErr_SetNone(state->EmptyError); | ||
return NULL; | ||
} | ||
|
||
/*[clinic input] | ||
@critical_section | ||
_queue.SimpleQueue.get | ||
|
||
cls: defining_class | ||
|
@@ -335,23 +368,15 @@ in that case). | |
static PyObject * | ||
_queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, | ||
int block, PyObject *timeout_obj) | ||
/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/ | ||
/*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/ | ||
{ | ||
_PyTime_t endtime = 0; | ||
_PyTime_t timeout; | ||
PyObject *item; | ||
PyLockStatus r; | ||
PY_TIMEOUT_T microseconds; | ||
PyThreadState *tstate = PyThreadState_Get(); | ||
|
||
// XXX Use PyThread_ParseTimeoutArg(). | ||
|
||
if (block == 0) { | ||
/* Non-blocking */ | ||
microseconds = 0; | ||
} | ||
else if (timeout_obj != Py_None) { | ||
if (block != 0 && timeout_obj != Py_None) { | ||
erlend-aasland marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/* With timeout */ | ||
_PyTime_t timeout; | ||
if (_PyTime_FromSecondsObject(&timeout, | ||
timeout_obj, _PyTime_ROUND_CEILING) < 0) { | ||
return NULL; | ||
|
@@ -361,65 +386,64 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, | |
"'timeout' must be a non-negative number"); | ||
return NULL; | ||
} | ||
microseconds = _PyTime_AsMicroseconds(timeout, | ||
_PyTime_ROUND_CEILING); | ||
if (microseconds > PY_TIMEOUT_MAX) { | ||
PyErr_SetString(PyExc_OverflowError, | ||
"timeout value is too large"); | ||
return NULL; | ||
} | ||
endtime = _PyDeadline_Init(timeout); | ||
} | ||
else { | ||
/* Infinitely blocking */ | ||
microseconds = -1; | ||
} | ||
|
||
/* put() signals the queue to be non-empty by releasing the lock. | ||
* So we simply try to acquire the lock in a loop, until the condition | ||
* (queue non-empty) becomes true. | ||
*/ | ||
while (RingBuf_IsEmpty(&self->buf)) { | ||
/* First a simple non-blocking try without releasing the GIL */ | ||
r = PyThread_acquire_lock_timed(self->lock, 0, 0); | ||
if (r == PY_LOCK_FAILURE && microseconds != 0) { | ||
Py_BEGIN_ALLOW_THREADS | ||
r = PyThread_acquire_lock_timed(self->lock, microseconds, 1); | ||
Py_END_ALLOW_THREADS | ||
for (;;) { | ||
if (!RingBuf_IsEmpty(&self->buf)) { | ||
return RingBuf_Get(&self->buf); | ||
} | ||
|
||
if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) { | ||
return NULL; | ||
} | ||
if (r == PY_LOCK_FAILURE) { | ||
PyObject *module = PyType_GetModule(cls); | ||
simplequeue_state *state = simplequeue_get_state(module); | ||
/* Timed out */ | ||
PyErr_SetNone(state->EmptyError); | ||
return NULL; | ||
if (!block) { | ||
return empty_error(cls); | ||
} | ||
self->locked = 1; | ||
|
||
/* Adjust timeout for next iteration (if any) */ | ||
if (microseconds > 0) { | ||
timeout = _PyDeadline_Get(endtime); | ||
microseconds = _PyTime_AsMicroseconds(timeout, | ||
_PyTime_ROUND_CEILING); | ||
int64_t timeout_ns = -1; | ||
if (endtime != 0) { | ||
timeout_ns = _PyDeadline_Get(endtime); | ||
if (timeout_ns < 0) { | ||
return empty_error(cls); | ||
} | ||
} | ||
} | ||
|
||
/* BEGIN GIL-protected critical section */ | ||
item = RingBuf_Get(&self->buf); | ||
if (self->locked) { | ||
PyThread_release_lock(self->lock); | ||
self->locked = 0; | ||
bool waiting = 1; | ||
self->has_threads_waiting = waiting; | ||
|
||
PyObject *item = NULL; | ||
int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting, | ||
sizeof(bool), timeout_ns, &item, | ||
/* detach */ 1); | ||
switch (st) { | ||
case Py_PARK_OK: { | ||
assert(item != NULL); | ||
return item; | ||
} | ||
case Py_PARK_TIMEOUT: { | ||
return empty_error(cls); | ||
} | ||
case Py_PARK_INTR: { | ||
// Interrupted | ||
if (Py_MakePendingCalls() < 0) { | ||
return NULL; | ||
} | ||
break; | ||
} | ||
case Py_PARK_AGAIN: { | ||
// This should be impossible with the current implementation of | ||
// PyParkingLot, but would be possible if critical sections / | ||
// the GIL were released before the thread was added to the | ||
// internal thread queue in the parking lot. | ||
mpage marked this conversation as resolved.
Show resolved
Hide resolved
|
||
break; | ||
} | ||
default: { | ||
Py_UNREACHABLE(); | ||
} | ||
} | ||
} | ||
/* END GIL-protected critical section */ | ||
|
||
return item; | ||
} | ||
|
||
/*[clinic input] | ||
@critical_section | ||
_queue.SimpleQueue.get_nowait | ||
|
||
cls: defining_class | ||
|
@@ -434,33 +458,35 @@ raise the Empty exception. | |
static PyObject * | ||
_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self, | ||
PyTypeObject *cls) | ||
/*[clinic end generated code: output=620c58e2750f8b8a input=842f732bf04216d3]*/ | ||
/*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/ | ||
{ | ||
return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None); | ||
} | ||
|
||
/*[clinic input] | ||
@critical_section | ||
_queue.SimpleQueue.empty -> bool | ||
|
||
Return True if the queue is empty, False otherwise (not reliable!). | ||
[clinic start generated code]*/ | ||
|
||
static int | ||
_queue_SimpleQueue_empty_impl(simplequeueobject *self) | ||
/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/ | ||
/*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/ | ||
{ | ||
return RingBuf_IsEmpty(&self->buf); | ||
} | ||
|
||
/*[clinic input] | ||
@critical_section | ||
_queue.SimpleQueue.qsize -> Py_ssize_t | ||
|
||
Return the approximate size of the queue (not reliable!). | ||
[clinic start generated code]*/ | ||
|
||
static Py_ssize_t | ||
_queue_SimpleQueue_qsize_impl(simplequeueobject *self) | ||
/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/ | ||
/*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/ | ||
{ | ||
return RingBuf_Len(&self->buf); | ||
} | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.