[Groonga-commit] groonga/fluent-plugin-groonga [master] Support non buffer mode

Zurück zum Archiv-Index

Kouhei Sutou null+****@clear*****
Fri Jan 18 16:28:33 JST 2013


Kouhei Sutou	2013-01-18 16:28:33 +0900 (Fri, 18 Jan 2013)

  New Revision: b5a33636f91a9b25ef559835983b80c8fc21af57
  https://github.com/groonga/fluent-plugin-groonga/commit/b5a33636f91a9b25ef559835983b80c8fc21af57

  Log:
    Support non buffer mode

  Modified files:
    lib/fluent/plugin/out_groonga.rb

  Modified: lib/fluent/plugin/out_groonga.rb (+89 -30)
===================================================================
--- lib/fluent/plugin/out_groonga.rb    2012-12-28 16:44:07 +0900 (d91d036)
+++ lib/fluent/plugin/out_groonga.rb    2013-01-18 16:28:33 +0900 (f2eb3fd)
@@ -18,73 +18,132 @@
 require "fileutils"
 
 module Fluent
-  class GroongaOutput < BufferedOutput
+  class GroongaOutput < Output
     Plugin.register_output("groonga", self)
 
     def initialize
       super
     end
 
+    BufferedOutput.config_params.each do |name, (block, options)|
+      if options[:type]
+        config_param(name, options[:type], options)
+      else
+        config_param(name, options, &block)
+      end
+    end
+
     config_param :protocol, :string, :default => "http"
     config_param :table, :string, :default => nil
 
     def configure(conf)
       super
-      case @protocol
-      when "http"
-        @client = HTTPClient.new
-      when "gqtp"
-        @client = GQTPClient.new
-      when "command"
-        @client = CommandClient.new
-      end
+      @client = create_client(@protocol)
       @client.configure(conf)
+
+      @emitter = Emitter.new(@client, @table)
+      @output = create_output(@buffer_type, @emitter)
+      @output.configure(conf)
     end
 
     def start
       super
       @client.start
+      @output.start
     end
 
     def shutdown
       super
+      @output.shutdown
       @client.shutdown
     end
 
-    def format(tag, time, record)
-      [tag, time, record].to_msgpack
+    def emit(tag, event_stream, chain)
+      @output.emit(tag, event_stream, chain)
+    end
+
+    def create_client(protocol)
+      case protocol
+      when "http"
+        HTTPClient.new
+      when "gqtp"
+        GQTPClient.new
+      when "command"
+        CommandClient.new
+      end
+    end
+
+    def create_output(buffer_type, emitter)
+      if buffer_type == "none"
+        RawGroongaOutput.new(emitter)
+      else
+        BufferedGroongaOutput.new(emitter)
+      end
     end
 
-    def write(chunk)
-      chunk.msgpack_each do |tag, time, arguments|
+    class Emitter
+      def initialize(client, table)
+        @client = client
+        @table = table
+      end
+
+      def emit(tag, record)
         if /\Agroonga\.command\./ =~ tag
           name = $POSTMATCH
-          send_command(name, arguments)
+          send_command(name, record)
         else
-          store_chunk(chunk)
+          store_chunk(data)
         end
       end
+
+      private
+      def send_command(name, arguments)
+        command_class = Groonga::Command.find(name)
+        command = command_class.new(name, arguments)
+        @client.send(command)
+      end
+
+      def store_chunk(value)
+        return if****@table*****?
+
+        values = [value]
+        arguments = {
+          "table" => @table,
+          "values" => Yajl::Enocder.encode(values),
+        }
+        send_command("load", arguments)
+      end
     end
 
-    private
-    def send_command(name, arguments)
-      command_class = Groonga::Command.find(name)
-      command = command_class.new(name, arguments)
-      @client.send(command)
+    class RawGroongaOutput < Output
+      def initialize(emitter)
+        @emitter = emitter
+        super()
+      end
+
+      def emit(tag, event_stream, chain)
+        event_stream.each do |time, record|
+          @emitter.emit(tag, record)
+        end
+        chain.next
+      end
     end
 
-    def store_chunk(chunk)
-      return if****@table*****?
+    class BufferedGroongaOutput < BufferedOutput
+      def initialize(emitter)
+        @emitter = emitter
+        super()
+      end
+
+      def format(tag, time, record)
+        [tag, time, record].to_msgpack
+      end
 
-      values = []
-      chunk.each do |time, value|
-        values << value
+      def write(chunk)
+        chunk.msgpack_each do |tag, time, record|
+          @emitter.emit(tag, record)
+        end
       end
-      arguments = {
-        "table" => @table,
-        "values" => Yajl::Enocder.encode(values),
-      }
-      send_command("load", arguments)
     end
 
     class HTTPClient
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Zurück zum Archiv-Index