class Fluent::Plugin::ElasticsearchInput

Constants

DEFAULT_RELOAD_AFTER
DEFAULT_STORAGE_TYPE
METADATA

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elasticsearch.rb, line 59
# Plugin constructor. It only calls the superclass constructor; state that
# depends on the plugin configuration is set up in #configure.
def initialize
  super
end

Public Instance Methods

backend_options() click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 101
# Map the configured client TLS key, certificate and key passphrase onto the
# option names used by the selected Faraday HTTP backend.
#
# Returns a Hash for the :excon and :typhoeus backends, nil for any other.
# Raises Fluent::ConfigError when the typhoeus gem cannot be required.
def backend_options
  key, cert, passphrase = @client_key, @client_cert, @client_key_pass

  if @http_backend == :excon
    { client_key: key, client_cert: cert, client_key_pass: passphrase }
  elsif @http_backend == :typhoeus
    # Required lazily so the gem is only needed when this backend is chosen.
    require 'typhoeus'
    { sslkey: key, sslcert: cert, keypasswd: passphrase }
  end
rescue LoadError => ex
  log.error_backtrace(ex.backtrace)
  raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{ex}"
end
client(host = nil) click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 206
# Return the Elasticsearch client for the given hosts, building a new one
# only when the resolved host list differs from the one the cached client
# was built for.
#
# host - optional comma-separated host list that overrides the configured
#        hosts (passed straight to #get_connection_options).
#
# The client is memoized in @_es, and the host list it was built from is
# kept in @current_config, which #is_existing_connection compares against.
def client(host = nil)
  # check here to see if we already have a client connection for the given host
  connection_options = get_connection_options(host)

  # Discard the cached client when it was built for a different host list.
  @_es = nil unless is_existing_connection(connection_options[:hosts])

  @_es ||= begin
    # Record (a shallow copy of) the host list this client is built for.
    @current_config = connection_options[:hosts].clone
    # Faraday adapter: the configured HTTP backend plus the options computed
    # by #backend_options during #configure.
    adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
    # When reloading is enabled and reload_after exceeds the default, hand
    # the transport that number instead of the plain flag.
    # NOTE(review): presumably the transport reads an Integer as "reload
    # every N requests" — confirm against the elasticsearch-transport docs.
    local_reload_connections = @reload_connections
    if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
      local_reload_connections = @reload_after
    end

    # JSON content type by default; entries in @custom_headers win on merge.
    headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)

    # TRANSPORT_CLASS is defined outside this view.
    transport = TRANSPORT_CLASS::Transport::HTTP::Faraday.new(
      connection_options.merge(
        options: {
          reload_connections: local_reload_connections,
          reload_on_failure: @reload_on_failure,
          resurrect_after: @resurrect_after,
          logger: @transport_logger,
          transport_options: {
            headers: headers,
            request: { timeout: @request_timeout },
            ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
          },
          http: {
            user: @user,
            password: @password
          },
          sniffer_class: @sniffer_class,
        }), &adapter_conf)
    Elasticsearch::Client.new transport: transport
  end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elasticsearch.rb, line 63
