class Fluent::Plugin::TailInput::TailWatcher::IOHandler
Constants
- BYTES_TO_READ
- SHUTDOWN_TIMEOUT
Attributes
shutdown_timeout[RW]
Public Class Methods
new(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1011 def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines) @watcher = watcher @path = path @read_lines_limit = read_lines_limit @read_bytes_limit_per_second = read_bytes_limit_per_second @receive_lines = receive_lines @open_on_every_update = open_on_every_update @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT) @iobuf = ''.force_encoding('ASCII-8BIT') @lines = [] @io = nil @notify_mutex = Mutex.new @log = log @start_reading_time = nil @number_bytes_read = 0 @shutdown_start_time = nil @shutdown_timeout = SHUTDOWN_TIMEOUT @shutdown_mutex = Mutex.new @eof = false @metrics = metrics @log.info "following tail of #{@path}" end
Public Instance Methods
close()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1050 def close if @io && !@io.closed? @io.close @io = nil @metrics.closed.inc end end
eof?()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1062 def eof? @eof end
group_watcher()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1035 def group_watcher @watcher.group_watcher end
on_notify()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1039 def on_notify @notify_mutex.synchronize { handle_notify } end
opened?()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1058 def opened? !!@io end
ready_to_shutdown(shutdown_start_time = nil)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1043 def ready_to_shutdown(shutdown_start_time = nil) @shutdown_mutex.synchronize { @shutdown_start_time = shutdown_start_time || Fluent::Clock.now } end
Private Instance Methods
handle_notify()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1094 def handle_notify return if limit_bytes_per_second_reached? return if group_watcher&.limit_lines_reached?(@path) with_io do |io| begin read_more = false if !io.nil? && @lines.empty? begin while true @start_reading_time ||= Fluent::Clock.now group_watcher&.update_reading_time(@path) data = io.readpartial(BYTES_TO_READ, @iobuf) @eof = false @number_bytes_read += data.bytesize @fifo << data n_lines_before_read = @lines.size @fifo.read_lines(@lines) group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read) group_watcher_limit = group_watcher&.limit_lines_reached?(@path) @log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" if group_watcher_limit if group_watcher_limit || limit_bytes_per_second_reached? || should_shutdown_now? # Just get out from tailing loop. read_more = false break end if @lines.size >= @read_lines_limit # not to use too much memory in case the file is very large read_more = true break end end rescue EOFError @eof = true end end unless @lines.empty? if @receive_lines.call(@lines, @watcher) @watcher.pe.update_pos(io.pos - @fifo.bytesize) @lines.clear else read_more = false end end end while read_more end end
limit_bytes_per_second_reached?()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1068 def limit_bytes_per_second_reached? return false if @read_bytes_limit_per_second < 0 # not enabled by conf return false if @number_bytes_read < @read_bytes_limit_per_second @start_reading_time ||= Fluent::Clock.now time_spent_reading = Fluent::Clock.now - @start_reading_time @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") if time_spent_reading < 1 true else @start_reading_time = nil @number_bytes_read = 0 false end end
open()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1149 def open io = Fluent::FileWrapper.open(@path) io.seek(@watcher.pe.read_pos + @fifo.bytesize) @metrics.opened.inc io rescue RangeError io.close if io raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}" rescue Errno::EACCES => e @log.warn "#{e}" nil rescue Errno::ENOENT nil end
should_shutdown_now?()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1085 def should_shutdown_now? # Ensure to read all remaining lines, but abort immediately if it # seems to take too long time. @shutdown_mutex.synchronize { return false if @shutdown_start_time.nil? return Fluent::Clock.now - @shutdown_start_time > @shutdown_timeout } end
with_io() { |io| ... }
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1164 def with_io if @open_on_every_update io = open begin yield io ensure io.close unless io.nil? end else @io ||= open yield @io @eof = true if @io.nil? end rescue WatcherSetupError => e close @eof = true raise e rescue @log.error $!.to_s @log.error_backtrace close @eof = true end