class Fluent::Plugin::UnixInput
TODO: This plugin will be 3rd party plugin
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Compat::Input::new
# File lib/fluent/plugin/in_unix.rb, line 33 def initialize super @lsock = nil end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_unix.rb, line 46 def configure(conf) super end
convert_time(time)
click to toggle source
# File lib/fluent/plugin/in_unix.rb, line 135 def convert_time(time) case time when nil, 0 Fluent::EventTime.now when Fluent::EventTime time else Fluent::EventTime.from_time(Time.at(time)) end end
listen()
click to toggle source
# File lib/fluent/plugin/in_unix.rb, line 66 def listen if File.exist?(@path) log.warn "Found existing '#{@path}'. Remove this file for in_unix plugin" File.unlink(@path) end FileUtils.mkdir_p(File.dirname(@path)) log.info "listening fluent socket on #{@path}" s = Coolio::UNIXServer.new(@path, Handler, log, method(:on_message)) s.listen(@backlog) unless @backlog.nil? s end
on_message(msg)
click to toggle source
message Entry {
1: long time 2: object record
}
message Forward {
1: string tag 2: list<Entry> entries
}
message PackedForward {
1: string tag 2: raw entries # msgpack stream of Entry
}
message Message {
1: string tag 2: long? time 3: object record
}
# File lib/fluent/plugin/in_unix.rb, line 99 def on_message(msg) unless msg.is_a?(Array) log.warn "incoming data is broken:", msg: msg return end tag = @tag || (msg[0].to_s) entries = msg[1] case entries when String # PackedForward es = Fluent::MessagePackEventStream.new(entries) router.emit_stream(tag, es) when Array # Forward es = Fluent::MultiEventStream.new entries.each {|e| record = e[1] next if record.nil? time = convert_time(e[0]) es.add(time, record) } router.emit_stream(tag, es) else # Message record = msg[2] return if record.nil? time = convert_time(msg[1]) router.emit(tag, time, record) end end
shutdown()
click to toggle source
Calls superclass method
Fluent::Compat::Input#shutdown
# File lib/fluent/plugin/in_unix.rb, line 57 def shutdown if @lsock event_loop_detach(@lsock) @lsock.close end super end
start()
click to toggle source
Calls superclass method
Fluent::Compat::Input#start
# File lib/fluent/plugin/in_unix.rb, line 50 def start super @lsock = listen event_loop_attach(@lsock) end