# Validate the plugin configuration and derive runtime state from it.
#
# Sets up:
# - the timestamp parser;
# - the HTTP backend options;
# - the credentials, percent-encoding values wrapped in `%{...}`;
# - the optional transporter logger;
# - the sniffer class;
# - the base search options and query.
#
# Raises Fluent::ConfigError when `user` is set without `password`, or when
# `sniffer_class_name` cannot be resolved to a constant.
def configure(conf)
  super

  @timestamp_parser = create_time_parser
  @backend_options = backend_options

  raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

  # The text inside a `%{...}` wrapper is percent-encoded with
  # URI.encode_www_form_component, the same treatment #get_escaped_userinfo
  # applies to credentials embedded in host URLs.
  if @user && m = @user.match(/%{(?<user>.*)}/)
    @user = URI.encode_www_form_component(m["user"])
  end
  if @password && m = @password.match(/%{(?<password>.*)}/)
    @password = URI.encode_www_form_component(m["password"])
  end

  @transport_logger = nil
  if @with_transporter_log
    @transport_logger = log
    log_level = conf['@log_level'] || conf['log_level']
    log.warn "Consider to specify log_level with @log_level." unless log_level
  end
  @current_config = nil
  # Specify @sniffer_class before calling #client.
  @sniffer_class = nil
  begin
    @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
  rescue StandardError, LoadError => ex
    # NameError (unknown or malformed constant name) is a StandardError.
    # LoadError can surface when resolving the constant triggers an autoload.
    # This is deliberately narrower than `Exception`, which would also
    # swallow Interrupt and SystemExit.
    raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
  end

  @options = {
    :index => @index_name,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = @query
end
convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z") click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 194
# Render a numeric timestamp as text so it can be re-parsed with a format.
#
# numeric_time         - value accepted by Fluent::NumericTimeParser(:float);
#                        it is fed to Time.at, i.e. seconds since the epoch.
# timestamp_key_format - strftime pattern for the output (ISO-8601-like with
#                        nanoseconds and UTC offset by default).
#
# The Time is built in the process's local zone.
def convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z")
  epoch = Fluent::NumericTimeParser.new(:float).parse(numeric_time)
  Time.at(epoch.to_r).strftime(timestamp_key_format)
end
create_time_parser() click to toggle source

Once Fluentd v0.14 is released we might be able to use Fluent::Parser::TimeParser, but it doesn’t quite do what we want - it gives

sec,nsec

whereas we want something we can call `strftime` on…

# File lib/fluent/plugin/in_elasticsearch.rb, line 167
# Build the callable that #parse_time uses to turn a document's timestamp
# value into a Time.
#
# - Without `timestamp_key_format`, values are parsed with DateTime.parse.
# - With it, the Strptime gem is used when it accepts the format. If
#   constructing Strptime raises (unsupported format, or the gem is not
#   loaded), DateTime.strptime is used instead.
#
# Numeric values are first rendered to a string via
# #convert_numeric_time_into_string, so every branch parses text.
def create_time_parser
  unless @timestamp_key_format
    any_format = proc do |raw|
      text = raw.is_a?(Numeric) ? convert_numeric_time_into_string(raw) : raw
      DateTime.parse(text).to_time
    end
    return any_format
  end

  begin
    # Strptime doesn't support every format, but for those it does it is
    # much faster than DateTime.strptime.
    fast_parser = Strptime.new(@timestamp_key_format)
  rescue StandardError
    # Either Strptime doesn't recognize the format, or the strptime gem
    # could not be loaded (it is Ruby 2+ only).
    slow_path = proc do |raw|
      text = raw.is_a?(Numeric) ? convert_numeric_time_into_string(raw, @timestamp_key_format) : raw
      DateTime.strptime(text, @timestamp_key_format).to_time
    end
    return slow_path
  end

  proc do |raw|
    text = raw.is_a?(Numeric) ? convert_numeric_time_into_string(raw, @timestamp_key_format) : raw
    fast_parser.exec(text).to_time
  end
end
get_connection_options(con_host=nil) click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 126
# Build the `hosts` connection option for the Elasticsearch transport.
#
# con_host - optional comma-separated host list that overrides @hosts.
#
# Each comma-separated entry is either:
# - the legacy `host[:port]` form, where the port defaults to @port and the
#   scheme is @scheme; or
# - a URL such as "https://john:pass@logs2.foo.com/elastic", whose non-empty
#   user/password/path parts are kept.
#
# Whitespace around entries is ignored and blank entries are skipped, so
# "a:9200, b:9200" yields hosts "a" and "b" rather than " b". With no host
# list at all, a single entry is built from @host/@port/@scheme. Entries
# without their own user or path inherit @user/@password and @path.
#
# Returns { hosts: [Hash, ...] }.
def get_connection_options(con_host=nil)
  host_list = con_host || @hosts

  hosts = if host_list
    host_list.split(',').map(&:strip).reject(&:empty?).map do |host_str|
      if host_str.match?(%r{\A[^:]+(:\d+)?\z})
        # Legacy format: host:port,host:port,...
        name, port = host_str.split(':')
        { host: name, port: (port || @port).to_i, scheme: @scheme.to_s }
      else
        # URL format, e.g. http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
        uri = URI(get_escaped_userinfo(host_str))
        %w(user password path).each_with_object({ host: uri.host, port: uri.port, scheme: uri.scheme }) do |key, hash|
          value = uri.public_send(key)
          hash[key.to_sym] = value unless value.nil? || value == ''
        end
      end
    end
  else
    [{ host: @host, port: @port, scheme: @scheme.to_s }]
  end

  hosts.each do |host|
    host.merge!(user: @user, password: @password) if !host[:user] && @user
    host.merge!(path: @path) if !host[:path] && @path
  end

  { hosts: hosts }
end
get_escaped_userinfo(host_str) click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 114
# Percent-encode credentials written as `%{user}:%{password}@` inside a host
# URL. For example, "https://%{j+hn}:%{p@ss}@host" becomes
# "https://j%2Bhn:p%40ss@host". Strings without that pattern are returned
# unchanged.
def get_escaped_userinfo(host_str)
  m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
  return host_str unless m

  user = URI.encode_www_form_component(m["user"])
  password = URI.encode_www_form_component(m["password"])
  "#{m["scheme"]}#{user}:#{password}#{m["path"]}"
end
is_existing_connection(host) click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 244
# Whether the cached client (@_es) was built for exactly the host list
# `host` (as produced by #get_connection_options). Entries are compared
# pairwise, in order, on :host and :port.
#
# Returns false when no client or no recorded host list exists yet.
def is_existing_connection(host)
  return false if @_es.nil? || @current_config.nil?
  return false if host.length != @current_config.length

  # Both fields must match. A port change alone must force a new client;
  # the unparenthesized `!a.eql? b || ...` form silently skipped the port check.
  host.zip(@current_config).all? do |wanted, current|
    wanted[:host] == current[:host] && wanted[:port] == current[:port]
  end
end
parse_time(value, event_time, tag) click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 199
# Parse a document timestamp with the callable built by #create_time_parser.
#
# If parsing raises, an error event is emitted under
# @timestamp_parse_error_tag. It carries the original tag, the event time,
# the format and the raw value. `event_time` converted to a Time is then
# returned as the fallback.
def parse_time(value, event_time, tag)
  begin
    @timestamp_parser.call(value)
  rescue StandardError => e
    details = {'tag' => tag, 'time' => event_time, 'format' => @timestamp_key_format, 'value' => value}
    router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, details, e)
    Time.at(event_time).to_time
  end
