-
Notifications
You must be signed in to change notification settings - Fork 416
Asynchronous pruning for RubyThreadPoolExecutor #1082
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
module Concurrent | ||
module Collection | ||
# @!visibility private | ||
# @!macro ruby_timeout_queue | ||
class RubyTimeoutQueue < ::Queue | ||
def initialize(*args) | ||
if RUBY_VERSION >= '3.2' | ||
raise "#{self.class.name} is not needed on Ruby 3.2 or later, use ::Queue instead" | ||
end | ||
|
||
super(*args) | ||
|
||
@mutex = Mutex.new | ||
@cond_var = ConditionVariable.new | ||
end | ||
|
||
def push(obj) | ||
@mutex.synchronize do | ||
super(obj) | ||
@cond_var.signal | ||
end | ||
end | ||
alias_method :enq, :push | ||
alias_method :<<, :push | ||
|
||
def pop(non_block = false, timeout: nil) | ||
if non_block && timeout | ||
raise ArgumentError, "can't set a timeout if non_block is enabled" | ||
end | ||
|
||
if non_block | ||
super(true) | ||
elsif timeout | ||
@mutex.synchronize do | ||
deadline = Concurrent.monotonic_time + timeout | ||
while (now = Concurrent.monotonic_time) < deadline && empty? | ||
@cond_var.wait(@mutex, deadline - now) | ||
end | ||
begin | ||
return super(true) | ||
rescue ThreadError | ||
# still empty | ||
nil | ||
end | ||
end | ||
else | ||
super(false) | ||
end | ||
end | ||
alias_method :deq, :pop | ||
alias_method :shift, :pop | ||
end | ||
private_constant :RubyTimeoutQueue | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
module Concurrent | ||
module Collection | ||
# @!visibility private | ||
# @!macro internal_implementation_note | ||
TimeoutQueueImplementation = if RUBY_VERSION >= '3.2' | ||
::Queue | ||
else | ||
require 'concurrent/collection/ruby_timeout_queue' | ||
RubyTimeoutQueue | ||
end | ||
private_constant :TimeoutQueueImplementation | ||
|
||
# @!visibility private | ||
# @!macro timeout_queue | ||
class TimeoutQueue < TimeoutQueueImplementation | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,13 +3,15 @@ | |
require 'concurrent/concern/logging' | ||
require 'concurrent/executor/ruby_executor_service' | ||
require 'concurrent/utility/monotonic_time' | ||
require 'concurrent/collection/timeout_queue' | ||
|
||
module Concurrent | ||
|
||
# @!macro thread_pool_executor | ||
# @!macro thread_pool_options | ||
# @!visibility private | ||
class RubyThreadPoolExecutor < RubyExecutorService | ||
include Concern::Deprecation | ||
|
||
# @!macro thread_pool_executor_constant_default_max_pool_size | ||
DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE | ||
|
@@ -94,9 +96,28 @@ def remaining_capacity | |
end | ||
end | ||
|
||
# removes the worker if it can be pruned | ||
# | ||
# @return [true, false] if the worker was pruned | ||
# | ||
# @!visibility private | ||
def remove_busy_worker(worker) | ||
synchronize { ns_remove_busy_worker worker } | ||
def prune_worker(worker) | ||
synchronize do | ||
if ns_prunable_capacity > 0 | ||
remove_worker worker | ||
true | ||
else | ||
false | ||
end | ||
end | ||
end | ||
|
||
# @!visibility private | ||
def remove_worker(worker) | ||
synchronize do | ||
ns_remove_ready_worker worker | ||
ns_remove_busy_worker worker | ||
end | ||
end | ||
|
||
# @!visibility private | ||
|
@@ -116,7 +137,7 @@ def worker_task_completed | |
|
||
# @!macro thread_pool_executor_method_prune_pool | ||
def prune_pool | ||
joshuay03 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
synchronize { ns_prune_pool } | ||
deprecated "#prune_pool has no effect and will be removed in next the release, see https://github.com/ruby-concurrency/concurrent-ruby/pull/1082." | ||
end | ||
|
||
private | ||
|
@@ -146,9 +167,6 @@ def ns_initialize(opts) | |
@largest_length = 0 | ||
@workers_counter = 0 | ||
@ruby_pid = $$ # detects if Ruby has forked | ||
|
||
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented | ||
@next_gc_time = Concurrent.monotonic_time + @gc_interval | ||
end | ||
|
||
# @!visibility private | ||
|
@@ -162,12 +180,10 @@ def ns_execute(*args, &task) | |
|
||
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task) | ||
@scheduled_task_count += 1 | ||
nil | ||
else | ||
return fallback_action(*args, &task) | ||
fallback_action(*args, &task) | ||
end | ||
|
||
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time | ||
nil | ||
end | ||
|
||
# @!visibility private | ||
|
@@ -218,7 +234,7 @@ def ns_assign_worker(*args, &task) | |
# @!visibility private | ||
def ns_enqueue(*args, &task) | ||
return false if @synchronous | ||
|
||
if !ns_limited_queue? || @queue.size < @max_queue | ||
@queue << [task, args] | ||
true | ||
|
@@ -265,7 +281,7 @@ def ns_ready_worker(worker, last_message, success = true) | |
end | ||
end | ||
|
||
# removes a worker which is not in not tracked in @ready | ||
# removes a worker which is not tracked in @ready | ||
# | ||
# @!visibility private | ||
def ns_remove_busy_worker(worker) | ||
|
@@ -274,25 +290,27 @@ def ns_remove_busy_worker(worker) | |
true | ||
end | ||
|
||
# try oldest worker if it is idle for enough time, it's returned back at the start | ||
# | ||
# @!visibility private | ||
def ns_prune_pool | ||
now = Concurrent.monotonic_time | ||
stopped_workers = 0 | ||
while [email protected]? && (@pool.size - stopped_workers > @min_length) | ||
worker, last_message = @ready.first | ||
if now - last_message > self.idletime | ||
stopped_workers += 1 | ||
@ready.shift | ||
worker << :stop | ||
else break | ||
end | ||
def ns_remove_ready_worker(worker) | ||
if index = @ready.index { |rw, _| rw == worker } | ||
@ready.delete_at(index) | ||
end | ||
true | ||
end | ||
|
||
@next_gc_time = Concurrent.monotonic_time + @gc_interval | ||
# @return [Integer] number of excess idle workers which can be removed without | ||
# going below min_length, or all workers if not running | ||
# | ||
# @!visibility private | ||
def ns_prunable_capacity | ||
joshuay03 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if running? | ||
[@pool.size - @min_length, @ready.size].min | ||
else | ||
@pool.size | ||
end | ||
end | ||
|
||
# @!visibility private | ||
def ns_reset_if_forked | ||
if $$ != @ruby_pid | ||
@queue.clear | ||
|
@@ -312,7 +330,7 @@ class Worker | |
|
||
def initialize(pool, id) | ||
# instance variables accessed only under pool's lock so no need to sync here again | ||
@queue = Queue.new | ||
@queue = Collection::TimeoutQueue.new | ||
@pool = pool | ||
@thread = create_worker @queue, pool, pool.idletime | ||
|
||
|
@@ -338,17 +356,22 @@ def kill | |
def create_worker(queue, pool, idletime) | ||
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime| | ||
catch(:stop) do | ||
loop do | ||
prunable = true | ||
|
||
case message = my_queue.pop | ||
loop do | ||
timeout = prunable && my_pool.running? ? my_idletime : nil | ||
case message = my_queue.pop(timeout: timeout) | ||
when nil | ||
throw :stop if my_pool.prune_worker(self) | ||
prunable = false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it impossible if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If a worker is busy they will be marked as ready when the task has completed, during which we stop them (in |
||
when :stop | ||
my_pool.remove_busy_worker(self) | ||
my_pool.remove_worker(self) | ||
throw :stop | ||
|
||
else | ||
task, args = message | ||
run_task my_pool, task, args | ||
my_pool.ready_worker(self, Concurrent.monotonic_time) | ||
prunable = true | ||
end | ||
end | ||
end | ||
|
Uh oh!
There was an error while loading. Please reload this page.