@@ -40,9 +40,10 @@ static void PlatformWorkerThread(void* data) {
40
40
worker_data->platform_workers_ready ->Signal (lock);
41
41
}
42
42
43
- while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop ()) {
43
+ while (std::unique_ptr<Task> task =
44
+ pending_worker_tasks->Lock ().BlockingPop ()) {
44
45
task->Run ();
45
- pending_worker_tasks->NotifyOfCompletion ();
46
+ pending_worker_tasks->Lock (). NotifyOfCompletion ();
46
47
}
47
48
}
48
49
@@ -73,13 +74,15 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
73
74
}
74
75
75
76
void PostDelayedTask (std::unique_ptr<Task> task, double delay_in_seconds) {
76
- tasks_.Push (std::make_unique<ScheduleTask>(this , std::move (task),
77
- delay_in_seconds));
77
+ auto locked = tasks_.Lock ();
78
+ locked.Push (std::make_unique<ScheduleTask>(
79
+ this , std::move (task), delay_in_seconds));
78
80
uv_async_send (&flush_tasks_);
79
81
}
80
82
81
83
void Stop () {
82
- tasks_.Push (std::make_unique<StopTask>(this ));
84
+ auto locked = tasks_.Lock ();
85
+ locked.Push (std::make_unique<StopTask>(this ));
83
86
uv_async_send (&flush_tasks_);
84
87
}
85
88
@@ -100,8 +103,8 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
100
103
static void FlushTasks (uv_async_t * flush_tasks) {
101
104
DelayedTaskScheduler* scheduler =
102
105
ContainerOf (&DelayedTaskScheduler::loop_, flush_tasks->loop );
103
- while (std::unique_ptr<Task> task = scheduler->tasks_ .Pop ())
104
- task->Run ();
106
+ auto locked = scheduler->tasks_ .Lock ();
107
+ while (std::unique_ptr<Task> task = locked. Pop ()) task->Run ();
105
108
}
106
109
107
110
class StopTask : public Task {
@@ -149,7 +152,8 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
149
152
static void RunTask (uv_timer_t * timer) {
150
153
DelayedTaskScheduler* scheduler =
151
154
ContainerOf (&DelayedTaskScheduler::loop_, timer->loop );
152
- scheduler->pending_worker_tasks_ ->Push (scheduler->TakeTimerTask (timer));
155
+ scheduler->pending_worker_tasks_ ->Lock ().Push (
156
+ scheduler->TakeTimerTask (timer));
153
157
}
154
158
155
159
std::unique_ptr<Task> TakeTimerTask (uv_timer_t * timer) {
@@ -203,7 +207,7 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
203
207
}
204
208
205
209
void WorkerThreadsTaskRunner::PostTask (std::unique_ptr<Task> task) {
206
- pending_worker_tasks_.Push (std::move (task));
210
+ pending_worker_tasks_.Lock (). Push (std::move (task));
207
211
}
208
212
209
213
void WorkerThreadsTaskRunner::PostDelayedTask (std::unique_ptr<Task> task,
@@ -212,11 +216,11 @@ void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
212
216
}
213
217
214
218
void WorkerThreadsTaskRunner::BlockingDrain () {
215
- pending_worker_tasks_.BlockingDrain ();
219
+ pending_worker_tasks_.Lock (). BlockingDrain ();
216
220
}
217
221
218
222
void WorkerThreadsTaskRunner::Shutdown () {
219
- pending_worker_tasks_.Stop ();
223
+ pending_worker_tasks_.Lock (). Stop ();
220
224
delayed_task_scheduler_->Stop ();
221
225
for (size_t i = 0 ; i < threads_.size (); i++) {
222
226
CHECK_EQ (0 , uv_thread_join (threads_[i].get ()));
@@ -253,20 +257,22 @@ void PerIsolatePlatformData::PostIdleTaskImpl(
253
257
254
258
void PerIsolatePlatformData::PostTaskImpl (std::unique_ptr<Task> task,
255
259
const v8::SourceLocation& location) {
256
- if (flush_tasks_ == nullptr ) {
260
+ // Check if flush_tasks is valid before using it
261
+ if (!flush_tasks_valid_.load ()) {
257
262
// V8 may post tasks during Isolate disposal. In that case, the only
258
263
// sensible path forward is to discard the task.
259
264
return ;
260
265
}
261
- foreground_tasks_.Push (std::move (task));
266
+ foreground_tasks_.Lock (). Push (std::move (task));
262
267
uv_async_send (flush_tasks_);
263
268
}
264
269
265
270
void PerIsolatePlatformData::PostDelayedTaskImpl (
266
271
std::unique_ptr<Task> task,
267
272
double delay_in_seconds,
268
273
const v8::SourceLocation& location) {
269
- if (flush_tasks_ == nullptr ) {
274
+ // Check if flush_tasks is valid before using it
275
+ if (!flush_tasks_valid_.load ()) {
270
276
// V8 may post tasks during Isolate disposal. In that case, the only
271
277
// sensible path forward is to discard the task.
272
278
return ;
@@ -275,7 +281,7 @@ void PerIsolatePlatformData::PostDelayedTaskImpl(
275
281
delayed->task = std::move (task);
276
282
delayed->platform_data = shared_from_this ();
277
283
delayed->timeout = delay_in_seconds;
278
- foreground_delayed_tasks_.Push (std::move (delayed));
284
+ foreground_delayed_tasks_.Lock (). Push (std::move (delayed));
279
285
uv_async_send (flush_tasks_);
280
286
}
281
287
@@ -301,15 +307,19 @@ void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
301
307
}
302
308
303
309
void PerIsolatePlatformData::Shutdown () {
304
- if (flush_tasks_ == nullptr )
310
+ // First mark the flush_tasks as invalid
311
+ bool expected = true ;
312
+ if (!flush_tasks_valid_.compare_exchange_strong (expected, false )) {
313
+ // Already marked invalid
305
314
return ;
315
+ }
306
316
307
317
// While there should be no V8 tasks in the queues at this point, it is
308
318
// possible that Node.js-internal tasks from e.g. the inspector are still
309
319
// lying around. We clear these queues and ignore the return value,
310
320
// effectively deleting the tasks instead of running them.
311
- foreground_delayed_tasks_.PopAll ();
312
- foreground_tasks_.PopAll ();
321
+ foreground_delayed_tasks_.Lock (). PopAll ();
322
+ foreground_tasks_.Lock (). PopAll ();
313
323
scheduled_delayed_tasks_.clear ();
314
324
315
325
// Both destroying the scheduled_delayed_tasks_ lists and closing
@@ -472,33 +482,36 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
472
482
bool PerIsolatePlatformData::FlushForegroundTasksInternal () {
473
483
bool did_work = false ;
474
484
475
- while (std::unique_ptr<DelayedTask> delayed =
476
- foreground_delayed_tasks_.Pop ()) {
477
- did_work = true ;
478
- uint64_t delay_millis = llround (delayed->timeout * 1000 );
479
-
480
- delayed->timer .data = static_cast <void *>(delayed.get ());
481
- uv_timer_init (loop_, &delayed->timer );
482
- // Timers may not guarantee queue ordering of events with the same delay if
483
- // the delay is non-zero. This should not be a problem in practice.
484
- uv_timer_start (&delayed->timer , RunForegroundTask, delay_millis, 0 );
485
- uv_unref (reinterpret_cast <uv_handle_t *>(&delayed->timer ));
486
- uv_handle_count_++;
487
-
488
- scheduled_delayed_tasks_.emplace_back (delayed.release (),
489
- [](DelayedTask* delayed) {
490
- uv_close (reinterpret_cast <uv_handle_t *>(&delayed->timer ),
491
- [](uv_handle_t * handle) {
492
- std::unique_ptr<DelayedTask> task {
493
- static_cast <DelayedTask*>(handle->data ) };
494
- task->platform_data ->DecreaseHandleCount ();
495
- });
496
- });
485
+ {
486
+ auto locked_tasks = foreground_delayed_tasks_.Lock ();
487
+ while (std::unique_ptr<DelayedTask> delayed = locked_tasks.Pop ()) {
488
+ did_work = true ;
489
+ uint64_t delay_millis = llround (delayed->timeout * 1000 );
490
+
491
+ delayed->timer .data = static_cast <void *>(delayed.get ());
492
+ uv_timer_init (loop_, &delayed->timer );
493
+ // Timers may not guarantee queue ordering of events with the same delay
494
+ // if the delay is non-zero. This should not be a problem in practice.
495
+ uv_timer_start (&delayed->timer , RunForegroundTask, delay_millis, 0 );
496
+ uv_unref (reinterpret_cast <uv_handle_t *>(&delayed->timer ));
497
+ uv_handle_count_++;
498
+
499
+ scheduled_delayed_tasks_.emplace_back (
500
+ delayed.release (), [](DelayedTask* delayed) {
501
+ uv_close (reinterpret_cast <uv_handle_t *>(&delayed->timer ),
502
+ [](uv_handle_t * handle) {
503
+ std::unique_ptr<DelayedTask> task{
504
+ static_cast <DelayedTask*>(handle->data )};
505
+ task->platform_data ->DecreaseHandleCount ();
506
+ });
507
+ });
508
+ }
497
509
}
510
+
498
511
// Move all foreground tasks into a separate queue and flush that queue.
499
512
// This way tasks that are posted while flushing the queue will be run on the
500
513
// next call of FlushForegroundTasksInternal.
501
- std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll ();
514
+ std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.Lock (). PopAll ();
502
515
while (!tasks.empty ()) {
503
516
std::unique_ptr<Task> task = std::move (tasks.front ());
504
517
tasks.pop ();
@@ -593,68 +606,4 @@ TaskQueue<T>::TaskQueue()
593
606
: lock_(), tasks_available_(), tasks_drained_(),
594
607
outstanding_tasks_(0 ), stopped_(false ), task_queue_() { }
595
608
596
- template <class T >
597
- void TaskQueue<T>::Push(std::unique_ptr<T> task) {
598
- Mutex::ScopedLock scoped_lock (lock_);
599
- outstanding_tasks_++;
600
- task_queue_.push (std::move (task));
601
- tasks_available_.Signal (scoped_lock);
602
- }
603
-
604
- template <class T >
605
- std::unique_ptr<T> TaskQueue<T>::Pop() {
606
- Mutex::ScopedLock scoped_lock (lock_);
607
- if (task_queue_.empty ()) {
608
- return std::unique_ptr<T>(nullptr );
609
- }
610
- std::unique_ptr<T> result = std::move (task_queue_.front ());
611
- task_queue_.pop ();
612
- return result;
613
- }
614
-
615
- template <class T >
616
- std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
617
- Mutex::ScopedLock scoped_lock (lock_);
618
- while (task_queue_.empty () && !stopped_) {
619
- tasks_available_.Wait (scoped_lock);
620
- }
621
- if (stopped_) {
622
- return std::unique_ptr<T>(nullptr );
623
- }
624
- std::unique_ptr<T> result = std::move (task_queue_.front ());
625
- task_queue_.pop ();
626
- return result;
627
- }
628
-
629
- template <class T >
630
- void TaskQueue<T>::NotifyOfCompletion() {
631
- Mutex::ScopedLock scoped_lock (lock_);
632
- if (--outstanding_tasks_ == 0 ) {
633
- tasks_drained_.Broadcast (scoped_lock);
634
- }
635
- }
636
-
637
- template <class T >
638
- void TaskQueue<T>::BlockingDrain() {
639
- Mutex::ScopedLock scoped_lock (lock_);
640
- while (outstanding_tasks_ > 0 ) {
641
- tasks_drained_.Wait (scoped_lock);
642
- }
643
- }
644
-
645
- template <class T >
646
- void TaskQueue<T>::Stop() {
647
- Mutex::ScopedLock scoped_lock (lock_);
648
- stopped_ = true ;
649
- tasks_available_.Broadcast (scoped_lock);
650
- }
651
-
652
- template <class T >
653
- std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
654
- Mutex::ScopedLock scoped_lock (lock_);
655
- std::queue<std::unique_ptr<T>> result;
656
- result.swap (task_queue_);
657
- return result;
658
- }
659
-
660
609
} // namespace node
0 commit comments