class Fluent::Plugin::MultiOutput

Attributes

outputs[R]
outputs_statically_created[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::PluginLoggerMixin::new
# File lib/fluent/plugin/multi_output.rb, line 43
# Builds a MultiOutput with no child outputs and with metric handles unset.
# The metric handles and the size-metrics flag are given their real values
# in #configure.
def initialize
  super
  # Child output plugin instances; filled by #configure.
  @outputs = []
  # Set to true by #static_outputs; while false, #call_lifecycle_method
  # drives the children's lifecycle from this plugin.
  @outputs_statically_created = false

  @counter_mutex = Mutex.new
  # TODO: well organized counters
  @num_errors_metrics = nil
  @emit_count_metrics = nil
  @emit_records_metrics = nil
  @emit_size_metrics = nil
  # @write_count = 0
  # @rollback_count = 0
  @enable_size_metrics = false
end

Public Instance Methods

after_shutdown() click to toggle source
Calls superclass method Fluent::Plugin::Base#after_shutdown
# File lib/fluent/plugin/multi_output.rb, line 166
# Runs the superclass after_shutdown hook, then forwards after_shutdown to
# each child output that does not yet report after_shutdown?
# (the forwarding is skipped when the outputs were statically created).
def after_shutdown
  super
  call_lifecycle_method(:after_shutdown, :after_shutdown?)
end
after_start() click to toggle source
Calls superclass method Fluent::Plugin::Base#after_start
# File lib/fluent/plugin/multi_output.rb, line 146
# Runs the superclass after_start hook, then forwards after_start to each
# child output that does not yet report after_started?
# (the forwarding is skipped when the outputs were statically created).
def after_start
  super
  call_lifecycle_method(:after_start, :after_started?)
end
before_shutdown() click to toggle source
Calls superclass method Fluent::Plugin::Base#before_shutdown
# File lib/fluent/plugin/multi_output.rb, line 156
# Runs the superclass before_shutdown hook, then forwards before_shutdown to
# each child output that does not yet report before_shutdown?
# (the forwarding is skipped when the outputs were statically created).
def before_shutdown
  super
  call_lifecycle_method(:before_shutdown, :before_shutdown?)
end
call_lifecycle_method(method_name, checker_name) click to toggle source

When MultiOutput plugins are created dynamically (by the forest plugin or others), the agent cannot find their sub-plugins, so the child plugins' lifecycles MUST be controlled by the MultiOutput plugin itself. TODO: this hack will be removed in v2.

# File lib/fluent/plugin/multi_output.rb, line 128
# Forwards one lifecycle step to every child output, unless the children were
# handed out through #static_outputs (in which case nothing is done and nil
# is returned).
#
# For each child a debug line is logged first; the step is then invoked only
# when the child's checker method returns a falsy value. Any exception raised
# for one child is logged as a warning and does not stop the remaining
# children from being processed.
#
# @param method_name [Symbol] lifecycle method to invoke on each child
# @param checker_name [Symbol] predicate telling whether the step already ran
# @return [Array, nil] the child outputs, or nil when they are static
def call_lifecycle_method(method_name, checker_name)
  unless @outputs_statically_created
    @outputs.each do |output|
      begin
        log.debug(
          "calling #{method_name} on output plugin dynamically created",
          type: Fluent::Plugin.lookup_type_from_class(output.class),
          plugin_id: output.plugin_id
        )
        already_done = output.__send__(checker_name)
        output.__send__(method_name) unless already_done
      rescue Exception => error
        # Deliberately broad: one failing child must not block its siblings.
        log.warn(
          "unexpected error while calling #{method_name} on output plugin dynamically created",
          plugin: output.class,
          plugin_id: output.plugin_id,
          error: error
        )
        log.warn_backtrace
      end
    end
  end
end
close() click to toggle source
Calls superclass method Fluent::Plugin::Base#close
# File lib/fluent/plugin/multi_output.rb, line 171
# Runs the superclass close hook, then forwards close to each child output
# that does not yet report closed?
# (the forwarding is skipped when the outputs were statically created).
def close
  super
  call_lifecycle_method(:close, :closed?)
end
configure(conf) click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/multi_output.rb, line 90
# Configures this plugin: registers its four metrics, reads the size-metrics
# switch from the system config, and creates and configures one child output
# per entry in @stores.
#
# NOTE(review): @stores is not assigned in the visible code; presumably the
# superclass configure populates it from the <store> sections — confirm.
#
# @param conf the configuration element, passed through to the superclass
# @raise [Fluent::ConfigError] when a store's config element has no '@type'
def configure(conf)
  super

  @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors")
  # Registered as "emit_count": it previously reused the name "emit_records",
  # colliding with the emit-records metric created on the next line.
  @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_count", help_text: "Number of count emits")
  @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records")
  @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events")
  # Coerce to a strict boolean so a nil/unset system option becomes false.
  @enable_size_metrics = !!system_config.enable_size_metrics

  @stores.each do |store|
    store_conf = store.corresponding_config_element
    type = store_conf['@type']
    unless type
      raise Fluent::ConfigError, "Missing '@type' parameter in <store> section"
    end

    log.debug "adding store", type: type

    output = Fluent::Plugin.new_output(type)
    # The child shares this plugin's router before it is configured.
    output.context_router = self.context_router
    output.configure(store_conf)
    @outputs << output
  end
end
emit_count() click to toggle source
# File lib/fluent/plugin/multi_output.rb, line 63
# Returns the current value of the emit-count metric, which #emit_sync
# increments once per call.
def emit_count
  @emit_count_metrics.get
end
emit_events(tag, es)
Alias for: emit_sync
emit_records() click to toggle source
# File lib/fluent/plugin/multi_output.rb, line 71
# Returns the current value of the emit-records metric, to which #emit_sync
# adds the size of each successfully processed event stream.
def emit_records
  @emit_records_metrics.get
end
emit_size() click to toggle source
# File lib/fluent/plugin/multi_output.rb, line 67
# Returns the current value of the emit-size metric; #emit_sync only updates
# it when size metrics are enabled.
def emit_size
  @emit_size_metrics.get
end
emit_sync(tag, es) click to toggle source
# File lib/fluent/plugin/multi_output.rb, line 181
# Hands an event stream to #process and keeps the emit metrics up to date.
#
# The emit counter is bumped before anything else. Record and (optionally)
# byte-size metrics are added only after #process returns. Any StandardError
# raised from processing or from the metric updates increments the error
# counter and is then re-raised unchanged.
#
# @param tag the tag passed through to #process
# @param es  the event stream; must respond to #size and, when size metrics
#            are enabled, #to_msgpack_stream
def emit_sync(tag, es)
  @emit_count_metrics.inc
  begin
    process(tag, es)
    @emit_records_metrics.add(es.size)
    if @enable_size_metrics
      @emit_size_metrics.add(es.to_msgpack_stream.bytesize)
    end
  rescue
    @num_errors_metrics.inc
    raise
  end
end
Also aliased as: emit_events
multi_output?() click to toggle source
# File lib/fluent/plugin/multi_output.rb, line 86
# Always true: marks this plugin as a multi-output plugin.
def multi_output?
  true
end
num_errors() click to toggle source
# File lib/fluent/plugin/multi_output.rb, line 59
# Returns the current value of the error metric, which #emit_sync increments
# whenever emitting raises.
def num_errors
  @num_errors_metrics.get
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/multi_output.rb, line 39
# Placeholder that always raises; concrete plugins are expected to override
# it. It is invoked by #emit_sync with the tag and event stream.
#
# @raise [NotImplementedError] always, in this base implementation
def process(tag, es)
  raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
shutdown() click to toggle source
Calls superclass method Fluent::Plugin::Base#shutdown
# File lib/fluent/plugin/multi_output.rb, line 161
# Runs the superclass shutdown hook, then forwards shutdown to each child
# output that does not yet report shutdown?
# (the forwarding is skipped when the outputs were statically created).
def shutdown
  super
  call_lifecycle_method(:shutdown, :shutdown?)
end
start() click to toggle source
Calls superclass method Fluent::Plugin::Base#start
# File lib/fluent/plugin/multi_output.rb, line 141
# Runs the superclass start hook, then forwards start to each child output
# that does not yet report started?
# (the forwarding is skipped when the outputs were statically created).
def start
  super
  call_lifecycle_method(:start, :started?)
end
static_outputs() click to toggle source
# File lib/fluent/plugin/multi_output.rb, line 115
# Returns the child outputs and, as a side effect, marks them as statically
# created. After this call #call_lifecycle_method becomes a no-op, so this
# plugin no longer drives the children's lifecycle itself.
def static_outputs
  @outputs_statically_created = true
  @outputs
end
statistics() click to toggle source
# File lib/fluent/plugin/multi_output.rb, line 75
# Snapshots the four metric values into a nested hash keyed by
# 'multi_output'.
#
# @return [Hash] { 'multi_output' => { 'num_errors', 'emit_records',
#   'emit_count', 'emit_size' => current metric values } }
def statistics
  {
    'multi_output' => {
      'num_errors' => @num_errors_metrics.get,
      'emit_records' => @emit_records_metrics.get,
      'emit_count' => @emit_count_metrics.get,
      'emit_size' => @emit_size_metrics.get,
    },
  }
end
stop() click to toggle source
Calls superclass method Fluent::PluginId#stop
# File lib/fluent/plugin/multi_output.rb, line 151
# Runs the superclass stop hook, then forwards stop to each child output
# that does not yet report stopped?
# (the forwarding is skipped when the outputs were statically created).
def stop
  super
  call_lifecycle_method(:stop, :stopped?)
end
terminate() click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#terminate
# File lib/fluent/plugin/multi_output.rb, line 176
# Runs the superclass terminate hook, then forwards terminate to each child
# output that does not yet report terminated?
# (the forwarding is skipped when the outputs were statically created).
def terminate
  super
  call_lifecycle_method(:terminate, :terminated?)
end