class Fluent::RPC::Server
Public Class Methods
new(endpoint, log)
click to toggle source
# File lib/fluent/rpc.rb, line 22
# Builds the RPC HTTP server for the given endpoint.
#
# @param endpoint [String] "host:port" or "[host]:port"; host may contain
#   digits, ASCII letters, ':', '-' and '.', and port must be all digits.
# @param log [Object] logger stored for later use (this class calls
#   +debug+, +warn+ and +warn_backtrace+ on it).
# @raise [Fluent::ConfigError] if +endpoint+ does not match the expected form.
def initialize(endpoint, log)
  # \A / \z anchor the WHOLE string. The previous ^ / $ anchors match per
  # line, so a multi-line value could pass validation on just one of its lines.
  m = endpoint.match(/\A\[?(?<host>[0-9a-zA-Z:\-\.]+)\]?:(?<port>[0-9]+)\z/)
  raise Fluent::ConfigError, "Invalid rpc_endpoint: #{endpoint}" unless m

  @bind = m[:host]
  @port = m[:port] # kept as the captured String, as before
  @log = log

  # WEBrick's own logger writes to STDERR at FATAL level and the access-log
  # list is empty; this class reports through @log instead.
  @server = WEBrick::HTTPServer.new(
    BindAddress: @bind,
    Port: @port,
    Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
    AccessLog: [],
  )
end
Public Instance Methods
mount(path, servlet, *args)
click to toggle source
# File lib/fluent/rpc.rb, line 37
# Mounts +servlet+ on +path+ of the underlying server, forwarding any extra
# +args+ unchanged, then logs the registration at debug level.
#
# @param path [String] mount point.
# @param servlet [Object] servlet passed through to the server's +mount+.
# @param args [Array] extra arguments passed through to the server's +mount+.
# @return [Object] whatever the logger's +debug+ call returns.
def mount(path, servlet, *args)
  @server.mount(path, servlet, *args)
  @log.debug("register #{path} RPC servlet")
end
mount_proc(path, &block)
click to toggle source
# File lib/fluent/rpc.rb, line 42
# Registers +block+ as the handler for requests on +path+ and logs the
# registration at debug level.
#
# The block is called with (req, res) and its result is destructured as
# [code, header, response]; each element may be nil:
# * code     - HTTP status; defaults to 200.
# * header   - Hash of response headers; defaults to a JSON Content-Type.
# * response - object whose +body+ is a Hash; an "ok" flag (code == 200) is
#              written into it before it is serialized to JSON. When nil,
#              the body is the literal '{"ok":true}'.
#
# If the block raises a StandardError, the failure is logged and the client
# receives status 500 with a JSON body holding "message", "error",
# "backtrace" and "ok": false.
#
# @param path [String] mount point.
# @yieldparam req [Object] request object supplied by the server.
# @yieldparam res [Object] response object supplied by the server.
def mount_proc(path, &block)
  @server.mount_proc(path) { |req, res|
    # Held separately from `response` so the error payload survives the body
    # selection below. Previously it was assigned to `body` and then
    # overwritten, so failures were answered with 500 and '{"ok":true}'.
    error_body = nil

    begin
      code, header, response = block.call(req, res)
    rescue => e
      @log.warn "failed to handle RPC request", path: path, error: e.to_s
      @log.warn_backtrace e.backtrace

      code = 500
      error_body = {
        'message' => 'Internal Server Error',
        'error' => "#{e}",
        'backtrace' => e.backtrace,
        'ok' => false,
      }
    end

    code = 200 if code.nil?
    header = {'Content-Type' => 'application/json'} if header.nil?
    body = if error_body
             error_body.to_json
           elsif response.nil?
             '{"ok":true}'
           else
             response.body['ok'] = code == 200
             response.body.to_json
           end

    res.status = code
    header.each_pair { |k, v| res[k] = v }
    res.body = body
  }
  @log.debug "register #{path} RPC handler"
end
shutdown()
click to toggle source
# File lib/fluent/rpc.rb, line 83
# Shuts down the server (if one is set) and then joins the serving thread
# (if one is set), clearing both references so a repeated call does nothing.
#
# @return [nil]
def shutdown
  @server&.shutdown
  @server = nil

  @thread&.join
  @thread = nil
end
start()
click to toggle source
# File lib/fluent/rpc.rb, line 76
# Logs the listening URL at debug level and runs the server's +start+ in a
# new thread, which is stored in @thread.
#
# @return [Thread] the thread running the server.
def start
  @log.debug("listening RPC http server on http://#{@bind}:#{@port}/")
  runner = Thread.new do
    @server.start
  end
  @thread = runner
end