class Fluent::Counter::Server

Constants

DEFAULT_ADDR
DEFAULT_PORT

Public Class Methods

new(name, opt = {}) click to toggle source
# File lib/fluent/counter/server.rb, line 29
# Builds a counter server. The TCP server object is created here; it is only
# attached to the event loop later, in #start.
#
# @param name server name; must match Validator::VALID_NAME
# @param opt [Hash] optional settings. :addr, :port, :loop and :log are read
#   here, and the same hash is also passed to Fluent::Counter::Store.new.
# @raise [RuntimeError] when name does not match Validator::VALID_NAME
def initialize(name, opt = {})
  unless Validator::VALID_NAME.match?(name)
    raise 'Counter server name is invalid'
  end

  @name = name
  @opt = opt

  # Each setting falls back to its default when the option is absent or falsy.
  @addr = opt[:addr] || DEFAULT_ADDR
  @port = opt[:port] || DEFAULT_PORT
  @loop = opt[:loop] || Coolio::Loop.new
  @log = opt[:log] || $log

  @store = Fluent::Counter::Store.new(opt)
  @mutex_hash = MutexHash.new(@store)

  # #on_message is handed over as a Method object together with Handler
  # (presumably Handler invokes it per request — confirm in Handler).
  @server = Coolio::TCPServer.new(@addr, @port, Handler, method(:on_message))

  # Lifecycle state, updated by #start.
  @thread = nil
  @running = false
end

Public Instance Methods

on_message(data) click to toggle source
# File lib/fluent/counter/server.rb, line 68
# Handles one request and builds the response hash for it.
#
# The request is first checked with Validator.request; when that reports
# errors they are returned without dispatching. Otherwise the method named by
# data['method'] is invoked with data['params'], data['scope'] and
# data['options'] (inside #safe_run), and the request id is merged into the
# result.
#
# @param data [Hash] request with 'id', 'method', 'params', 'scope', 'options'
# @return [Hash] response carrying 'id', 'data' and 'errors'
def on_message(data)
  errors = Validator.request(data)
  unless errors.empty?
    return { 'id' => data['id'], 'data' => [], 'errors' => errors }
  end

  # NOTE(review): this dispatches with `send` on a client-supplied method
  # name; presumably Validator.request restricts it to the known commands —
  # confirm in Validator.
  result = safe_run do
    send(data['method'], data['params'], data['scope'], data['options'])
  end
  result.merge('id' => data['id'])
rescue => e
  @log.error e.to_s
  # Without an explicit value here the logger's return value would be handed
  # back as the "response". Reply with a well-formed error response instead.
  # `data` may not even be a Hash if the failure happened during validation.
  {
    'id' => (data['id'] if data.is_a?(Hash)),
    'data' => [],
    'errors' => [InternalServerError.new(e).to_hash]
  }
end
start() click to toggle source
# File lib/fluent/counter/server.rb, line 46
# Starts serving: attaches the TCP server to the event loop, runs that loop on
# a new background thread, and starts the mutex hash.
#
# @return [self]
def start
  @server.attach(@loop)
  @thread = Thread.new do
    # @running is true only while @loop.run is executing on this thread;
    # #stop reads it to decide whether the loop has to be stopped.
    @running = true
    # NOTE(review): 0.5 is presumably a timeout in seconds — confirm against
    # Coolio::Loop#run.
    @loop.run(0.5)
    @running = false
  end
  @log.debug("starting counter server #{@addr}:#{@port}")
  @mutex_hash.start
  self
end
stop() click to toggle source
# File lib/fluent/counter/server.rb, line 58
# Shuts the server down: closes the TCP server, stops the event loop if it is
# running, stops the mutex hash, and waits for the loop thread to finish.
def stop
  # This `sleep` exists for tests: it gives the thread created in #start time
  # to begin running `@loop`.
  sleep 0.1
  @server.close
  # @running is only true while the loop thread is inside @loop.run.
  @loop.stop if @running
  @mutex_hash.stop
  # @thread stays nil when #start was never called.
  @thread.join if @thread
  # Logged after the shutdown steps above have completed.
  @log.debug("calling stop in counter server #{@addr}:#{@port}")
end

Private Instance Methods

