class Fluent::Plugin::ForwardOutput::ConnectionManager
Constants
- RequestInfo
Public Class Methods
new(log:, secure:, connection_factory:, socket_cache:)
click to toggle source
@param log [Logger] @param secure [Boolean] @param connection_factory [Proc] @param socket_cache
[Fluent::ForwardOutput::SocketCache]
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 28
# Stores the collaborators this manager works with; performs no I/O itself.
#
# @param log logger; this class calls +debug+ on it when opening a socket
# @param secure truthy when a fresh connection must start in the +:helo+
#   state rather than +:established+
# @param connection_factory callable invoked as +call(host, port, hostname)+
#   to open a new socket
# @param socket_cache cache of reusable sockets, or a falsy value to disable
#   keepalive (every other method branches on its truthiness)
def initialize(log:, secure:, connection_factory:, socket_cache:)
  @connection_factory = connection_factory
  @secure = secure
  @log = log
  @socket_cache = socket_cache
end
Public Instance Methods
close(sock)
click to toggle source
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 72
# Releases a socket obtained from this manager.
#
# With a socket cache the socket is handed back via +checkin+ and is not
# closed here. Without one, the write side is shut down and then the socket is
# closed; both steps are best-effort, so a StandardError from either is
# swallowed.
#
# @param sock the socket to release
# @return the result of +checkin+ when a cache is present; otherwise the
#   result of +sock.close+, or nil if that raised
def close(sock)
  return @socket_cache.checkin(sock) if @socket_cache

  begin
    sock.close_write
  rescue StandardError
    nil
  end

  begin
    sock.close
  rescue StandardError
    nil
  end
end
connect(host:, port:, hostname:, ack: nil) { |socket, request_info| ... }
click to toggle source
@param ack [Fluent::Plugin::ForwardOutput::AckHandler::Ack|nil]
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 40
# Opens a connection to the given endpoint.
#
# When a socket cache is configured the call is delegated unchanged to
# +connect_keepalive+. Otherwise a new socket is created through the
# connection factory, paired with a RequestInfo whose initial state is
# +:helo+ when +@secure+ is truthy and +:established+ when it is not.
#
# Without a block the pair +[socket, request_info]+ is returned and the caller
# owns the socket. With a block the pair is yielded, and afterwards - whether
# or not the block raised - the socket is passed to +ack.enqueue+ if +ack+ was
# given, or else closed best-effort.
#
# @param host passed through to the connection factory
# @param port passed through to the connection factory
# @param hostname passed through to the connection factory
# @param ack optional object responding to +enqueue(socket)+
# @yield [socket, request_info]
# @return the +[socket, request_info]+ pair when no block is given, otherwise
#   the block's result
def connect(host:, port:, hostname:, ack: nil, &block)
  return connect_keepalive(host: host, port: port, hostname: hostname, ack: ack, &block) if @socket_cache

  @log.debug('connect new socket')
  sock = @connection_factory.call(host, port, hostname)
  initial_state = @secure ? :helo : :established
  info = RequestInfo.new(initial_state)
  return [sock, info] unless block_given?

  begin
    yield(sock, info)
  ensure
    if ack
      # The ack object takes over the socket, so it is not closed here.
      ack.enqueue(sock)
    else
      # Best-effort shutdown: an error from either call is ignored.
      begin
        sock.close_write
      rescue StandardError
        nil
      end
      begin
        sock.close
      rescue StandardError
        nil
      end
    end
  end
end
purge_obsolete_socks()
click to toggle source
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 65
# Asks the socket cache to purge its obsolete sockets.
#
# Only meaningful when a socket cache is configured; calling it without one is
# treated as a programming error.
#
# @raise [RuntimeError] when no socket cache is configured
# @return whatever the cache's +purge_obsolete_socks+ returns
def purge_obsolete_socks
  raise "Do not call this method without keepalive option" unless @socket_cache

  @socket_cache.purge_obsolete_socks
end
stop()
click to toggle source
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 35
# Clears the socket cache, if one is configured; otherwise does nothing.
#
# @return the result of the cache's +clear+, or the falsy cache value itself
#   when there is no cache
def stop
  cache = @socket_cache
  return cache unless cache

  cache.clear
end
Private Instance Methods
connect_keepalive(host:, port:, hostname:, ack: nil) { |socket, request_info| ... }
click to toggle source
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 83
# Keepalive variant of +connect+: obtains the socket through the socket cache.
#
# The cache is asked for a socket keyed by +[host, port, hostname]+. The block
# given to +checkout_or+ creates a socket through the connection factory; when
# it runs, the request info is replaced by one whose state is +:helo+ if
# +@secure+ is truthy and +:established+ otherwise. If that block does not
# run, the request info stays +:established+.
#
# Without a caller block the pair +[socket, request_info]+ is returned. With a
# block the pair is yielded; if the block raises a StandardError the socket is
# revoked from the cache and the error re-raised. On success the socket goes
# to +ack.enqueue+ when +ack+ is given, or back to the cache via +checkin+.
#
# @param host part of the cache key; passed to the connection factory
# @param port part of the cache key; passed to the connection factory
# @param hostname part of the cache key; passed to the connection factory
# @param ack optional object responding to +enqueue(socket)+
# @yield [socket, request_info]
# @return the +[socket, request_info]+ pair when no block is given, otherwise
#   the block's result
def connect_keepalive(host:, port:, hostname:, ack: nil)
  info = RequestInfo.new(:established)
  sock = @socket_cache.checkout_or([host, port, hostname]) do
    fresh = @connection_factory.call(host, port, hostname)
    # A socket created here has not been used yet, so its state is recomputed.
    info = RequestInfo.new(@secure ? :helo : :established)
    fresh
  end
  return [sock, info] unless block_given?

  begin
    result = yield(sock, info)
  rescue StandardError
    @socket_cache.revoke(sock)
    raise
  end

  # Reached only when the block succeeded; errors raised here are not rescued.
  if ack
    ack.enqueue(sock)
  else
    @socket_cache.checkin(sock)
  end
  result
end