class Fluent::Counter::Server
Constants
- DEFAULT_ADDR
- DEFAULT_PORT
Public Class Methods
new(name, opt = {})
click to toggle source
# File lib/fluent/counter/server.rb, line 29 def initialize(name, opt = {}) raise 'Counter server name is invalid' unless Validator::VALID_NAME.match?(name) @name = name @opt = opt @addr = @opt[:addr] || DEFAULT_ADDR @port = @opt[:port] || DEFAULT_PORT @loop = @opt[:loop] || Coolio::Loop.new @log = @opt[:log] || $log @store = Fluent::Counter::Store.new(opt) @mutex_hash = MutexHash.new(@store) @server = Coolio::TCPServer.new(@addr, @port, Handler, method(:on_message)) @thread = nil @running = false end
Public Instance Methods
on_message(data)
click to toggle source
# File lib/fluent/counter/server.rb, line 68 def on_message(data) errors = Validator.request(data) unless errors.empty? return { 'id' => data['id'], 'data' => [], 'errors' => errors } end result = safe_run do send(data['method'], data['params'], data['scope'], data['options']) end result.merge('id' => data['id']) rescue => e @log.error e.to_s end
start()
click to toggle source
# File lib/fluent/counter/server.rb, line 46 def start @server.attach(@loop) @thread = Thread.new do @running = true @loop.run(0.5) @running = false end @log.debug("starting counter server #{@addr}:#{@port}") @mutex_hash.start self end
stop()
click to toggle source
# File lib/fluent/counter/server.rb, line 58 def stop # This `sleep` for a test to wait for a `@loop` to begin to run sleep 0.1 @server.close @loop.stop if @running @mutex_hash.stop @thread.join if @thread @log.debug("calling stop in counter server #{@addr}:#{@port}") end
Private Instance Methods
delete(params, scope, options)
click to toggle source
# File lib/fluent/counter/server.rb, line 126 def delete(params, scope, options) validator = Fluent::Counter::ArrayValidator.new(:empty, :key) valid_params, errors = validator.call(params) res = Response.new(errors) keys = valid_params.map { |vp| Store.gen_key(scope, vp) } do_delete = lambda do |store, key| begin v = store.delete(key) @log.debug("delete a key: #{key}") res.push_data v rescue => e res.push_error e end end if options['random'] @mutex_hash.synchronize_keys(*keys, &do_delete) else @mutex_hash.synchronize(*keys, &do_delete) end res.to_hash end
establish(params, _scope, _options)
click to toggle source
# File lib/fluent/counter/server.rb, line 84 def establish(params, _scope, _options) validator = Fluent::Counter::ArrayValidator.new(:empty, :scope) valid_params, errors = validator.call(params) res = Response.new(errors) if scope = valid_params.first new_scope = "#{@name}\t#{scope}" res.push_data new_scope @log.debug("Establish new key: #{new_scope}") end res.to_hash end
get(params, scope, _options)
click to toggle source
# File lib/fluent/counter/server.rb, line 206 def get(params, scope, _options) validator = Fluent::Counter::ArrayValidator.new(:empty, :key) valid_params, errors = validator.call(params) res = Response.new(errors) keys = valid_params.map { |vp| Store.gen_key(scope, vp) } keys.each do |key| begin v = @store.get(key, raise_error: true) @log.debug("Get counter value: #{key}") res.push_data v rescue => e res.push_error e end end res.to_hash end
inc(params, scope, options)
click to toggle source
# File lib/fluent/counter/server.rb, line 151 def inc(params, scope, options) validate_param = [:empty, :name, :value] validate_param << :reset_interval if options['force'] validator = Fluent::Counter::HashValidator.new(*validate_param) valid_params, errors = validator.call(params) res = Response.new(errors) key_hash = valid_params.reduce({}) do |acc, vp| acc.merge(Store.gen_key(scope, vp['name']) => vp) end do_inc = lambda do |store, key| begin param = key_hash[key] v = store.inc(key, param, force: options['force']) @log.debug("Increment #{key} by #{param['value']}") res.push_data v rescue => e res.push_error e end end if options['random'] @mutex_hash.synchronize_keys(*(key_hash.keys), &do_inc) else @mutex_hash.synchronize(*(key_hash.keys), &do_inc) end res.to_hash end
init(params, scope, options)
click to toggle source
# File lib/fluent/counter/server.rb, line 98 def init(params, scope, options) validator = Fluent::Counter::HashValidator.new(:empty, :name, :reset_interval) valid_params, errors = validator.call(params) res = Response.new(errors) key_hash = valid_params.reduce({}) do |acc, vp| acc.merge(Store.gen_key(scope, vp['name']) => vp) end do_init = lambda do |store, key| begin param = key_hash[key] v = store.init(key, param, ignore: options['ignore']) @log.debug("Create new key: #{param['name']}") res.push_data v rescue => e res.push_error e end end if options['random'] @mutex_hash.synchronize_keys(*(key_hash.keys), &do_init) else @mutex_hash.synchronize(*(key_hash.keys), &do_init) end res.to_hash end
reset(params, scope, options)
click to toggle source
# File lib/fluent/counter/server.rb, line 181 def reset(params, scope, options) validator = Fluent::Counter::ArrayValidator.new(:empty, :key) valid_params, errors = validator.call(params) res = Response.new(errors) keys = valid_params.map { |vp| Store.gen_key(scope, vp) } do_reset = lambda do |store, key| begin v = store.reset(key) @log.debug("Reset #{key}'s' counter value") res.push_data v rescue => e res.push_error e end end if options['random'] @mutex_hash.synchronize_keys(*keys, &do_reset) else @mutex_hash.synchronize(*keys, &do_reset) end res.to_hash end
safe_run() { || ... }
click to toggle source
# File lib/fluent/counter/server.rb, line 224 def safe_run yield rescue => e { 'errors' => [InternalServerError.new(e).to_hash], 'data' => [] } end