delete(params, scope, options) click to toggle source
# File lib/fluent/counter/server.rb, line 126
# Deletes the counters named in params.
#
# Each name accepted by ArrayValidator(:empty, :key) is turned into a store
# key with Store.gen_key(scope, name) and removed through @mutex_hash.
#
# @param params names of the counters to delete
# @param scope scope passed to Store.gen_key for every name
# @param options [Hash] a truthy 'random' selects @mutex_hash.synchronize_keys
#   instead of @mutex_hash.synchronize
# @return the Response#to_hash result: one data entry per deleted key, plus
#   validation errors and any per-key errors
def delete(params, scope, options)
  names, errors = Fluent::Counter::ArrayValidator.new(:empty, :key).call(params)
  response = Response.new(errors)
  keys = names.map { |name| Store.gen_key(scope, name) }

  # A failure on one key is recorded on the response instead of being raised,
  # so the remaining keys are still processed.
  remove = lambda do |store, key|
    begin
      deleted = store.delete(key)
      @log.debug("delete a key: #{key}")
      response.push_data(deleted)
    rescue => e
      response.push_error(e)
    end
  end

  locking = options['random'] ? :synchronize_keys : :synchronize
  @mutex_hash.public_send(locking, *keys, &remove)

  response.to_hash
end
establish(params, _scope, _options) click to toggle source
# File lib/fluent/counter/server.rb, line 84
# Builds the scope string for a client: the server name and the first valid
# requested scope, joined by a tab.
#
# @param params requested scopes, validated by ArrayValidator(:empty, :scope);
#   only the first valid entry is used
# @param _scope unused
# @param _options unused
# @return the Response#to_hash result; its data holds the new scope string when
#   a valid scope was supplied
def establish(params, _scope, _options)
  scopes, errors = Fluent::Counter::ArrayValidator.new(:empty, :scope).call(params)
  response = Response.new(errors)

  requested = scopes.first
  if requested
    scoped_name = "#{@name}\t#{requested}"
    response.push_data(scoped_name)
    @log.debug("Establish new key: #{scoped_name}")
  end

  response.to_hash
end
get(params, scope, _options) click to toggle source
# File lib/fluent/counter/server.rb, line 206
# Reads the current values of the counters named in params.
#
# Unlike the mutating commands, this reads from @store directly rather than
# going through @mutex_hash.
#
# @param params names of the counters to read, validated by
#   ArrayValidator(:empty, :key)
# @param scope scope passed to Store.gen_key for every name
# @param _options unused
# @return the Response#to_hash result: one data entry per key that was read,
#   plus validation errors and any per-key errors
def get(params, scope, _options)
  names, errors = Fluent::Counter::ArrayValidator.new(:empty, :key).call(params)
  response = Response.new(errors)

  names.map { |name| Store.gen_key(scope, name) }.each do |key|
    begin
      # raise_error: true makes a failed lookup surface as an exception,
      # which is recorded on the response below.
      counter = @store.get(key, raise_error: true)
      @log.debug("Get counter value: #{key}")
      response.push_data(counter)
    rescue => e
      response.push_error(e)
    end
  end

  response.to_hash
end
inc(params, scope, options) click to toggle source
# File lib/fluent/counter/server.rb, line 151
# Increments the counters described in params.
#
# Every valid param is indexed by its store key (Store.gen_key(scope, name));
# when the same name appears more than once, the last param wins and the key
# is processed once.
#
# @param params [Array<Hash>] entries with 'name' and 'value'; 'reset_interval'
#   is additionally validated when options['force'] is truthy
# @param scope scope passed to Store.gen_key for every name
# @param options [Hash] 'force' is forwarded to the store's inc; a truthy
#   'random' selects @mutex_hash.synchronize_keys instead of synchronize
# @return the Response#to_hash result: one data entry per incremented key,
#   plus validation errors and any per-key errors
def inc(params, scope, options)
  checks = [:empty, :name, :value]
  checks << :reset_interval if options['force']
  valid_params, errors = Fluent::Counter::HashValidator.new(*checks).call(params)
  res = Response.new(errors)

  # Build the key => param index in place. The previous reduce/merge form
  # copied the whole hash for every element.
  key_hash = valid_params.each_with_object({}) do |vp, acc|
    acc[Store.gen_key(scope, vp['name'])] = vp
  end

  # A failure on one key is recorded on the response instead of being raised,
  # so the remaining keys are still processed.
  do_inc = lambda do |store, key|
    begin
      param = key_hash[key]
      v = store.inc(key, param, force: options['force'])
      @log.debug("Increment #{key} by #{param['value']}")
      res.push_data v
    rescue => e
      res.push_error e
    end
  end

  if options['random']
    @mutex_hash.synchronize_keys(*key_hash.keys, &do_inc)
  else
    @mutex_hash.synchronize(*key_hash.keys, &do_inc)
  end

  res.to_hash
