[Groonga-commit] groonga/fluent-plugin-groonga at 179473a [master] Support auto schema definition

Zurück zum Archiv-Index

Kouhei Sutou null+****@clear*****
Mon Oct 20 16:29:41 JST 2014


Kouhei Sutou	2014-10-20 16:29:41 +0900 (Mon, 20 Oct 2014)

  New Revision: 179473a0439c36d956c39ea1a38e6ad64291df75
  https://github.com/groonga/fluent-plugin-groonga/commit/179473a0439c36d956c39ea1a38e6ad64291df75

  Message:
    Support auto schema definition

  Added files:
    sample/store.conf
    test/output/test_type_guesser.rb
  Modified files:
    lib/fluent/plugin/out_groonga.rb
    test/test_output.rb

  Modified: lib/fluent/plugin/out_groonga.rb (+176 -0)
===================================================================
--- lib/fluent/plugin/out_groonga.rb    2014-10-20 15:00:25 +0900 (90bfa60)
+++ lib/fluent/plugin/out_groonga.rb    2014-10-20 16:29:41 +0900 (9e9a64f)
@@ -50,10 +50,12 @@ module Fluent
     def start
       super
       @client.start
+      @emitter.start
     end
 
     def shutdown
       super
+      @emitter.shutdown
       @client.shutdown
     end
 
@@ -75,10 +77,182 @@ module Fluent
       end
     end
 
+    class Schema
+      def initialize(client, table_name)
+        @client = client
+        @table_name = table_name
+        @table = nil
+        @columns = nil
+      end
+
+      def populate
+        # TODO
+      end
+
+      def update(records)
+        ensure_table
+        ensure_columns
+
+        nonexistent_columns = {}
+        records.each do |record|
+          record.each do |key, value|
+            column = @columns[key]
+            if column.nil?
+              nonexistent_columns[key] ||= []
+              nonexistent_columns[key] << value
+            end
+          end
+        end
+
+        nonexistent_columns.each do |name, values|
+          @columns[name] = create_column(name, values)
+        end
+      end
+
+      private
+      def ensure_table
+        return if @table
+
+        table_list = @client.send("table_list")
+        target_table = table_list.find do |table|
+          table.name == @table_name
+        end
+        if target_table
+          @table = Table.new(@table_name, target_table.domain)
+        else
+          # TODO: Check response
+          @client.send("table_create",
+                       "name"  => @table_name,
+                       "flags" => "TABLE_NO_KEY")
+          @table = Table.new(@table_name, nil)
+        end
+      end
+
+      def ensure_columns
+        return if @columns
+
+        column_list = @client.send("column_list", "table" => @table_name)
+        @columns = {}
+        column_list.each do |column|
+          vector_p = column.flags.split("|").include?("COLUMN_VECTOR")
+          @columns[column.name] = Column.new(column.name,
+                                             column.range,
+                                             vector_p)
+        end
+      end
+
+      def create_column(name, sample_values)
+        guesser = TypeGuesser.new(sample_values)
+        value_type = guesser.guess
+        vector_p = guesser.vector?
+        if vector_p
+          flags = "COLUMN_VECTOR"
+        else
+          flags = "COLUMN_SCALAR"
+        end
+        # TODO: Check response
+        @client.send("column_create",
+                     "table" => @table_name,
+                     "name" => name,
+                     "flags" => flags,
+                     "type" => value_type)
+        @columns[name] = Column.new(name, value_type, vector_p)
+      end
+
+      class TypeGuesser
+        def initialize(sample_values)
+          @sample_values = sample_values
+        end
+
+        def guess
+          return "Time"          if time_values?
+          return "Int32"         if int32_values?
+          return "Int64"         if int64_values?
+          return "Float"         if float_values?
+          return "WGS84GeoPoint" if geo_point_values?
+
+          "Text"
+        end
+
+        def vector?
+          @sample_values.any? do |sample_value|
+            sample_value.is_a?(Array)
+          end
+        end
+
+        private
+        def time_values?
+          now = Time.now.to_i
+          year_in_seconds = 365 * 24 * 60 * 60
+          window = 10 * year_in_seconds
+          new = now + window
+          old = now - window
+          recent_range = old..new
+          @sample_values.all? do |sample_value|
+            sample_value.is_a?(Integer) and
+              recent_range.cover?(sample_value)
+          end
+        end
+
+        def int32_values?
+          int32_min = -(2 ** 31)
+          int32_max = 2 ** 31 - 1
+          range = int32_min..int32_max
+          @sample_values.all? do |sample_value|
+            sample_value.is_a?(Integer) and
+              range.cover?(sample_value)
+          end
+        end
+
+        def int64_values?
+          @sample_values.all? do |sample_value|
+            sample_value.is_a?(Integer)
+          end
+        end
+
+        def float_values?
+          @sample_values.all? do |sample_value|
+            sample_value.is_a?(Float) or
+              sample_value.is_a?(Integer)
+          end
+        end
+
+        def geo_point_values?
+          @sample_values.all? do |sample_value|
+            sample_value.is_a?(String) and
+              /\A-?\d+(?:\.\d+)[,x]-?\d+(?:\.\d+)\z/ =~ sample_value
+          end
+        end
+      end
+
+      class Table
+        def initialize(name, key_type)
+          @name = name
+          @key_type = key_type
+        end
+      end
+
+      class Column
+        def initialize(name, value_type, vector_p)
+          @name = name
+          @value_type = value_type
+          @vector_p = vector_p
+        end
+      end
+    end
+
     class Emitter
       def initialize(client, table)
         @client = client
         @table = table
