Kouhei Sutou
null+****@clear*****
Mon Oct 20 16:29:41 JST 2014
Kouhei Sutou 2014-10-20 16:29:41 +0900 (Mon, 20 Oct 2014) New Revision: 179473a0439c36d956c39ea1a38e6ad64291df75 https://github.com/groonga/fluent-plugin-groonga/commit/179473a0439c36d956c39ea1a38e6ad64291df75 Message: Support auto schema definition Added files: sample/store.conf test/output/test_type_guesser.rb Modified files: lib/fluent/plugin/out_groonga.rb test/test_output.rb Modified: lib/fluent/plugin/out_groonga.rb (+176 -0) =================================================================== --- lib/fluent/plugin/out_groonga.rb 2014-10-20 15:00:25 +0900 (90bfa60) +++ lib/fluent/plugin/out_groonga.rb 2014-10-20 16:29:41 +0900 (9e9a64f) @@ -50,10 +50,12 @@ module Fluent def start super @client.start + @emitter.start end def shutdown super + @emitter.shutdown @client.shutdown end @@ -75,10 +77,182 @@ module Fluent end end + class Schema + def initialize(client, table_name) + @client = client + @table_name = table_name + @table = nil + @columns = nil + end + + def populate + # TODO + end + + def update(records) + ensure_table + ensure_columns + + nonexistent_columns = {} + records.each do |record| + record.each do |key, value| + column = @columns[key] + if column.nil? + nonexistent_columns[key] ||= [] + nonexistent_columns[key] << value + end + end + end + + nonexistent_columns.each do |name, values| + @columns[name] = create_column(name, values) + end + end + + private + def ensure_table + return if @table + + table_list =****@clien*****("table_list") + target_table = table_list.find do |table| + table.name == @table_name + end + if target_table + @table = Table.new(@table_name, target_table.domain) + else + # TODO: Check response + @client.send("table_create", + "name" => @table_name, + "flags" => "TABLE_NO_KEY") + @table = Table.new(@table_name, nil) + end + end + + def ensure_columns + return if @columns + + column_list =****@clien*****("column_list", "table" => @table_name) + @columns = {} + column_list.each do |column| + vector_p = column.flags.split("|").include?("COLUMN_VECTOR") + @columns[column.name] = Column.new(column.name, + column.range, + vector_p) + end + end + + def create_column(name, sample_values) + guesser = TypeGuesser.new(sample_values) + value_type = guesser.guess + vector_p = guesser.vector? + if vector_p + flags = "COLUMN_VECTOR" + else + flags = "COLUMN_SCALAR" + end + # TODO: Check response + @client.send("column_create", + "table" => @table_name, + "name" => name, + "flags" => flags, + "type" => value_type) + @columns[name] = Column.new(name, value_type, vector_p) + end + + class TypeGuesser + def initialize(sample_values) + @sample_values = sample_values + end + + def guess + return "Time" if time_values? + return "Int32" if int32_values? + return "Int64" if int64_values? + return "Float" if float_values? + return "WGS84GeoPoint" if geo_point_values? + + "Text" + end + + def vector? + @sample_values.any? do |sample_value| + sample_value.is_a?(Array) + end + end + + private + def time_values? + now = Time.now.to_i + year_in_seconds = 365 * 24 * 60 * 60 + window = 10 * year_in_seconds + new = now + window + old = now - window + recent_range = old..new + @sample_values.all? do |sample_value| + sample_value.is_a?(Integer) and + recent_range.cover?(sample_value) + end + end + + def int32_values? + int32_min = -(2 ** 31) + int32_max = 2 ** 31 - 1 + range = int32_min..int32_max + @sample_values.all? do |sample_value| + sample_value.is_a?(Integer) and + range.cover?(sample_value) + end + end + + def int64_values? + @sample_values.all? do |sample_value| + sample_value.is_a?(Integer) + end + end + + def float_values? + @sample_values.all? do |sample_value| + sample_value.is_a?(Float) or + sample_value.is_a?(Integer) + end + end + + def geo_point_values? + @sample_values.all? do |sample_value| + sample_value.is_a?(String) and + /\A-?\d+(?:\.\d+)[,x]-?\d+(?:\.\d+)\z/ =~ sample_value + end + end + end + + class Table + def initialize(name, key_type) + @name = name + @key_type = key_type + end + end + + class Column + def initialize(name, value_type, vector_p) + @name = name + @value_type = value_type + @vector_p = vector_p + end + end + end + class Emitter def initialize(client, table) @client = client @table = table + @schema = nil + end + + def start + @schema = Schema.new(@client, @table) + end + + def shutdown end def emit(chunk) @@ -103,6 +277,8 @@ module Fluent def store_records(records) return if****@table*****? + @schema.update(records) + arguments = { "table" => @table, "values" => Yajl::Encoder.encode(records), Added: sample/store.conf (+15 -0) 100644 =================================================================== --- /dev/null +++ sample/store.conf 2014-10-20 16:29:41 +0900 (3e183a8) @@ -0,0 +1,15 @@ +<source> + type forward +</source> + +<match log.*> + type groonga + table Logs + + protocol http + host 127.0.0.1 + + buffer_type file + buffer_path /tmp/buffer + flush_interval 1 +</match> Added: test/output/test_type_guesser.rb (+101 -0) 100644 =================================================================== --- /dev/null +++ test/output/test_type_guesser.rb 2014-10-20 16:29:41 +0900 (1f86ffe) @@ -0,0 +1,101 @@ +# Copyright (C) 2014 Kouhei Sutou <kou �� clear-code.com> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "fluent/plugin/out_groonga" + +class OutputTypeGuesserTest < Test::Unit::TestCase + sub_test_case "#guess" do + def guess(sample_values) + guesser = Fluent::GroongaOutput::Schema::TypeGuesser.new(sample_values) + guesser.guess + end + + sub_test_case "Time" do + test "now" do + now = Time.now.to_i + assert_equal("Time", guess([now])) + end + + test "past value" do + now = Time.now.to_i + year_in_seconds = 365 * 24 * 60 * 60 + past = now - (9 * year_in_seconds) + assert_equal("Time", guess([past])) + end + + test "future value" do + now = Time.now.to_i + year_in_seconds = 365 * 24 * 60 * 60 + future = now + (9 * year_in_seconds) + assert_equal("Time", guess([future])) + end + + test "all type values" do + now = Time.now.to_i + year_in_seconds = 365 * 24 * 60 * 60 + past = now - (9 * year_in_seconds) + future = now + (9 * year_in_seconds) + assert_equal("Time", guess([now, past, future])) + end + end + + sub_test_case "Int32" do + test "min" do + int32_min = -(2 ** 31) + assert_equal("Int32", guess([int32_min])) + end + + test "max" do + int32_max = 2 ** 31 - 1 + assert_equal("Int32", guess([int32_max])) + end + + test "zero" do + assert_equal("Int32", guess([0])) + end + end + + sub_test_case "Int64" do + test "int32_min - 1" do + int32_min = -(2 ** 31) + assert_equal("Int64", guess([int32_min - 1])) + end + + test "int32_max + 1" do + int32_max = 2 ** 31 - 1 + assert_equal("Int64", guess([int32_max + 1])) + end + end + + sub_test_case "WGS84GeoPoint" do + test "\#{LATITUDE},\#{LONGITUDE}" do + statue_of_liberty = "40.689167,-74.044444" + assert_equal("WGS84GeoPoint", guess([statue_of_liberty])) + end + + test "\#{LATITUDE}x\#{LONGITUDE}" do + statue_of_liberty = "40.689167x-74.044444" + assert_equal("WGS84GeoPoint", guess([statue_of_liberty])) + end + end + + sub_test_case "Text" do + test "message" do + message = "failed to load data" + assert_equal("Text", guess([message])) + end + end + end +end Modified: test/test_output.rb (+23 -21) =================================================================== --- test/test_output.rb 2014-10-20 15:00:25 +0900 (9c477cf) +++ test/test_output.rb 2014-10-20 16:29:41 +0900 (446aa5f) @@ -54,34 +54,36 @@ EOC @real_host = "127.0.0.1" @real_port = 29292 @real_server_thread = Thread.new do - real_server = TCPServer.new(@real_host, @real_port) - response_config = WEBrick::Config::HTTP.dup.update(:Logger => $log) - real_response = WEBrick::HTTPResponse.new(response_config) - request_headers = nil - request_body = "" - client = real_server.accept - real_server.close - @request_parser.on_body = lambda do |chunk| - @request_body << chunk - end - @request_parser.on_message_complete = lambda do - real_response.body = @response_body - real_response.send_response(client) - client.close - end - + @real_server = TCPServer.new(@real_host, @real_port) loop do - break if client.closed? - data = client.readpartial(4096) - break if data.nil? - @request_parser << data + response_config = WEBrick::Config::HTTP.dup.update(:Logger => $log) + real_response = WEBrick::HTTPResponse.new(response_config) + request_headers = nil + request_body = "" + client = @real_server.accept + @request_parser.on_body = lambda do |chunk| + @request_body << chunk + end + @request_parser.on_message_complete = lambda do + real_response.body = @response_body + real_response.send_response(client) + client.close + end + + loop do + break if client.closed? + data = client.readpartial(4096) + break if data.nil? + @request_parser << data + end end end end teardown def teardown_real_server - @real_server_thread.join + @real_server_thread.kill + @real_server.close end def configuration -------------- next part -------------- HTML����������������������������...Download