class Fluent::Plugin::ForwardOutput::SocketCache
Constants
- TimedSocket
Public Class Methods
new(timeout, log)
click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 24 def initialize(timeout, log) @log = log @timeout = timeout @available_sockets = Hash.new { |obj, k| obj[k] = [] } @inflight_sockets = {} @inactive_sockets = [] @mutex = Mutex.new end
Public Instance Methods
checkin(sock)
click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 50 def checkin(sock) @mutex.synchronize do if (s = @inflight_sockets.delete(sock)) s.timeout = timeout @available_sockets[s.key] << s else @log.debug("there is no socket #{sock}") end end end
checkout_or(key) { || ... }
click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 33 def checkout_or(key) @mutex.synchronize do tsock = pick_socket(key) if tsock tsock.sock else sock = yield new_tsock = TimedSocket.new(timeout, key, sock) @log.debug("connect new socket #{new_tsock}") @inflight_sockets[sock] = new_tsock new_tsock.sock end end end
clear()
click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 98 def clear sockets = [] @mutex.synchronize do sockets += @available_sockets.values.flat_map { |v| v } sockets += @inflight_sockets.values sockets += @inactive_sockets @available_sockets.clear @inflight_sockets.clear @inactive_sockets.clear end sockets.each do |s| s.sock.close rescue nil end end
purge_obsolete_socks()
click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 71 def purge_obsolete_socks sockets = [] @mutex.synchronize do # don't touch @inflight_sockets @available_sockets.each do |_, socks| socks.each do |sock| if expired_socket?(sock) sockets << sock socks.delete(sock) end end end # reuse same object (@available_sockets) @available_sockets.reject! { |_, v| v.empty? } sockets += @inactive_sockets @inactive_sockets.clear end sockets.each do |s| s.sock.close rescue nil end end
revoke(sock)
click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 61 def revoke(sock) @mutex.synchronize do if (s = @inflight_sockets.delete(sock)) @inactive_sockets << s else @log.debug("there is no socket #{sock}") end end end
Private Instance Methods
expired_socket?(sock, time: Time.now)
click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 137 def expired_socket?(sock, time: Time.now) sock.timeout ? sock.timeout < time : false end
pick_socket(key)
click to toggle source
this method is not thread safe
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 118 def pick_socket(key) if @available_sockets[key].empty? return nil end t = Time.now if (s = @available_sockets[key].find { |sock| !expired_socket?(sock, time: t) }) @inflight_sockets[s.sock] = @available_sockets[key].delete(s) s.timeout = timeout s else nil end end
timeout()
click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 133 def timeout @timeout && Time.now + @timeout end