diff --git a/src/debug_utils.h b/src/debug_utils.h
index d4391ac987ba5b..7f073e1ea8b37a 100644
--- a/src/debug_utils.h
+++ b/src/debug_utils.h
@@ -55,6 +55,8 @@ void NODE_EXTERN_PRIVATE FWrite(FILE* file, const std::string& str);
   V(MKSNAPSHOT)                                                              \
   V(SNAPSHOT_SERDES)                                                         \
   V(PERMISSION_MODEL)                                                        \
+  V(PLATFORM_MINIMAL)                                                        \
+  V(PLATFORM_VERBOSE)                                                        \
   V(QUIC)
 
 enum class DebugCategory : unsigned int {
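The two new categories feed the `PlatformDebugLogLevel` selection in the `node_platform.cc` hunk further down. Like the other entries in this macro list, they should be switchable at runtime via the `NODE_DEBUG_NATIVE` environment variable (e.g. `NODE_DEBUG_NATIVE=PLATFORM_VERBOSE`). Below is a condensed sketch of that selection logic; `DebugCategory`, `enabled()` and `PlatformDebugLogLevel` here are simplified stand-ins for the real types, which live in `debug_utils.h`/`node_platform.h` and consult `per_process::enabled_debug_list`:

```cpp
#include <cstdio>

// Hypothetical stand-ins for node's DebugCategory / enabled-debug-list.
enum class DebugCategory { PLATFORM_MINIMAL, PLATFORM_VERBOSE };
enum class PlatformDebugLogLevel { kNone, kMinimal, kVerbose };

static bool enabled(DebugCategory category) {
  // In node this consults per_process::enabled_debug_list, populated from
  // NODE_DEBUG_NATIVE; hard-coded here for illustration.
  return category == DebugCategory::PLATFORM_MINIMAL;
}

// Mirrors the selection order in NodePlatform's constructor below:
// verbose beats minimal, and both default to kNone.
static PlatformDebugLogLevel SelectLevel() {
  if (enabled(DebugCategory::PLATFORM_VERBOSE))
    return PlatformDebugLogLevel::kVerbose;
  if (enabled(DebugCategory::PLATFORM_MINIMAL))
    return PlatformDebugLogLevel::kMinimal;
  return PlatformDebugLogLevel::kNone;
}

int main() {
  std::printf("level=%d\n", static_cast<int>(SelectLevel()));  // level=1
}
```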
diff --git a/src/node_platform.cc b/src/node_platform.cc
index 50f5d4ef02aabd..a4def82142d8b7 100644
--- a/src/node_platform.cc
+++ b/src/node_platform.cc
@@ -13,23 +13,47 @@ using v8::Isolate;
 using v8::Object;
 using v8::Platform;
 using v8::Task;
+using v8::TaskPriority;
 
 namespace {
 
 struct PlatformWorkerData {
-  TaskQueue<Task>* task_queue;
+  TaskQueue<TaskQueueEntry>* task_queue;
   Mutex* platform_workers_mutex;
   ConditionVariable* platform_workers_ready;
   int* pending_platform_workers;
   int id;
+  PlatformDebugLogLevel debug_log_level;
 };
 
+const char* GetTaskPriorityName(TaskPriority priority) {
+  switch (priority) {
+    case TaskPriority::kUserBlocking:
+      return "UserBlocking";
+    case TaskPriority::kUserVisible:
+      return "UserVisible";
+    case TaskPriority::kBestEffort:
+      return "BestEffort";
+    default:
+      return "Unknown";
+  }
+}
+
+static void PrintSourceLocation(const v8::SourceLocation& location) {
+  auto loc = location.ToString();
+  if (!loc.empty()) {
+    fprintf(stderr, " %s\n", loc.c_str());
+  } else {
+    fprintf(stderr, " <no location>\n");
+  }
+}
+
 static void PlatformWorkerThread(void* data) {
   uv_thread_setname("V8Worker");
   std::unique_ptr<PlatformWorkerData> worker_data(
       static_cast<PlatformWorkerData*>(data));
-  TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
+  TaskQueue<TaskQueueEntry>* pending_worker_tasks = worker_data->task_queue;
 
   TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
                         "PlatformWorkerThread");
@@ -40,10 +64,24 @@ static void PlatformWorkerThread(void* data) {
     worker_data->platform_workers_ready->Signal(lock);
   }
 
-  while (std::unique_ptr<Task> task =
+  bool debug_log_enabled =
+      worker_data->debug_log_level != PlatformDebugLogLevel::kNone;
+  int id = worker_data->id;
+  while (std::unique_ptr<TaskQueueEntry> entry =
              pending_worker_tasks->Lock().BlockingPop()) {
-    task->Run();
-    pending_worker_tasks->Lock().NotifyOfCompletion();
+    if (debug_log_enabled) {
+      fprintf(stderr,
+              "\nPlatformWorkerThread %d running task %p %s\n",
+              id,
+              entry->task.get(),
+              GetTaskPriorityName(entry->priority));
+      fflush(stderr);
+    }
+    entry->task->Run();
+    // See NodePlatform::DrainTasks().
+    if (entry->is_outstanding()) {
+      pending_worker_tasks->Lock().NotifyOfOutstandingCompletion();
+    }
   }
 }
@@ -58,8 +96,8 @@ static int GetActualThreadPoolSize(int thread_pool_size) {
 
 class WorkerThreadsTaskRunner::DelayedTaskScheduler {
  public:
-  explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
-      : pending_worker_tasks_(tasks) {}
+  explicit DelayedTaskScheduler(TaskQueue<TaskQueueEntry>* tasks)
+      : pending_worker_tasks_(tasks) {}
 
   std::unique_ptr<uv_thread_t> Start() {
     auto start_thread = [](void* data) {
@@ -73,10 +111,21 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
     return t;
   }
 
-  void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
+  void PostDelayedTask(v8::TaskPriority priority,
+                       std::unique_ptr<Task> task,
+                       double delay_in_seconds) {
     auto locked = tasks_.Lock();
-    locked.Push(std::make_unique<ScheduleTask>(
-        this, std::move(task), delay_in_seconds));
+
+    auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority);
+    auto delayed = std::make_unique<ScheduleTask>(
+        this, std::move(entry), delay_in_seconds);
+
+    // The delayed task scheduler runs on its own thread with its own loop
+    // that runs the timers for the scheduled tasks to pop the original task
+    // back into the worker task queue. This first pushes the tasks that
+    // schedule the timers into the local task queue that will be flushed
+    // by the local event loop.
+    locked.Push(std::move(delayed));
     uv_async_send(&flush_tasks_);
   }
@@ -104,11 +153,16 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
     DelayedTaskScheduler* scheduler =
         ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
 
-    std::queue<std::unique_ptr<Task>> tasks_to_run =
-        scheduler->tasks_.Lock().PopAll();
+    auto tasks_to_run = scheduler->tasks_.Lock().PopAll();
     while (!tasks_to_run.empty()) {
-      std::unique_ptr<Task> task = std::move(tasks_to_run.front());
+      // We have to use const_cast because std::priority_queue::top() does not
+      // return a movable item.
+      std::unique_ptr<Task> task =
+          std::move(const_cast<std::unique_ptr<Task>&>(tasks_to_run.top()));
       tasks_to_run.pop();
+      // This runs either the ScheduleTasks that schedule the timers to
+      // pop the tasks back into the worker task runner queue, or the
+      // StopTasks to stop the timers and drop all the pending tasks.
      task->Run();
    }
  }
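The control flow above is easy to lose in the diff: `PostDelayedTask()` runs on whatever thread posts the task and only enqueues a `ScheduleTask`; the scheduler thread flushes that local queue, arms a libuv timer, and the timer callback finally pushes the entry into the shared worker queue, where a worker picks it up. Here is a deliberately loose, minimal model of that round trip, with `std::function` standing in for `v8::Task`, a plain `std::deque` standing in for both `TaskQueue`s, and a sleeping thread standing in for the uv timer:

```cpp
#include <chrono>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Stands in for pending_worker_tasks_: consumed by workers, fed by timers.
std::mutex mu;
std::deque<std::function<void()>> worker_queue;

std::thread PostDelayedTask(std::function<void()> task,
                            double delay_in_seconds) {
  // The real code enqueues a ScheduleTask and wakes the scheduler loop via
  // uv_async_send(); here a sleeping thread stands in for the uv timer.
  return std::thread([task = std::move(task), delay_in_seconds]() mutable {
    std::this_thread::sleep_for(
        std::chrono::duration<double>(delay_in_seconds));
    // Timer expired: hand the task back to the worker queue. The task is
    // never run on the scheduler thread itself.
    std::lock_guard<std::mutex> lock(mu);
    worker_queue.push_back(std::move(task));
  });
}

int main() {
  std::thread timer = PostDelayedTask(
      [] { std::puts("ran on the worker, not the scheduler"); }, 0.1);
  for (;;) {  // the worker loop, i.e. the BlockingPop() side
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::lock_guard<std::mutex> lock(mu);
    if (!worker_queue.empty()) {
      worker_queue.front()();
      break;
    }
  }
  timer.join();
}
```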
@@ -134,11 +188,11 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   class ScheduleTask : public Task {
    public:
     ScheduleTask(DelayedTaskScheduler* scheduler,
-                 std::unique_ptr<Task> task,
+                 std::unique_ptr<TaskQueueEntry> task,
                  double delay_in_seconds)
-        : scheduler_(scheduler),
-          task_(std::move(task)),
-          delay_in_seconds_(delay_in_seconds) {}
+        : scheduler_(scheduler),
+          task_(std::move(task)),
+          delay_in_seconds_(delay_in_seconds) {}
 
     void Run() override {
       uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
@@ -151,37 +205,46 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
 
    private:
     DelayedTaskScheduler* scheduler_;
-    std::unique_ptr<Task> task_;
+    std::unique_ptr<TaskQueueEntry> task_;
     double delay_in_seconds_;
   };
 
   static void RunTask(uv_timer_t* timer) {
     DelayedTaskScheduler* scheduler =
         ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
-    scheduler->pending_worker_tasks_->Lock().Push(
-        scheduler->TakeTimerTask(timer));
+    auto entry = scheduler->TakeTimerTask(timer);
+    bool is_outstanding = entry->is_outstanding();
+    scheduler->pending_worker_tasks_->Lock().Push(std::move(entry),
+                                                  is_outstanding);
   }
 
-  std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
-    std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
+  std::unique_ptr<TaskQueueEntry> TakeTimerTask(uv_timer_t* timer) {
+    std::unique_ptr<TaskQueueEntry> task_entry(
+        static_cast<TaskQueueEntry*>(timer->data));
     uv_timer_stop(timer);
     uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
       delete reinterpret_cast<uv_timer_t*>(handle);
     });
     timers_.erase(timer);
-    return task;
+    return task_entry;
   }
 
   uv_sem_t ready_;
-  TaskQueue<Task>* pending_worker_tasks_;
+  // Task queue in the worker thread task runner; we push the delayed task
+  // back to it when the timer expires.
+  TaskQueue<TaskQueueEntry>* pending_worker_tasks_;
+  // Locally scheduled tasks to be popped into the worker task runner queue.
+  // It is flushed whenever the next closest timer expires.
   TaskQueue<Task> tasks_;
   uv_loop_t loop_;
   uv_async_t flush_tasks_;
   std::unordered_set<uv_timer_t*> timers_;
 };
 
-WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
+WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(
+    int thread_pool_size, PlatformDebugLogLevel debug_log_level)
+    : debug_log_level_(debug_log_level) {
   Mutex platform_workers_mutex;
   ConditionVariable platform_workers_ready;
@@ -193,10 +256,13 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
   threads_.push_back(delayed_task_scheduler_->Start());
 
   for (int i = 0; i < thread_pool_size; i++) {
-    PlatformWorkerData* worker_data = new PlatformWorkerData{
-      &pending_worker_tasks_, &platform_workers_mutex,
-      &platform_workers_ready, &pending_platform_workers, i
-    };
+    PlatformWorkerData* worker_data =
+        new PlatformWorkerData{&pending_worker_tasks_,
+                               &platform_workers_mutex,
+                               &platform_workers_ready,
+                               &pending_platform_workers,
+                               i,
+                               debug_log_level_};
 
     std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
     if (uv_thread_create(t.get(), PlatformWorkerThread, worker_data) != 0) {
@@ -212,13 +278,21 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
   }
 }
 
-void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
-  pending_worker_tasks_.Lock().Push(std::move(task));
+void WorkerThreadsTaskRunner::PostTask(v8::TaskPriority priority,
+                                       std::unique_ptr<Task> task,
+                                       const v8::SourceLocation& location) {
+  auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority);
+  bool is_outstanding = entry->is_outstanding();
+  pending_worker_tasks_.Lock().Push(std::move(entry), is_outstanding);
 }
 
-void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
-                                              double delay_in_seconds) {
-  delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
+void WorkerThreadsTaskRunner::PostDelayedTask(
+    v8::TaskPriority priority,
+    std::unique_ptr<Task> task,
+    const v8::SourceLocation& location,
+    double delay_in_seconds) {
+  delayed_task_scheduler_->PostDelayedTask(
+      priority, std::move(task), delay_in_seconds);
 }
 
 void WorkerThreadsTaskRunner::BlockingDrain() {
@@ -238,8 +312,8 @@ int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
 }
 
 PerIsolatePlatformData::PerIsolatePlatformData(
-    Isolate* isolate, uv_loop_t* loop)
-    : isolate_(isolate), loop_(loop) {
+    Isolate* isolate, uv_loop_t* loop, PlatformDebugLogLevel debug_log_level)
+    : isolate_(isolate), loop_(loop), debug_log_level_(debug_log_level) {
   flush_tasks_ = new uv_async_t();
   CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
   flush_tasks_->data = static_cast<void*>(this);
@@ -267,9 +341,20 @@ void PerIsolatePlatformData::PostTaskImpl(std::unique_ptr<Task> task,
   // the foreground task runner is being cleaned up by Shutdown(). In that
   // case, make sure we wait until the shutdown is completed (which leads
   // to flush_tasks_ == nullptr, and the task will be discarded).
+  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
+    fprintf(stderr, "\nPerIsolatePlatformData::PostTaskImpl %p", task.get());
+    PrintSourceLocation(location);
+    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
+      DumpNativeBacktrace(stderr);
+    }
+    fflush(stderr);
+  }
+
   auto locked = foreground_tasks_.Lock();
   if (flush_tasks_ == nullptr) return;
-  locked.Push(std::move(task));
+  // All foreground tasks are treated as user-blocking tasks.
+  locked.Push(std::make_unique<TaskQueueEntry>(
+      std::move(task), v8::TaskPriority::kUserBlocking));
   uv_async_send(flush_tasks_);
 }
 
@@ -277,12 +362,26 @@ void PerIsolatePlatformData::PostDelayedTaskImpl(
     std::unique_ptr<Task> task,
     double delay_in_seconds,
     const v8::SourceLocation& location) {
+  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
+    fprintf(stderr,
+            "\nPerIsolatePlatformData::PostDelayedTaskImpl %p %f",
+            task.get(),
+            delay_in_seconds);
+    PrintSourceLocation(location);
+    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
+      DumpNativeBacktrace(stderr);
+    }
+    fflush(stderr);
+  }
+
   auto locked = foreground_delayed_tasks_.Lock();
   if (flush_tasks_ == nullptr) return;
   std::unique_ptr<DelayedTask> delayed(new DelayedTask());
   delayed->task = std::move(task);
   delayed->platform_data = shared_from_this();
   delayed->timeout = delay_in_seconds;
+  // All foreground tasks are treated as user-blocking tasks.
+  delayed->priority = v8::TaskPriority::kUserBlocking;
   locked.Push(std::move(delayed));
   uv_async_send(flush_tasks_);
 }
 
@@ -346,6 +445,16 @@ void PerIsolatePlatformData::DecreaseHandleCount() {
 NodePlatform::NodePlatform(int thread_pool_size,
                            v8::TracingController* tracing_controller,
                            v8::PageAllocator* page_allocator) {
+  if (per_process::enabled_debug_list.enabled(
+          DebugCategory::PLATFORM_VERBOSE)) {
+    debug_log_level_ = PlatformDebugLogLevel::kVerbose;
+  } else if (per_process::enabled_debug_list.enabled(
+                 DebugCategory::PLATFORM_MINIMAL)) {
+    debug_log_level_ = PlatformDebugLogLevel::kMinimal;
+  } else {
+    debug_log_level_ = PlatformDebugLogLevel::kNone;
+  }
+
   if (tracing_controller != nullptr) {
     tracing_controller_ = tracing_controller;
   } else {
@@ -362,8 +471,8 @@ NodePlatform::NodePlatform(int thread_pool_size,
   DCHECK_EQ(GetTracingController(), tracing_controller_);
 
   thread_pool_size = GetActualThreadPoolSize(thread_pool_size);
-  worker_thread_task_runner_ =
-      std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
+  worker_thread_task_runner_ = std::make_shared<WorkerThreadsTaskRunner>(
+      thread_pool_size, debug_log_level_);
 }
 
 NodePlatform::~NodePlatform() {
@@ -372,7 +481,8 @@ NodePlatform::~NodePlatform() {
 
 void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
   Mutex::ScopedLock lock(per_isolate_mutex_);
-  auto delegate = std::make_shared<PerIsolatePlatformData>(isolate, loop);
+  auto delegate = std::make_shared<PerIsolatePlatformData>(
+      isolate, loop, debug_log_level_);
   IsolatePlatformDelegate* ptr = delegate.get();
   auto insertion = per_isolate_.emplace(
     isolate,
@@ -470,7 +580,25 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
   if (!per_isolate) return;
 
   do {
-    // Worker tasks aren't associated with an Isolate.
+    // FIXME(54918): we should not be blocking on the worker tasks on the
+    // main thread in one go. Doing so leads to two problems:
+    // 1. If any of the worker tasks post another foreground task and wait
+    //    for it to complete, and that foreground task is posted right after
+    //    we flush the foreground task queue and before the foreground thread
+    //    goes to sleep, we'll never be able to wake up to execute that
+    //    foreground task, and in turn the worker task will never complete,
+    //    and we have a deadlock.
+    // 2. Worker tasks can be posted from any thread, not necessarily one
+    //    associated with the current isolate, so we can end up blocking on a
+    //    worker task that belongs to a completely unrelated isolate in the
+    //    event loop. This is suboptimal.
+    //
+    // However, not blocking on the worker tasks at all can lead to the loss
+    // of some critical user-blocking worker tasks, e.g. wasm async
+    // compilation tasks, which should block the main thread until they are
+    // completed, as the documentation suggests. As a compromise, we
+    // currently only block on user-blocking tasks to reduce the chance of
+    // deadlocks while making sure that critical user-blocking tasks are not
+    // lost.
     worker_thread_task_runner_->BlockingDrain();
   } while (per_isolate->FlushForegroundTasksInternal());
 }
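The compromise described in this FIXME is implemented in three places spread across the diff: `Push()` only increments `outstanding_tasks_` when the caller marks the entry as outstanding, the worker loop only calls `NotifyOfOutstandingCompletion()` for entries whose `is_outstanding()` is true (i.e. `kUserBlocking` ones), and `BlockingDrain()` waits on that counter alone. A stripped-down model of the bookkeeping, using standard primitives in place of node's `Mutex`/`ConditionVariable` wrappers, which behave equivalently here:

```cpp
#include <condition_variable>
#include <mutex>

// Counts only "outstanding" (user-blocking) tasks, mirroring
// TaskQueue::Locked::Push(task, outstanding), NotifyOfOutstandingCompletion()
// and BlockingDrain() in the hunks above and below.
class OutstandingCounter {
 public:
  void Push(bool outstanding) {
    std::lock_guard<std::mutex> lock(mu_);
    if (outstanding) outstanding_tasks_++;
  }

  // Called by a worker after running an entry with is_outstanding() == true.
  void NotifyOfOutstandingCompletion() {
    std::lock_guard<std::mutex> lock(mu_);
    if (--outstanding_tasks_ == 0) drained_.notify_all();
  }

  // DrainTasks() blocks here; best-effort tasks never hold it up.
  void BlockingDrain() {
    std::unique_lock<std::mutex> lock(mu_);
    drained_.wait(lock, [this] { return outstanding_tasks_ == 0; });
  }

 private:
  std::mutex mu_;
  std::condition_variable drained_;
  int outstanding_tasks_ = 0;
};
```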
@@ -478,11 +606,13 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
 bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
   bool did_work = false;
 
-  std::queue<std::unique_ptr<DelayedTask>> delayed_tasks_to_schedule =
-      foreground_delayed_tasks_.Lock().PopAll();
+  auto delayed_tasks_to_schedule = foreground_delayed_tasks_.Lock().PopAll();
   while (!delayed_tasks_to_schedule.empty()) {
+    // We have to use const_cast because std::priority_queue::top() does not
+    // return a movable item.
     std::unique_ptr<DelayedTask> delayed =
-        std::move(delayed_tasks_to_schedule.front());
+        std::move(const_cast<std::unique_ptr<DelayedTask>&>(
+            delayed_tasks_to_schedule.top()));
     delayed_tasks_to_schedule.pop();
 
     did_work = true;
@@ -507,17 +637,20 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
     });
   }
 
-  std::queue<std::unique_ptr<Task>> tasks;
+  TaskQueue<TaskQueueEntry>::PriorityQueue tasks;
   {
     auto locked = foreground_tasks_.Lock();
     tasks = locked.PopAll();
   }
   while (!tasks.empty()) {
-    std::unique_ptr<Task> task = std::move(tasks.front());
+    // We have to use const_cast because std::priority_queue::top() does not
+    // return a movable item.
+    std::unique_ptr<TaskQueueEntry> entry =
+        std::move(const_cast<std::unique_ptr<TaskQueueEntry>&>(tasks.top()));
     tasks.pop();
     did_work = true;
-    RunForegroundTask(std::move(task));
+    RunForegroundTask(std::move(entry->task));
   }
 
   return did_work;
@@ -527,7 +660,18 @@ void NodePlatform::PostTaskOnWorkerThreadImpl(
     v8::TaskPriority priority,
     std::unique_ptr<v8::Task> task,
     const v8::SourceLocation& location) {
-  worker_thread_task_runner_->PostTask(std::move(task));
+  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
+    fprintf(stderr,
+            "\nNodePlatform::PostTaskOnWorkerThreadImpl %s %p",
+            GetTaskPriorityName(priority),
+            task.get());
+    PrintSourceLocation(location);
+    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
+      DumpNativeBacktrace(stderr);
+    }
+    fflush(stderr);
+  }
+  worker_thread_task_runner_->PostTask(priority, std::move(task), location);
 }
 
@@ -535,8 +679,20 @@ void NodePlatform::PostDelayedTaskOnWorkerThreadImpl(
     std::unique_ptr<v8::Task> task,
     double delay_in_seconds,
     const v8::SourceLocation& location) {
-  worker_thread_task_runner_->PostDelayedTask(std::move(task),
-                                              delay_in_seconds);
+  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
+    fprintf(stderr,
+            "\nNodePlatform::PostDelayedTaskOnWorkerThreadImpl %s %p %f",
+            GetTaskPriorityName(priority),
+            task.get(),
+            delay_in_seconds);
+    PrintSourceLocation(location);
+    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
+      DumpNativeBacktrace(stderr);
+    }
+    fflush(stderr);
+  }
+  worker_thread_task_runner_->PostDelayedTask(
+      priority, std::move(task), location, delay_in_seconds);
 }
 
 IsolatePlatformDelegate* NodePlatform::ForIsolate(Isolate* isolate) {
@@ -564,6 +720,17 @@ std::unique_ptr<v8::JobHandle> NodePlatform::CreateJobImpl(
     v8::TaskPriority priority,
     std::unique_ptr<v8::JobTask> job_task,
     const v8::SourceLocation& location) {
+  if (debug_log_level_ != PlatformDebugLogLevel::kNone) {
+    fprintf(stderr,
+            "\nNodePlatform::CreateJobImpl %s %p",
+            GetTaskPriorityName(priority),
+            job_task.get());
+    PrintSourceLocation(location);
+    if (debug_log_level_ == PlatformDebugLogLevel::kVerbose) {
+      DumpNativeBacktrace(stderr);
+    }
+    fflush(stderr);
+  }
   return v8::platform::NewDefaultJobHandle(
       this, priority, std::move(job_task), NumberOfWorkerThreads());
 }
@@ -605,16 +772,22 @@ v8::PageAllocator* NodePlatform::GetPageAllocator() {
 
 template <class T>
 TaskQueue<T>::TaskQueue()
-    : lock_(), tasks_available_(), tasks_drained_(),
-      outstanding_tasks_(0), stopped_(false), task_queue_() { }
+    : lock_(),
+      tasks_available_(),
+      outstanding_tasks_drained_(),
+      outstanding_tasks_(0),
+      stopped_(false),
+      task_queue_() {}
 
 template <class T>
 TaskQueue<T>::Locked::Locked(TaskQueue* queue)
     : queue_(queue), lock_(queue->lock_) {}
 
 template <class T>
-void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task) {
-  queue_->outstanding_tasks_++;
+void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task, bool outstanding) {
+  if (outstanding) {
+    queue_->outstanding_tasks_++;
+  }
   queue_->task_queue_.push(std::move(task));
   queue_->tasks_available_.Signal(lock_);
 }
@@ -624,7 +797,8 @@ std::unique_ptr<T> TaskQueue<T>::Locked::Pop() {
   if (queue_->task_queue_.empty()) {
     return std::unique_ptr<T>(nullptr);
   }
-  std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+  std::unique_ptr<T> result = std::move(
+      const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top()));
   queue_->task_queue_.pop();
   return result;
 }
@@ -637,22 +811,23 @@ std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() {
   if (queue_->stopped_) {
     return std::unique_ptr<T>(nullptr);
   }
-  std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+  std::unique_ptr<T> result = std::move(
+      const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top()));
   queue_->task_queue_.pop();
   return result;
 }
 
 template <class T>
-void TaskQueue<T>::Locked::NotifyOfCompletion() {
+void TaskQueue<T>::Locked::NotifyOfOutstandingCompletion() {
   if (--queue_->outstanding_tasks_ == 0) {
-    queue_->tasks_drained_.Broadcast(lock_);
+    queue_->outstanding_tasks_drained_.Broadcast(lock_);
   }
 }
 
 template <class T>
 void TaskQueue<T>::Locked::BlockingDrain() {
   while (queue_->outstanding_tasks_ > 0) {
-    queue_->tasks_drained_.Wait(lock_);
+    queue_->outstanding_tasks_drained_.Wait(lock_);
   }
 }
 
@@ -663,8 +838,8 @@ void TaskQueue<T>::Locked::Stop() {
 }
 
 template <class T>
-std::queue<std::unique_ptr<T>> TaskQueue<T>::Locked::PopAll() {
-  std::queue<std::unique_ptr<T>> result;
+typename TaskQueue<T>::PriorityQueue TaskQueue<T>::Locked::PopAll() {
+  typename TaskQueue<T>::PriorityQueue result;
   result.swap(queue_->task_queue_);
   return result;
 }
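A side note on the recurring `const_cast` pattern in `Pop()`, `BlockingPop()` and the two flush loops: `std::priority_queue::top()` returns a `const` reference, so a move-only element such as `std::unique_ptr` cannot be moved out without casting the constness away. Moving from `top()` immediately before `pop()` is the usual workaround; it is fine because the underlying element object is not actually `const`, and the common standard library implementations exclude the just-hollowed-out slot from the comparisons `pop()` performs. A self-contained demonstration (the `ByValue` comparator is made up for the example):

```cpp
#include <cstdio>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

struct ByValue {
  bool operator()(const std::unique_ptr<int>& a,
                  const std::unique_ptr<int>& b) const {
    return *a < *b;  // max-heap on the pointed-to value
  }
};

int main() {
  std::priority_queue<std::unique_ptr<int>,
                      std::vector<std::unique_ptr<int>>,
                      ByValue> q;
  q.push(std::make_unique<int>(1));
  q.push(std::make_unique<int>(3));
  q.push(std::make_unique<int>(2));
  while (!q.empty()) {
    // top() is const&: cast only to move, then pop right away so the
    // moved-from element never takes part in another comparison.
    std::unique_ptr<int> p =
        std::move(const_cast<std::unique_ptr<int>&>(q.top()));
    q.pop();
    std::printf("%d\n", *p);  // prints 3, 2, 1
  }
}
```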
diff --git a/src/node_platform.h b/src/node_platform.h
index 6462f06f6983b2..cee61eecf1f864 100644
--- a/src/node_platform.h
+++ b/src/node_platform.h
@@ -5,6 +5,7 @@
 
 #include <functional>
 #include <queue>
+#include <type_traits>
 #include <unordered_map>
 #include <vector>
@@ -19,18 +20,41 @@ class NodePlatform;
 class IsolateData;
 class PerIsolatePlatformData;
 
+template <typename T, typename = void>
+struct has_priority : std::false_type {};
+
+template <typename T>
+struct has_priority<T, std::void_t<decltype(std::declval<T>().priority)>>
+    : std::true_type {};
+
 template <class T>
 class TaskQueue {
  public:
+  // If the entry type has a priority member, order the priority queue by
+  // that - higher priority first. Otherwise, maintain insertion order.
+  struct EntryCompare {
+    bool operator()(const std::unique_ptr<T>& a,
+                    const std::unique_ptr<T>& b) const {
+      if constexpr (has_priority<T>::value) {
+        return a->priority < b->priority;
+      } else {
+        return false;
+      }
+    }
+  };
+
+  using PriorityQueue = std::priority_queue<std::unique_ptr<T>,
+                                            std::vector<std::unique_ptr<T>>,
+                                            EntryCompare>;
   class Locked {
    public:
-    void Push(std::unique_ptr<T> task);
+    void Push(std::unique_ptr<T> task, bool outstanding = false);
     std::unique_ptr<T> Pop();
     std::unique_ptr<T> BlockingPop();
-    void NotifyOfCompletion();
+    void NotifyOfOutstandingCompletion();
     void BlockingDrain();
     void Stop();
-    std::queue<std::unique_ptr<T>> PopAll();
+    PriorityQueue PopAll();
 
    private:
     friend class TaskQueue;
@@ -48,26 +72,46 @@ class TaskQueue {
  private:
   Mutex lock_;
   ConditionVariable tasks_available_;
-  ConditionVariable tasks_drained_;
+  ConditionVariable outstanding_tasks_drained_;
   int outstanding_tasks_;
   bool stopped_;
-  std::queue<std::unique_ptr<T>> task_queue_;
+  PriorityQueue task_queue_;
+};
+
+struct TaskQueueEntry {
+  std::unique_ptr<v8::Task> task;
+  v8::TaskPriority priority;
+
+  TaskQueueEntry(std::unique_ptr<v8::Task> t, v8::TaskPriority p)
+      : task(std::move(t)), priority(p) {}
+
+  inline bool is_outstanding() const {
+    return priority == v8::TaskPriority::kUserBlocking;
+  }
 };
 
 struct DelayedTask {
   std::unique_ptr<v8::Task> task;
+  v8::TaskPriority priority;
   uv_timer_t timer;
   double timeout;
   std::shared_ptr<PerIsolatePlatformData> platform_data;
 };
 
+enum class PlatformDebugLogLevel {
+  kNone = 0,
+  kMinimal = 1,
+  kVerbose = 2,
+};
+
 // This acts as the foreground task runner for a given Isolate.
 class PerIsolatePlatformData
     : public IsolatePlatformDelegate,
       public v8::TaskRunner,
      public std::enable_shared_from_this<PerIsolatePlatformData> {
  public:
-  PerIsolatePlatformData(v8::Isolate* isolate, uv_loop_t* loop);
+  PerIsolatePlatformData(
+      v8::Isolate* isolate,
+      uv_loop_t* loop,
+      PlatformDebugLogLevel debug_log_level = PlatformDebugLogLevel::kNone);
   ~PerIsolatePlatformData() override;
 
   std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner() override;
@@ -127,22 +171,29 @@ class PerIsolatePlatformData
 
   // When acquiring locks for both task queues, lock foreground_tasks_
   // first then foreground_delayed_tasks_ to avoid deadlocks.
-  TaskQueue<v8::Task> foreground_tasks_;
+  TaskQueue<TaskQueueEntry> foreground_tasks_;
   TaskQueue<DelayedTask> foreground_delayed_tasks_;
 
   // Use a custom deleter because libuv needs to close the handle first.
   typedef std::unique_ptr<DelayedTask, std::function<void(DelayedTask*)>>
       DelayedTaskPointer;
   std::vector<DelayedTaskPointer> scheduled_delayed_tasks_;
+  PlatformDebugLogLevel debug_log_level_ = PlatformDebugLogLevel::kNone;
 };
 
 // This acts as the single worker thread task runner for all Isolates.
 class WorkerThreadsTaskRunner {
  public:
-  explicit WorkerThreadsTaskRunner(int thread_pool_size);
+  explicit WorkerThreadsTaskRunner(int thread_pool_size,
+                                   PlatformDebugLogLevel debug_log_level);
 
-  void PostTask(std::unique_ptr<v8::Task> task);
-  void PostDelayedTask(std::unique_ptr<v8::Task> task,
-                       double delay_in_seconds);
+  void PostTask(v8::TaskPriority priority,
+                std::unique_ptr<v8::Task> task,
+                const v8::SourceLocation& location);
+  void PostDelayedTask(v8::TaskPriority priority,
+                       std::unique_ptr<v8::Task> task,
+                       const v8::SourceLocation& location,
+                       double delay_in_seconds);
 
   void BlockingDrain();
   void Shutdown();
@@ -150,12 +201,21 @@ class WorkerThreadsTaskRunner {
   int NumberOfWorkerThreads() const;
 
  private:
-  TaskQueue<v8::Task> pending_worker_tasks_;
+  // A queue shared by all threads. The consumers are the worker threads which
+  // take tasks from it to run in PlatformWorkerThread(). The producers can be
+  // any thread. Both the foreground thread and the worker threads can push
+  // tasks into the queue via v8::Platform::PostTaskOnWorkerThread(), which
+  // eventually calls PostTask() on this class. When any thread calls
+  // v8::Platform::PostDelayedTaskOnWorkerThread(), the DelayedTaskScheduler
+  // thread will schedule a timer that pushes the delayed tasks back into this
+  // queue when the timer expires.
+  TaskQueue<TaskQueueEntry> pending_worker_tasks_;
 
   class DelayedTaskScheduler;
   std::unique_ptr<DelayedTaskScheduler> delayed_task_scheduler_;
 
   std::vector<std::unique_ptr<uv_thread_t>> threads_;
+  PlatformDebugLogLevel debug_log_level_ = PlatformDebugLogLevel::kNone;
 };
 
 class NodePlatform : public MultiIsolatePlatform {
@@ -216,6 +276,7 @@ class NodePlatform : public MultiIsolatePlatform {
   v8::PageAllocator* page_allocator_;
   std::shared_ptr<WorkerThreadsTaskRunner> worker_thread_task_runner_;
   bool has_shut_down_ = false;
+  PlatformDebugLogLevel debug_log_level_ = PlatformDebugLogLevel::kNone;
 };
 
 }  // namespace node
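Outside of node, the interaction between `has_priority` and `EntryCompare` can be exercised directly. The sketch below uses a hypothetical `Priority` enum standing in for `v8::TaskPriority` (like the real one, more urgent values compare greater, which is why `a->priority < b->priority` yields highest-priority-first). One caveat worth keeping in mind: when the comparator always returns `false`, `std::priority_queue` imposes no ordering of its own, so the "maintain insertion order" in the comment above describes the intent rather than a container guarantee.

```cpp
#include <cstdio>
#include <memory>
#include <queue>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for v8::TaskPriority; like the real enum, more
// urgent priorities have larger underlying values.
enum class Priority { kBestEffort, kUserVisible, kUserBlocking };

struct Entry {
  std::string name;
  Priority priority;
};
struct NoPriority { int x; };

// Same shape as the has_priority trait in the header above.
template <typename T, typename = void>
struct has_priority : std::false_type {};
template <typename T>
struct has_priority<T, std::void_t<decltype(std::declval<T>().priority)>>
    : std::true_type {};

static_assert(has_priority<Entry>::value);
static_assert(!has_priority<NoPriority>::value);

template <typename T>
struct EntryCompare {
  bool operator()(const std::unique_ptr<T>& a,
                  const std::unique_ptr<T>& b) const {
    if constexpr (has_priority<T>::value) {
      return a->priority < b->priority;  // max-heap: high priority pops first
    } else {
      return false;  // no ordering imposed at all
    }
  }
};

int main() {
  std::priority_queue<std::unique_ptr<Entry>,
                      std::vector<std::unique_ptr<Entry>>,
                      EntryCompare<Entry>> q;
  q.push(std::make_unique<Entry>(Entry{"gc", Priority::kBestEffort}));
  q.push(std::make_unique<Entry>(Entry{"wasm", Priority::kUserBlocking}));
  q.push(std::make_unique<Entry>(Entry{"paint", Priority::kUserVisible}));
  while (!q.empty()) {
    std::printf("%s\n", q.top()->name.c_str());  // wasm, paint, gc
    q.pop();
  }
}
```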
diff --git a/test/parallel/test-file-write-stream4.js b/test/parallel/test-file-write-stream4.js
index e741cdd79036b4..6b3862fa714d7c 100644
--- a/test/parallel/test-file-write-stream4.js
+++ b/test/parallel/test-file-write-stream4.js
@@ -1,4 +1,3 @@
-// Flags: --expose-gc
 'use strict';
 
 // Test that 'close' emits once and not twice when `emitClose: true` is set.
@@ -18,8 +17,4 @@ const fileWriteStream = fs.createWriteStream(filepath, {
 });
 
 fileReadStream.pipe(fileWriteStream);
-fileWriteStream.on('close', common.mustCall(() => {
-  // TODO(lpinca): Remove the forced GC when
-  // https://github.com/nodejs/node/issues/54918 is fixed.
-  globalThis.gc({ type: 'major' });
-}));
+fileWriteStream.on('close', common.mustCall());
diff --git a/test/parallel/test-net-write-fully-async-buffer.js b/test/parallel/test-net-write-fully-async-buffer.js
index 4dfb905d23b69e..93074c3c49d6b6 100644
--- a/test/parallel/test-net-write-fully-async-buffer.js
+++ b/test/parallel/test-net-write-fully-async-buffer.js
@@ -23,7 +23,7 @@ const server = net.createServer(common.mustCall(function(conn) {
     }
 
     while (conn.write(Buffer.from(data)));
-    globalThis.gc({ type: 'major' });
+    globalThis.gc({ type: 'minor' });
     // The buffer allocated above should still be alive.
   }
diff --git a/test/parallel/test-net-write-fully-async-hex-string.js b/test/parallel/test-net-write-fully-async-hex-string.js
index c1ebe7e68b534e..2719ad6b5b5f80 100644
--- a/test/parallel/test-net-write-fully-async-hex-string.js
+++ b/test/parallel/test-net-write-fully-async-hex-string.js
@@ -21,7 +21,7 @@ const server = net.createServer(common.mustCall(function(conn) {
     }
 
     while (conn.write(data, 'hex'));
-    globalThis.gc({ type: 'major' });
+    globalThis.gc({ type: 'minor' });
     // The buffer allocated inside the .write() call should still be alive.
   }