end
process_events(hit, es) click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 301
# Turn one search hit into an event and append it to `es`.
#
# The record is the hit's `_source`. Its time is the current engine time,
# unless `parse_timestamp` is enabled and the record contains
# TIMESTAMP_FIELD, in which case #parse_time is used. With `docinfo`
# enabled, the hit's top-level fields named in @docinfo_fields are copied
# into the Hash stored under @docinfo_target. That Hash is created when the
# field is absent.
#
# Raises UnrecoverableError if @docinfo_target already holds a non-Hash value.
def process_events(hit, es)
  event = hit["_source"]
  time = Fluent::Engine.now
  if @parse_timestamp && event.has_key?(TIMESTAMP_FIELD)
    time = parse_time(event[TIMESTAMP_FIELD], time, @tag)
  end

  if @docinfo
    docinfo_target = event[@docinfo_target] || {}

    unless docinfo_target.is_a?(Hash)
      # The offending type is part of the message. A third argument to `raise`
      # is taken as the backtrace, so passing a details hash there made Ruby
      # raise TypeError instead of this error.
      raise UnrecoverableError, "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got: #{docinfo_target.class}"
    end

    @docinfo_fields.each do |field|
      docinfo_target[field] = hit[field]
    end

    event[@docinfo_target] = docinfo_target
  end
  es.add(time, event)
end
process_next_scroll_request(es, scroll_id) click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 295
# Fetch the next scroll page and add its hits to `es` as events.
#
# Returns a Hash with 'has_hits' (whether the page had any hits) and
# '_scroll_id' (the id to continue scrolling with).
def process_next_scroll_request(es, scroll_id)
  page = process_scroll_request(scroll_id)
  hits = page['hits']['hits']
  hits.each do |hit|
    process_events(hit, es)
  end
  {
    'has_hits' => hits.any?,
    '_scroll_id' => page['_scroll_id']
  }
end
process_scroll_request(scroll_id) click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 291
# Ask the client for the page of results that follows `scroll_id`, passing
# @scroll as the request's `scroll` parameter.
def process_scroll_request(scroll_id)
  request = { body: { scroll_id: scroll_id }, scroll: @scroll }
  client.scroll(**request)
end
run() click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 259
# Timer callback that executes the configured search.
#
# With @num_slices <= 1 the search runs inline via #run_slice. Otherwise one
# thread per slice id (0...@num_slices) is started through thread_create.
# A warning is logged when more than 8 slices are requested.
def run
  if @num_slices <= 1
    run_slice
  else
    log.warn("Large slice number is specified:(#{@num_slices}). Consider reducing num_slices") if @num_slices > 8
    @num_slices.times.map do |slice_id|
      thread_create(:"in_elasticsearch_thread_#{slice_id}") { run_slice(slice_id) }
    end
  end
end
run_slice(slice_id=nil) click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 271
# Run the base query, page through every scroll result, and emit all hits as
# one event stream under @tag.
#
# slice_id - slice number passed as 'slice' => { 'id', 'max' => @num_slices },
#            or nil for an unsliced search.
#
# All events are collected in memory before emission. The scroll is cleared
# afterwards when a scroll id is still known.
def run_slice(slice_id=nil)
  query =
    if slice_id.nil?
      @base_query
    else
      @base_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices })
    end
  response = client.search(@options.merge(:body => Yajl.dump(query)))
  stream = Fluent::MultiEventStream.new

  first_hits = response['hits']['hits']
  first_hits.each { |hit| process_events(hit, stream) }

  more = first_hits.any?
  cursor = response['_scroll_id']
  while more && cursor
    page = process_next_scroll_request(stream, cursor)
    more = page['has_hits']
    cursor = page['_scroll_id']
  end

  router.emit_stream(@tag, stream)
  client.clear_scroll(scroll_id: cursor) if cursor
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elasticsearch.rb, line 158
# Start the plugin, then register #run with timer_execute under the name
# :in_elasticsearch_timer, using @interval and `repeat: @repeat`.
def start
  super

  tick = method(:run)
  timer_execute(:in_elasticsearch_timer, @interval, repeat: @repeat, &tick)
end