class Fluent::Plugin::FileServiceDiscovery

Constants

DEFAULT_FILE_TYPE
DEFAULT_SD_FILE_PATH
DEFAUT_WEIGHT

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/sd_file.rb, line 38
# Creates the plugin instance.
#
# The superclass initializer runs first; afterwards the detected file type
# is reset to nil (it is assigned a real value later, in #configure).
def initialize
  super()

  @file_type = nil
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/sd_file.rb, line 44
# Validates the configured path, infers the file format from the file name
# and loads the initial list of services.
#
# The format is taken from the text after the LAST dot of the file's
# basename. Anything other than yaml / yml / json falls back to
# DEFAULT_FILE_TYPE.
#
# @param conf the plugin configuration; forwarded unchanged to the superclass
# @raise [Fluent::ConfigError] when nothing exists at @path
def configure(conf)
  super

  raise Fluent::ConfigError, "sd_file: path=#{@path} not found" unless File.exist?(@path)

  # rpartition splits on the last dot, so "sd.prod.json" is recognised as
  # json instead of being treated as the unknown extension "prod.json".
  # Names without a dot ("yaml") and dotfiles (".json") resolve exactly as
  # they did when splitting on the first dot.
  extension = File.basename(@path).rpartition('.').last.to_sym
  @file_type = %i[yaml yml json].include?(extension) ? extension : DEFAULT_FILE_TYPE

  @services = fetch_server_info
end
start(queue)
Calls superclass method
# File lib/fluent/plugin/sd_file.rb, line 59
# Registers a StatWatcher for @path with the event loop, then invokes the
# superclass #start with no arguments.
#
# The watcher's callback ignores both arguments it is given and simply
# calls refresh_file with the supplied queue.
#
# @param queue the object handed to refresh_file on every watcher callback
def start(queue)
  event_loop_attach(
    StatWatcher.new(@path, @log) { |_prev, _cur| refresh_file(queue) }
  )

  super()
end

Private Instance Methods

fetch_server_info()
# File lib/fluent/plugin/sd_file.rb, line 115
# Reads the file at @path, parses it with #parser and converts every parsed
# entry into a Service tagged with :file.
#
# The file is opened with @conf_encoding as the external encoding and
# transcoded to UTF-8. `host` and `port` are mandatory keys of each entry;
# `weight` defaults to DEFAUT_WEIGHT; the remaining keys are optional and
# passed through as nil when absent.
#
# @return [Array<Service>] one Service per parsed entry
# @raise [Fluent::ConfigError] when reading the file fails, or when an entry
#   lacks `host` or `port`
def fetch_server_info
  raw =
    begin
      File.open(@path, "r:#{@conf_encoding}:utf-8") { |io| io.read }
    rescue => err
      raise Fluent::ConfigError, "sd_file: path=#{@path} couldn't open #{err}"
    end

  entries = parser.call(raw)
  entries.map do |entry|
    # Mandatory keys are fetched first (host, then port) so a missing one
    # raises KeyError, which is translated below.
    host = entry.fetch('host')
    port = entry.fetch('port')
    weight = entry.fetch('weight', DEFAUT_WEIGHT)

    Service.new(
      :file,
      host,
      port,
      entry['name'],
      weight,
      entry['standby'],
      entry['username'],
      entry['password'],
      entry['shared_key']
    )
  end
rescue KeyError => err
  raise Fluent::ConfigError, "#{err}. Service must have `host` and `port`"
end
parser()
# File lib/fluent/plugin/sd_file.rb, line 70
# Returns the callable that turns raw file contents into parsed data,
# selected by @file_type and memoized in @parser.
#
# Both branches return the parsed document itself. The YAML branch no
# longer appends a blockless `.map`, which used to hand back an Enumerator
# while the JSON branch handed back the parsed Array.
#
# An empty YAML document still parses to a non-collection value; that is
# deliberately NOT coerced to an empty list here, so it is not silently
# interpreted as "no services".
#
# @return [Proc, nil] a one-argument lambda, or nil when @file_type is not
#   one of :yaml, :yml or :json (the `case` has no else branch)
def parser
  @parser ||=
    case @file_type
    when :yaml, :yml
      require 'yaml'
      ->(text) { YAML.safe_load(text) }
    when :json
      require 'json'
      ->(text) { JSON.parse(text) }
    end
end
refresh_file(queue)
# File lib/fluent/plugin/sd_file.rb, line 82
# Re-reads the service file and pushes the difference against the currently
# known services onto +queue+.
#
# Any error raised while fetching is logged through @log and the refresh is
# skipped, leaving @services untouched. Otherwise a "service in" message is
# queued for every newly listed service, followed by a "service out"
# message for every service that disappeared, and @services is replaced by
# the new list.
#
# @param queue receiver of the generated messages (via #push)
def refresh_file(queue)
  latest =
    begin
      fetch_server_info
    rescue => e
      @log.error("sd_file: #{e}")
      return
    end

  # Nothing usable was fetched: skip this turn.
  return if latest.nil?

  # Joins are emitted before drains so that at least one server is always
  # present on the consumer side.
  joined = (latest - @services).map { |svc| ServiceDiscovery.service_in_msg(svc) }
  drained = (@services - latest).map { |svc| ServiceDiscovery.service_out_msg(svc) }

  @services = latest

  (joined + drained).each { |msg| queue.push(msg) }
end