class Fluent::Plugin::SyslogInput

Constants

DEFAULT_PARSER
FACILITY_MAP
SEVERITY_MAP
SYSLOG_REGEXP

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_syslog.rb, line 119
# Applies the plugin configuration and validates option combinations.
#
# conf - configuration object; handed to compat_parameters_convert and to the
#        superclass implementation, and checked for a 'priority_key' entry.
#
# Raises Fluent::ConfigError when:
# * include_source_host is set together with source_address_key,
# * source_hostname_key is set while resolve_hostname is explicitly false,
# * send_keepalive_packet is enabled and the protocol is :udp.
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  log.warn "priority_key is deprecated. Use severity_key instead" if conf.has_key?('priority_key')

  @use_default = false
  @parser = parser_create
  # Remember whether the parser reports that it handles the priority itself.
  @parser_parse_priority = @parser.respond_to?(:with_priority) && @parser.with_priority

  if @include_source_host
    raise Fluent::ConfigError, "specify either source_address_key or include_source_host" if @source_address_key

    @source_address_key = @source_host_key
  end

  if @source_hostname_key
    # An explicit "false" from the user conflicts with source_hostname_key;
    # an unset (nil) value simply defaults to true.
    raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key" if @resolve_hostname == false

    @resolve_hostname = true if @resolve_hostname.nil?
  end

  @_event_loop_run_timeout = @blocking_timeout

  uses_udp = (@protocol_type || @transport_config.protocol) == :udp
  raise Fluent::ConfigError, "send_keepalive_packet is available for tcp/tls" if @send_keepalive_packet && uses_udp
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 155
# Always answers true.
# NOTE(review): presumably consulted by the framework to decide whether this
# plugin may run under multiple workers - confirm against the base class.
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_syslog.rb, line 159
# Starts the plugin: runs the superclass start, logs the listening address
# and launches the server matching the configured protocol.
#
# The protocol is @protocol_type when set, otherwise @transport_config.protocol.
# Raises RuntimeError for any protocol other than :udp, :tcp or :tls.
def start
  super

  protocol = @protocol_type || @transport_config.protocol
  log.info "listening syslog socket on #{@bind}:#{@port} with #{protocol}"

  case protocol
  when :udp
    start_udp_server
  when :tcp
    start_tcp_server
  when :tls
    start_tcp_server(tls: true)
  else
    raise "BUG: invalid transport value: #{protocol}"
  end
end
start_tcp_server(tls: false) click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 178
# Opens a stream server and splits the incoming bytes into syslog messages,
# passing each complete message to message_handler.
#
# tls - when true the server is registered as :in_syslog_tls_server,
#       otherwise as :in_syslog_tcp_server.
#
# Two framings are handled, chosen by @frame_type:
# * :octet_count - each message is preceded by its length and one space;
# * anything else - messages are separated by @delimiter.
#
# Incomplete trailing data is left in conn.buffer.
# NOTE(review): this relies on conn.buffer returning the same string on every
# data callback - confirm against the server helper.
def start_tcp_server(tls: false)
  counted = @frame_type == :octet_count
  separator = counted ? " " : @delimiter
  separator_len = separator.size
  title = tls ? :in_syslog_tls_server : :in_syslog_tcp_server

  server_create_connection(
    title, @port,
    bind: @bind,
    resolve_name: @resolve_hostname,
    send_keepalive_packet: @send_keepalive_packet
  ) do |conn|
    conn.data do |chunk|
      pending = conn.buffer
      pending << chunk
      consumed = 0

      while (sep_at = pending.index(separator, consumed))
        if counted
          # The slice includes the trailing space, which Integer() tolerates.
          length = Integer(pending[consumed..sep_at])
          body = pending[sep_at + separator_len, length]
          # Stop until the rest of this message has arrived.
          break unless body.size == length

          consumed = sep_at + separator_len + length
        else
          body = pending[consumed...sep_at]
          consumed = sep_at + separator_len
        end
        message_handler(body, conn)
      end

      # Drop everything that has been handed to message_handler.
      pending.slice!(0, consumed) if consumed > 0
    end
  end
