class Fluent::Agent

Agent is a resource unit who manages emittable plugins

Next step: ‘fluentd/root_agent.rb` Next step: `fluentd/label.rb`

Attributes

context[R]
error_collector[R]
event_router[R]
filters[R]
log[R]
outputs[R]

Public Class Methods

new(log:) click to toggle source
Calls superclass method Fluent::Configurable::new
# File lib/fluent/agent.rb, line 32
def initialize(log:)
  super()

  @context = nil
  @outputs = []
  @filters = []

  @lifecycle_control_list = nil
  # lifecycle_control_list is the list of plugins in this agent, and ordered
  # from plugins which DOES emit, then DOESN'T emit
  # (input -> output w/ router -> filter -> output w/o router)
  # for start: use this order DESC
  #   (because plugins which appears later in configurations will receive events from plugins which appears earlier)
  # for stop/before_shutdown/shutdown/after_shutdown/close/terminate: use this order ASC
  @lifecycle_cache = nil

  @log = log
  @event_router = EventRouter.new(NoMatchMatch.new(log), self)
  @error_collector = nil
end

Public Instance Methods

add_filter(type, pattern, conf) click to toggle source
# File lib/fluent/agent.rb, line 148
def add_filter(type, pattern, conf)
  log_type = conf.for_this_worker? ? :default : :worker0
  log.info log_type, "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  filter = Plugin.new_filter(type)
  filter.context_router = @event_router
  filter.configure(conf)
  @filters << filter
  @event_router.add_rule(pattern, filter)

  filter
end
add_match(type, pattern, conf) click to toggle source
# File lib/fluent/agent.rb, line 126
def add_match(type, pattern, conf)
  log_type = conf.for_this_worker? ? :default : :worker0
  log.info log_type, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  output = Plugin.new_output(type)
  output.context_router = @event_router
  output.configure(conf)
  @outputs << output
  if output.respond_to?(:outputs) && output.respond_to?(:multi_output?) && output.multi_output?
    # TODO: ruby 2.3 or later: replace `output.respond_to?(:multi_output?) && output.multi_output?` with output&.multi_output?
    outputs = if output.respond_to?(:static_outputs)
                output.static_outputs
              else
                output.outputs
              end
    @outputs.push(*outputs)
  end
  @event_router.add_rule(pattern, output)

  output
end
configure(conf) click to toggle source
Calls superclass method Fluent::Configurable#configure
# File lib/fluent/agent.rb, line 60
def configure(conf)
  super

  # initialize <match> and <filter> elements
  conf.elements('filter', 'match').each { |e|
    if !Fluent::Engine.supervisor_mode && e.for_another_worker?
      next
    end
    pattern = e.arg.empty? ? '**' : e.arg
    type = e['@type']
    raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
    if e.name == 'filter'
      add_filter(type, pattern, e)
    else
      add_match(type, pattern, e)
    end
  }
end
emit_error_event(tag, time, record, error) click to toggle source

For handling invalid record

# File lib/fluent/agent.rb, line 162
def emit_error_event(tag, time, record, error)
end
handle_emits_error(tag, es, error) click to toggle source
# File lib/fluent/agent.rb, line 165
def handle_emits_error(tag, es, error)
end
lifecycle(desc: false) { |instance, display_kind| ... } click to toggle source
# File lib/fluent/agent.rb, line 107
def lifecycle(desc: false)
  kind_list = if desc
                [:output, :filter, :output_with_router]
              else
                [:output_with_router, :filter, :output]
              end
  kind_list.each do |kind|
    list = if desc
             lifecycle_control_list[kind].reverse
           else
             lifecycle_control_list[kind]
           end
    display_kind = (kind == :output_with_router ? :output : kind)
    list.each do |instance|
      yield instance, display_kind
    end
  end
end
lifecycle_control_list() click to toggle source
# File lib/fluent/agent.rb, line 79
def lifecycle_control_list
  return @lifecycle_control_list if @lifecycle_control_list

  lifecycle_control_list = {
    input: [],
    output_with_router: [],
    filter: [],
    output: [],
  }
  if self.respond_to?(:inputs)
    inputs.each do |i|
      lifecycle_control_list[:input] << i
    end
  end
  outputs.each do |o|
    if o.has_router?
      lifecycle_control_list[:output_with_router] << o
    else
      lifecycle_control_list[:output] << o
    end
  end
  filters.each do |f|
    lifecycle_control_list[:filter] << f
  end

  @lifecycle_control_list = lifecycle_control_list
end