class Fluent::Plugin::TailInput::TailWatcher::IOHandler

Constants

BYTES_TO_READ
SHUTDOWN_TIMEOUT

Attributes

shutdown_timeout[RW]

Public Class Methods

new(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1011
# Sets up the state needed to tail a single file.
#
# watcher                     - owning watcher; used for its group_watcher and
#                               position entry (pe), and passed to the callback.
# path                        - path of the file to read.
# read_lines_limit            - number of buffered lines at which the read loop
#                               pauses to hand lines to the callback.
# read_bytes_limit_per_second - byte budget per second; a negative value
#                               disables the limit.
# log                         - logger used for info/debug/warn/error output.
# open_on_every_update        - when true the file is opened and closed around
#                               every read instead of keeping a handle.
# from_encoding, encoding     - passed to the line FIFO; each defaults to
#                               ASCII-8BIT when nil.
# metrics                     - object exposing +opened+ and +closed+ counters.
# receive_lines               - block called with (lines, watcher); a truthy
#                               result marks the lines as consumed.
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
  @watcher = watcher
  @path = path
  @read_lines_limit = read_lines_limit
  @read_bytes_limit_per_second = read_bytes_limit_per_second
  @receive_lines = receive_lines
  @open_on_every_update = open_on_every_update
  @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT)
  # Allocate the read buffer explicitly rather than mutating a string literal,
  # so it stays writable even when string literals are frozen.
  @iobuf = String.new(encoding: Encoding::ASCII_8BIT)
  @lines = []
  @io = nil
  @notify_mutex = Mutex.new
  @log = log
  @start_reading_time = nil
  @number_bytes_read = 0
  @shutdown_start_time = nil
  @shutdown_timeout = SHUTDOWN_TIMEOUT
  @shutdown_mutex = Mutex.new
  @eof = false
  @metrics = metrics

  @log.info "following tail of #{@path}"
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1050
# Closes the retained file handle, if there is one that is still open,
# forgets it, and bumps the "closed" metric. Does nothing otherwise.
def close
  return if !@io || @io.closed?

  @io.close
  @io = nil
  @metrics.closed.inc
end
eof?() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1062
# Returns the current @eof flag. Within this class the flag is set to true
# when a read raises EOFError, when no file handle could be obtained, or when
# with_io rescues an error, and reset to false after a successful read.
def eof?
  @eof
end
group_watcher() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1035
# Returns the group watcher of the owning watcher. Callers in this class
# invoke it with safe navigation, so the result may be nil.
def group_watcher
  @watcher.group_watcher
end
on_notify() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1039
# Entry point for change notifications: runs handle_notify while holding
# @notify_mutex so that only one notification is processed at a time.
def on_notify
  @notify_mutex.synchronize do
    handle_notify
  end
end
opened?() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1058
# True when a file handle is currently retained in @io, false otherwise.
def opened?
  @io ? true : false
end
ready_to_shutdown(shutdown_start_time = nil) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1043
# Records when shutdown started so should_shutdown_now? can apply its
# timeout. Uses the given time, or the current clock value when none is
# supplied. The write happens under @shutdown_mutex.
def ready_to_shutdown(shutdown_start_time = nil)
  @shutdown_mutex.synchronize do
    started_at = shutdown_start_time || Fluent::Clock.now
    @shutdown_start_time = started_at
  end
end

Private Instance Methods

