Kouhei Sutou
null+****@clear*****
Fri Jan 18 16:28:33 JST 2013
Kouhei Sutou 2013-01-18 16:28:33 +0900 (Fri, 18 Jan 2013) New Revision: b5a33636f91a9b25ef559835983b80c8fc21af57 https://github.com/groonga/fluent-plugin-groonga/commit/b5a33636f91a9b25ef559835983b80c8fc21af57 Log: Support non buffer mode Modified files: lib/fluent/plugin/out_groonga.rb Modified: lib/fluent/plugin/out_groonga.rb (+89 -30) =================================================================== --- lib/fluent/plugin/out_groonga.rb 2012-12-28 16:44:07 +0900 (d91d036) +++ lib/fluent/plugin/out_groonga.rb 2013-01-18 16:28:33 +0900 (f2eb3fd) @@ -18,73 +18,132 @@ require "fileutils" module Fluent - class GroongaOutput < BufferedOutput + class GroongaOutput < Output Plugin.register_output("groonga", self) def initialize super end + BufferedOutput.config_params.each do |name, (block, options)| + if options[:type] + config_param(name, options[:type], options) + else + config_param(name, options, &block) + end + end + config_param :protocol, :string, :default => "http" config_param :table, :string, :default => nil def configure(conf) super - case @protocol - when "http" - @client = HTTPClient.new - when "gqtp" - @client = GQTPClient.new - when "command" - @client = CommandClient.new - end + @client = create_client(@protocol) @client.configure(conf) + + @emitter = Emitter.new(@client, @table) + @output = create_output(@buffer_type, @emitter) + @output.configure(conf) end def start super @client.start + @output.start end def shutdown super + @output.shutdown @client.shutdown end - def format(tag, time, record) - [tag, time, record].to_msgpack + def emit(tag, event_stream, chain) + @output.emit(tag, event_stream, chain) + end + + def create_client(protocol) + case protocol + when "http" + HTTPClient.new + when "gqtp" + GQTPClient.new + when "command" + CommandClient.new + end + end + + def create_output(buffer_type, emitter) + if buffer_type == "none" + RawGroongaOutput.new(emitter) + else + BufferedGroongaOutput.new(emitter) + end end - def write(chunk) - chunk.msgpack_each do |tag, time, arguments| + class Emitter + def initialize(client, table) + @client = client + @table = table + end + + def emit(tag, record) if /\Agroonga\.command\./ =~ tag name = $POSTMATCH - send_command(name, arguments) + send_command(name, record) else - store_chunk(chunk) + store_chunk(data) end end + + private + def send_command(name, arguments) + command_class = Groonga::Command.find(name) + command = command_class.new(name, arguments) + @client.send(command) + end + + def store_chunk(value) + return if****@table*****? + + values = [value] + arguments = { + "table" => @table, + "values" => Yajl::Enocder.encode(values), + } + send_command("load", arguments) + end end - private - def send_command(name, arguments) - command_class = Groonga::Command.find(name) - command = command_class.new(name, arguments) - @client.send(command) + class RawGroongaOutput < Output + def initialize(emitter) + @emitter = emitter + super() + end + + def emit(tag, event_stream, chain) + event_stream.each do |time, record| + @emitter.emit(tag, record) + end + chain.next + end end - def store_chunk(chunk) - return if****@table*****? + class BufferedGroongaOutput < BufferedOutput + def initialize(emitter) + @emitter = emitter + super() + end + + def format(tag, time, record) + [tag, time, record].to_msgpack + end - values = [] - chunk.each do |time, value| - values << value + def write(chunk) + chunk.msgpack_each do |tag, time, record| + @emitter.emit(tag, record) + end end - arguments = { - "table" => @table, - "values" => Yajl::Enocder.encode(values), - } - send_command("load", arguments) end class HTTPClient -------------- next part -------------- HTML����������������������������...Download