+        @schema = nil
+      end
+
+      def start
+        @schema = Schema.new(@client, @table)
+      end
+
+      def shutdown
       end
 
       def emit(chunk)
@@ -103,6 +277,8 @@ module Fluent
       def store_records(records)
         return if @table.nil?
 
+        @schema.update(records)
+
         arguments = {
           "table" => @table,
           "values" => Yajl::Encoder.encode(records),

  Added: sample/store.conf (+15 -0) 100644
===================================================================
--- /dev/null
+++ sample/store.conf    2014-10-20 16:29:41 +0900 (3e183a8)
@@ -0,0 +1,15 @@
+<source>
+  type forward
+</source>
+
+<match log.*>
+  type groonga
+  table Logs
+
+  protocol http
+  host 127.0.0.1
+
+  buffer_type file
+  buffer_path /tmp/buffer
+  flush_interval 1
+</match>

  Added: test/output/test_type_guesser.rb (+101 -0) 100644
===================================================================
--- /dev/null
+++ test/output/test_type_guesser.rb    2014-10-20 16:29:41 +0900 (1f86ffe)
@@ -0,0 +1,101 @@
+# Copyright (C) 2014  Kouhei Sutou <kou at clear-code.com>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "fluent/plugin/out_groonga"
+
+class OutputTypeGuesserTest < Test::Unit::TestCase
+  sub_test_case "#guess" do
+    def guess(sample_values)
+      guesser = Fluent::GroongaOutput::Schema::TypeGuesser.new(sample_values)
+      guesser.guess
+    end
+
+    sub_test_case "Time" do
+      test "now" do
+        now = Time.now.to_i
+        assert_equal("Time", guess([now]))
+      end
+
+      test "past value" do
+        now = Time.now.to_i
+        year_in_seconds = 365 * 24 * 60 * 60
+        past = now - (9 * year_in_seconds)
+        assert_equal("Time", guess([past]))
+      end
+
+      test "future value" do
+        now = Time.now.to_i
+        year_in_seconds = 365 * 24 * 60 * 60
+        future = now + (9 * year_in_seconds)
+        assert_equal("Time", guess([future]))
+      end
+
+      test "all type values" do
+        now = Time.now.to_i
+        year_in_seconds = 365 * 24 * 60 * 60
+        past = now - (9 * year_in_seconds)
+        future = now + (9 * year_in_seconds)
+        assert_equal("Time", guess([now, past, future]))
+      end
+    end
+
+    sub_test_case "Int32" do
+      test "min" do
+        int32_min = -(2 ** 31)
+        assert_equal("Int32", guess([int32_min]))
+      end
+
+      test "max" do
+        int32_max = 2 ** 31 - 1
+        assert_equal("Int32", guess([int32_max]))
+      end
+
+      test "zero" do
+        assert_equal("Int32", guess([0]))
+      end
+    end
+
+    sub_test_case "Int64" do
+      test "int32_min - 1" do
+        int32_min = -(2 ** 31)
+        assert_equal("Int64", guess([int32_min - 1]))
+      end
+
+      test "int32_max + 1" do
+        int32_max = 2 ** 31 - 1
+        assert_equal("Int64", guess([int32_max + 1]))
+      end
+    end
+
+    sub_test_case "WGS84GeoPoint" do
+      test "\#{LATITUDE},\#{LONGITUDE}" do
+        statue_of_liberty = "40.689167,-74.044444"
+        assert_equal("WGS84GeoPoint", guess([statue_of_liberty]))
+      end
+
+      test "\#{LATITUDE}x\#{LONGITUDE}" do
+        statue_of_liberty = "40.689167x-74.044444"
+        assert_equal("WGS84GeoPoint", guess([statue_of_liberty]))
+      end
+    end
+
+    sub_test_case "Text" do
+      test "message" do
+        message = "failed to load data"
+        assert_equal("Text", guess([message]))
+      end
+    end
+  end
+end

  Modified: test/test_output.rb (+23 -21)
===================================================================
--- test/test_output.rb    2014-10-20 15:00:25 +0900 (9c477cf)
+++ test/test_output.rb    2014-10-20 16:29:41 +0900 (446aa5f)
@@ -54,34 +54,36 @@ EOC
       @real_host = "127.0.0.1"
       @real_port = 29292
       @real_server_thread = Thread.new do
-        real_server = TCPServer.new(@real_host, @real_port)
-        response_config = WEBrick::Config::HTTP.dup.update(:Logger => $log)
-        real_response = WEBrick::HTTPResponse.new(response_config)
-        request_headers = nil
-        request_body = ""
-        client = real_server.accept
-        real_server.close
-        @request_parser.on_body = lambda do |chunk|
-          @request_body << chunk
-        end
-        @request_parser.on_message_complete = lambda do
-          real_response.body = @response_body
-          real_response.send_response(client)
-          client.close
-        end
-
+        @real_server = TCPServer.new(@real_host, @real_port)
         loop do
-          break if client.closed?
-          data = client.readpartial(4096)
-          break if data.nil?
-          @request_parser << data
+          response_config = WEBrick::Config::HTTP.dup.update(:Logger => $log)
+          real_response = WEBrick::HTTPResponse.new(response_config)
+          request_headers = nil
+          request_body = ""
+          client = @real_server.accept
+          @request_parser.on_body = lambda do |chunk|
+            @request_body << chunk
+          end
+          @request_parser.on_message_complete = lambda do
+            real_response.body = @response_body
+            real_response.send_response(client)
+            client.close
+          end
+
+          loop do
+            break if client.closed?
+            data = client.readpartial(4096)
+            break if data.nil?
+            @request_parser << data
+          end
         end
       end
     end
 
     teardown
     def teardown_real_server
-      @real_server_thread.join
+      @real_server_thread.kill
+      @real_server.close
     end
 
     def configuration
-------------- next part --------------
HTMLの添付ファイルを保管しました...
Download 



More information about the Groonga-commit mailing list
Zurück zum Archiv-Index