class Fluent::Plugin::TailInput::PositionFile

Constants

POSITION_FILE_ENTRY_REGEX
UNWATCHED_POSITION

Public Class Methods

load(file, follow_inodes, existing_targets, logger:) click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 25
def self.load(file, follow_inodes, existing_targets, logger:)
  pf = new(file, follow_inodes, logger: logger)
  pf.load(existing_targets)
  pf
end
new(file, follow_inodes, logger: nil) click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 31
def initialize(file, follow_inodes, logger: nil)
  @file = file
  @logger = logger
  @file_mutex = Mutex.new
  @map = {}
  @follow_inodes = follow_inodes
end

Public Instance Methods

[](target_info) click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 39
def [](target_info)
  if m = @map[@follow_inodes ? target_info.ino : target_info.path]
    return m
  end

  @file_mutex.synchronize {
    @file.seek(0, IO::SEEK_END)
    seek = @file.pos + target_info.path.bytesize + 1
    @file.write "#{target_info.path}\t0000000000000000\t0000000000000000\n"
    if @follow_inodes
      @map[target_info.ino] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
    else
      @map[target_info.path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
    end
  }
end
load(existing_targets = nil) click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 68
def load(existing_targets = nil)
  compact(existing_targets)

  map = {}
  @file_mutex.synchronize do
    @file.pos = 0

    @file.each_line do |line|
      m = POSITION_FILE_ENTRY_REGEX.match(line)
      next if m.nil?

      path = m[1]
      pos = m[2].to_i(16)
      ino = m[3].to_i(16)
      seek = @file.pos - line.bytesize + path.bytesize + 1
      if @follow_inodes
        map[ino] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
      else
        map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
      end
    end
  end

  @map = map
end
try_compact() click to toggle source

This method is similer to compact but it tries to get less lock to avoid a lock contention

# File lib/fluent/plugin/in_tail/position_file.rb, line 95
def try_compact
  last_modified = nil
  size = nil

  @file_mutex.synchronize do
    size = @file.size
    last_modified = @file.mtime
  end

  entries = fetch_compacted_entries
  @logger&.debug "Compacted entries: ", entries.keys

  @file_mutex.synchronize do
    if last_modified == @file.mtime && size == @file.size
      @file.pos = 0
      @file.truncate(0)
      @file.write(entries.values.map(&:to_entry_fmt).join)

      # entry contains path/ino key and value.
      entries.each do |key, val|
        if (m = @map[key])
          m.seek = val.seek
        end
      end
    else
      # skip
    end
  end
end
unwatch(target_info) click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 64
def unwatch(target_info)
  unwatch_key(@follow_inodes ? target_info.ino : target_info.path)
end
unwatch_removed_targets(existing_targets) click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 56
def unwatch_removed_targets(existing_targets)
  @map.reject { |key, entry|
    existing_targets.key?(key)
  }.each_key { |key|
    unwatch_key(key)
  }
end

Private Instance Methods

compact(existing_targets = nil) click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 133
def compact(existing_targets = nil)
  @file_mutex.synchronize do
    entries = fetch_compacted_entries
    @logger&.debug "Compacted entries: ", entries.keys

    if existing_targets
      entries = remove_deleted_files_entries(entries, existing_targets)
      @logger&.debug "Remove missing entries.",
                     existing_targets: existing_targets.keys,
                     entries_after_removing: entries.keys
    end

    @file.pos = 0
    @file.truncate(0)
    @file.write(entries.values.map(&:to_entry_fmt).join)
  end
end
fetch_compacted_entries() click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 151
def fetch_compacted_entries
  entries = {}

  @file.pos = 0
  file_pos = 0
  @file.each_line do |line|
    m = POSITION_FILE_ENTRY_REGEX.match(line)
    if m.nil?
      @logger.warn "Unparsable line in pos_file: #{line}" if @logger
      next
    end

    path = m[1]
    pos = m[2].to_i(16)
    ino = m[3].to_i(16)
    if pos == UNWATCHED_POSITION
      @logger.debug "Remove unwatched line from pos_file: #{line}" if @logger
    else
      if @follow_inodes
        @logger&.warn("#{path} (inode: #{ino}) already exists. use latest one: deleted #{entries[ino]}") if entries.include?(ino)
        entries[ino] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1)
      else
        @logger&.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if entries.include?(path)
        entries[path] = Entry.new(path, pos, ino, file_pos + path.bytesize + 1)
      end
      file_pos += line.size
    end
  end

  entries
end
remove_deleted_files_entries(existent_entries, existing_targets) click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 183
def remove_deleted_files_entries(existent_entries, existing_targets)
  existent_entries.select { |path_or_ino|
    existing_targets.key?(path_or_ino)
  }
end
unwatch_key(key) click to toggle source
# File lib/fluent/plugin/in_tail/position_file.rb, line 127
def unwatch_key(key)
  if (entry = @map.delete(key))
    entry.update_pos(UNWATCHED_POSITION)
  end
end