end
init(params, scope, options) click to toggle source
# File lib/fluent/counter/server.rb, line 98
# Creates the counters described in params.
#
# Every valid param is indexed by its store key (Store.gen_key(scope, name));
# when the same name appears more than once, the last param wins and the key
# is processed once.
#
# @param params [Array<Hash>] entries validated for :empty, :name and
#   :reset_interval
# @param scope scope passed to Store.gen_key for every name
# @param options [Hash] 'ignore' is forwarded to the store's init; a truthy
#   'random' selects @mutex_hash.synchronize_keys instead of synchronize
# @return the Response#to_hash result: one data entry per created key, plus
#   validation errors and any per-key errors
def init(params, scope, options)
  validator = Fluent::Counter::HashValidator.new(:empty, :name, :reset_interval)
  valid_params, errors = validator.call(params)
  res = Response.new(errors)

  # Build the key => param index in place. The previous reduce/merge form
  # copied the whole hash for every element.
  key_hash = valid_params.each_with_object({}) do |vp, acc|
    acc[Store.gen_key(scope, vp['name'])] = vp
  end

  # A failure on one key is recorded on the response instead of being raised,
  # so the remaining keys are still processed.
  do_init = lambda do |store, key|
    begin
      param = key_hash[key]
      v = store.init(key, param, ignore: options['ignore'])
      @log.debug("Create new key: #{param['name']}")
      res.push_data v
    rescue => e
      res.push_error e
    end
  end

  if options['random']
    @mutex_hash.synchronize_keys(*key_hash.keys, &do_init)
  else
    @mutex_hash.synchronize(*key_hash.keys, &do_init)
  end

  res.to_hash
end
reset(params, scope, options) click to toggle source
# File lib/fluent/counter/server.rb, line 181
# Resets the counters named in params.
#
# Each name accepted by ArrayValidator(:empty, :key) is turned into a store
# key with Store.gen_key(scope, name) and reset through @mutex_hash.
#
# @param params names of the counters to reset
# @param scope scope passed to Store.gen_key for every name
# @param options [Hash] a truthy 'random' selects @mutex_hash.synchronize_keys
#   instead of @mutex_hash.synchronize
# @return the Response#to_hash result: one data entry per reset key, plus
#   validation errors and any per-key errors
def reset(params, scope, options)
  validator = Fluent::Counter::ArrayValidator.new(:empty, :key)
  valid_params, errors = validator.call(params)
  res = Response.new(errors)
  keys = valid_params.map { |vp| Store.gen_key(scope, vp) }

  # A failure on one key is recorded on the response instead of being raised,
  # so the remaining keys are still processed.
  do_reset = lambda do |store, key|
    begin
      v = store.reset(key)
      # The message used to contain a stray quote ("'s'").
      @log.debug("Reset #{key}'s counter value")
      res.push_data v
    rescue => e
      res.push_error e
    end
  end

  if options['random']
    @mutex_hash.synchronize_keys(*keys, &do_reset)
  else
    @mutex_hash.synchronize(*keys, &do_reset)
  end

  res.to_hash
end
safe_run() { || ... } click to toggle source
# File lib/fluent/counter/server.rb, line 224
# Runs the given block and returns its value. If the block raises a
# StandardError, an error response is returned instead: 'errors' holds the
# exception wrapped in InternalServerError and 'data' is empty.
#
# @yield the work to run
# @return the block's value, or the error response hash
def safe_run
  yield
rescue StandardError => error
  failure = InternalServerError.new(error).to_hash
  { 'errors' => [failure], 'data' => [] }
end