end
start_udp_server() click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 172
# Opens the UDP server and hands every received datagram, with its trailing
# line terminator removed, to message_handler together with the socket.
def start_udp_server
  server_create_udp(
    :in_syslog_udp_server, @port,
    bind: @bind,
    max_bytes: @message_length_limit,
    resolve_name: @resolve_hostname
  ) do |datagram, peer|
    message_handler(datagram.chomp, peer)
  end
end

Private Instance Methods

emit(tag, time, record) click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 270
# Sends one event to the router. A StandardError raised while emitting is
# logged (with the tag and the Yajl-dumped record) instead of propagating.
#
# tag    - event tag passed to router.emit
# time   - event time passed to router.emit
# record - event record passed to router.emit
def emit(tag, time, record)
  router.emit(tag, time, record)
rescue StandardError => failure
  log.error "syslog failed to emit", error: failure, tag: tag, record: Yajl.dump(record)
end
emit_unmatched(data, sock) click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 218
# Emits a record for a line that could not be processed, tagged
# "<@tag>.unmatched" and timestamped with the current event time.
#
# data - the raw line, stored under "unmatched_line"
# sock - peer object; remote_addr / remote_host are read only when the
#        corresponding key (@source_address_key / @source_hostname_key) is set
def emit_unmatched(data, sock)
  fields = {"unmatched_line" => data}

  # Address first, hostname second - same order the keys are written in.
  [
    [@source_address_key, :remote_addr],
    [@source_hostname_key, :remote_host]
  ].each do |key, reader|
    fields[key] = sock.public_send(reader) if key
  end

  emit("#{@tag}.unmatched", Fluent::EventTime.now, fields)
end
message_handler(data, sock) click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 225
# Parses one raw syslog message and emits the resulting event.
#
# data - the raw message text
# sock - peer object used for remote_addr / remote_host and passed on to
#        emit_unmatched
#
# Flow:
# * Unless @parser_parse_priority is set, the priority and the remaining text
#   are split off with SYSLOG_REGEXP before parsing.
# * The text is parsed by @parser; the priority (taken from the 'pri' field
#   of the record when it was not extracted above) is split into facility
#   (priority >> 3) and severity (lowest three bits).
# * The event is emitted with tag "<@tag>.<facility>.<severity>".
#
# Messages that do not match, fail to parse, or raise a StandardError are
# logged and, when @emit_unmatched_lines is set, forwarded to emit_unmatched.
def message_handler(data, sock)
  priority = nil
  payload = data

  unless @parser_parse_priority
    header = SYSLOG_REGEXP.match(data)
    if header.nil?
      emit_unmatched(data, sock) if @emit_unmatched_lines
      log.warn "invalid syslog message: #{data.dump}"
      return
    end
    priority = header[1].to_i
    payload = header[2]
  end

  @parser.parse(payload) do |time, record|
    if !time || !record
      emit_unmatched(data, sock) if @emit_unmatched_lines
      log.warn "failed to parse message", data: data
      # Leaves message_handler itself, not just this block.
      return
    end

    priority ||= record.delete('pri')
    facility = FACILITY_MAP[priority >> 3]
    severity = SEVERITY_MAP[priority & 0b111]

    record[@severity_key] = severity if @severity_key
    record[@facility_key] = facility if @facility_key
    record[@source_address_key] = sock.remote_addr if @source_address_key
    record[@source_hostname_key] = sock.remote_host if @source_hostname_key

    emit("#{@tag}.#{facility}.#{severity}", time, record)
  end
rescue => failure
  emit_unmatched(data, sock) if @emit_unmatched_lines
  log.error "invalid input", data: data, error: failure
  log.error_backtrace
end