handle_notify() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1094
# Reads newly available data from the file, splits it into lines via @fifo,
# and hands the accumulated lines to the @receive_lines callback. When the
# callback returns a truthy value the position entry is advanced and the
# line buffer is cleared; otherwise the lines are kept in @lines.
#
# Reading is skipped entirely while the per-second byte limit or the group
# watcher's line limit is in effect.
def handle_notify
  return if limit_bytes_per_second_reached?
  return if group_watcher&.limit_lines_reached?(@path)

  with_io do |io|
    # do-while: the body runs at least once and repeats only while the inner
    # loop stopped because @read_lines_limit was hit and the callback
    # accepted the lines.
    begin
      read_more = false

      # Only read when a handle is available (io is nil when open returned
      # nil) and no lines are left over from an earlier call.
      if !io.nil? && @lines.empty?
        begin
          while true
            @start_reading_time ||= Fluent::Clock.now
            group_watcher&.update_reading_time(@path)

            # @iobuf is passed as the buffer argument of readpartial;
            # EOFError ends the loop through the rescue below.
            data = io.readpartial(BYTES_TO_READ, @iobuf)
            @eof = false
            @number_bytes_read += data.bytesize
            @fifo << data

            # read_lines adds to @lines; the size difference is reported to
            # the group watcher as the number of lines read.
            n_lines_before_read = @lines.size
            @fifo.read_lines(@lines)
            group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read)

            group_watcher_limit = group_watcher&.limit_lines_reached?(@path)
            @log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" if group_watcher_limit

            if group_watcher_limit || limit_bytes_per_second_reached? || should_shutdown_now?
              # A rate limit was hit or the shutdown timeout expired: stop
              # reading and do not loop again after flushing.
              read_more = false
              break
            end

            if @lines.size >= @read_lines_limit
              # Flush now so that @lines does not grow without bound on a
              # very large file, then come back for more.
              read_more = true
              break
            end
          end
        rescue EOFError
          @eof = true
        end
      end

      unless @lines.empty?
        if @receive_lines.call(@lines, @watcher)
          # Bytes still held in @fifo are subtracted from the recorded
          # position; open seeks to read_pos + @fifo.bytesize to match.
          # NOTE(review): if io is nil while @lines still holds lines from
          # an earlier rejected call, io.pos raises NoMethodError (rescued
          # by with_io) -- confirm whether that path is reachable.
          @watcher.pe.update_pos(io.pos - @fifo.bytesize)
          @lines.clear
        else
          # The callback did not accept the lines: keep them in @lines and
          # stop looping.
          read_more = false
        end
      end
    end while read_more
  end
end
limit_bytes_per_second_reached?() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1068
# Tells whether reading must pause because the per-second byte budget has
# been used up.
#
# Returns false when the limit is disabled (negative) or the budget has not
# been consumed yet. Once the budget is consumed, returns true while less
# than one second has elapsed since reading started; after that the byte
# counter and start time are reset and false is returned.
def limit_bytes_per_second_reached?
  budget = @read_bytes_limit_per_second
  # A negative budget means the limit is not enabled by configuration.
  return false if budget < 0
  return false if @number_bytes_read < budget

  @start_reading_time ||= Fluent::Clock.now
  elapsed = Fluent::Clock.now - @start_reading_time
  @log.debug("time_spent_reading: #{elapsed} #{ @watcher.path}")

  return true if elapsed < 1

  # The one-second window is over: start a fresh window.
  @start_reading_time = nil
  @number_bytes_read = 0
  false
end
open() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1149
# Opens @path and positions the handle at the recorded read position plus
# the number of bytes currently held in @fifo, then bumps the "opened"
# metric.
#
# Returns the opened handle, or nil when the file does not exist or cannot
# be accessed (the latter is logged as a warning).
# Raises WatcherSetupError when seeking fails with a RangeError; the handle
# is closed first.
def open
  handle = Fluent::FileWrapper.open(@path)
  offset = @watcher.pe.read_pos + @fifo.bytesize
  handle.seek(offset)
  @metrics.opened.inc
  handle
rescue RangeError
  handle.close if handle
  raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
rescue Errno::EACCES => access_error
  @log.warn "#{access_error}"
  nil
rescue Errno::ENOENT
  nil
end
should_shutdown_now?() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1085
# Tells whether reading should be abandoned because shutdown has been in
# progress for longer than @shutdown_timeout. Returns false while no
# shutdown start time has been recorded. Evaluated under @shutdown_mutex.
def should_shutdown_now?
  @shutdown_mutex.synchronize do
    started_at = @shutdown_start_time
    if started_at.nil?
      false
    else
      # Remaining lines are still read during shutdown, but only for as
      # long as the timeout allows.
      elapsed = Fluent::Clock.now - started_at
      elapsed > @shutdown_timeout
    end
  end
end
with_io() { |io| ... } click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 1164
# Yields a file handle (possibly nil) to the given block.
#
# When @open_on_every_update is set, a fresh handle is opened for this call
# and closed afterwards. Otherwise the handle kept in @io is reused, being
# opened on first use; if none could be opened, @eof is set.
#
# A WatcherSetupError closes the retained handle, sets @eof and is re-raised.
# Any other StandardError is logged, the retained handle is closed and @eof
# is set, without propagating the error.
def with_io
  if @open_on_every_update
    handle = open
    begin
      yield handle
    ensure
      handle&.close
    end
  else
    @io ||= open
    yield @io
    @eof = true if @io.nil?
  end
rescue WatcherSetupError => setup_error
  close
  @eof = true
  raise setup_error
rescue StandardError => error
  @log.error error.to_s
  @log.error_backtrace
  close
  @eof = true
end