YUKI Hiroshi
null+****@clear*****
Fri Apr 10 16:40:17 JST 2015
YUKI Hiroshi 2015-04-10 16:40:17 +0900 (Fri, 10 Apr 2015) New Revision: e7fad615cc1bd906f3a29ce94bc4dc0fd6d9e6f6 https://github.com/droonga/droonga-engine/commit/e7fad615cc1bd906f3a29ce94bc4dc0fd6d9e6f6 Message: Extract common codes to implement asynchronous handler from the dump command Added files: lib/droonga/plugin/async_command.rb Modified files: lib/droonga/plugins/dump.rb Added: lib/droonga/plugin/async_command.rb (+173 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/plugin/async_command.rb 2015-04-10 16:40:17 +0900 (6c9e6d4) @@ -0,0 +1,173 @@ +# Copyright (C) 2014-2015 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +require "coolio" + +require "droonga/handler" +require "droonga/error_messages" + +module Droonga + module Plugins + module AsyncCommand + class Request + def initialize(message) + @message = message + end + + def need_start? + reply_to + end + + def id + @message["id"] + end + + def dataset + @message.raw["dataset"] + end + + def reply_to + (@message.raw["replyTo"] || {})["to"] + end + + def messages_per_seconds + request = (@message.request || {}) + minimum_messages_per_seconds = 10 + [ + minimum_messages_per_seconds, + (request["messagesPerSecond"] || 10000).to_i, + ].max + end + end + + class Handler < Droonga::Handler + def handle(message) + request = Request.new(message) + if request.need_start? + start(request) + true + else + false + end + end + + def start(request) + #XXX override me! + # handler = MyAsyncHandler.new(loop, messenger, request) + # handler.start + end + end + + class AsyncHandler + def initialize(loop, messenger, request) + @loop = loop + @messenger = messenger + @request = request + end + + def start + setup_forward_data + + forward("#{prefix}.start") + + runner = Fiber.new do + handle + forward("#{prefix}.end") + end + + timer = Coolio::TimerWatcher.new(0.1, true) + timer.on_timer do + begin + runner.resume + rescue FiberError + timer.detach + logger.trace("start: watcher detached on FiberError", + :watcher => timer) + rescue + timer.detach + logger.trace("start: watcher detached on unexpected exception", + :watcher => timer) + logger.exception(error_message, $!) + error(error_name, error_message) + end + end + + @loop.attach(timer) + logger.trace("start: new watcher attached", + :watcher => timer) + end + + private + def prefix + "" #XXX override me!! + end + + def handle + #XXX override me!! + end + + def setup_forward_data + @base_forward_message = { + "inReplyTo" => @request.id, + "dataset" => @request.dataset, + } + @forward_to =****@reque*****_to + @n_forwarded_messages = 0 + @messages_per_100msec =****@reque*****_per_seconds / 10 + end + + def error_name + "Failure" #XXX override me!! + end + + def error_message + "failed to do" #XXX override me!! + end + + def error(name, message) + message = { + "statusCode" => ErrorMessages::InternalServerError::STATUS_CODE, + "body" => { + "name" => name, + "message" => message, + }, + } + error_message = @base_forward_message.merge(message) + @messenger.forward(error_message, + "to" => @forward_to, + "type" => "#{prefix}.error") + end + + def forward(type, body=nil) + forward_message = @base_forward_message + if body + forward_message = forward_message.merge("body" => body) + end + @messenger.forward(forward_message, + "to" => @forward_to, + "type" => type) + + @n_forwarded_messages += 1 + @n_forwarded_messages %= @messages_per_100msec + Fiber.yield if @n_forwarded_messages.zero? + end + + def log_tag + "[#{Process.ppid}] async-handler" + end + end + end + end +end Modified: lib/droonga/plugins/dump.rb (+21 -118) =================================================================== --- lib/droonga/plugins/dump.rb 2015-04-10 15:35:09 +0900 (63bdbd5) +++ lib/droonga/plugins/dump.rb 2015-04-10 16:40:17 +0900 (8985ec6) @@ -16,6 +16,7 @@ require "groonga" require "droonga/plugin" +require "droonga/plugin/async_command" require "droonga/error_messages" module Droonga @@ -35,136 +36,38 @@ module Droonga extend Plugin register("dump") - class Handler < Droonga::Handler - def handle(message) - request = Request.new(message) - if request.need_dump? - dumper = Dumper.new(@context, loop, messenger, request) - dumper.start_dump - true - else - false - end - end - end - - class Request - def initialize(message) - @message = message - end - - def need_dump? - reply_to - end - - def id - @message["id"] - end - - def dataset - @message.raw["dataset"] - end - - def reply_to - (@message.raw["replyTo"] || {})["to"] - end - - def messages_per_seconds - request = (@message.request || {}) - minimum_messages_per_seconds = 10 - [ - minimum_messages_per_seconds, - (request["messagesPerSecond"] || 10000).to_i, - ].max + class Handler < AsyncCommand::Handler + def start(request) + dumper = Dumper.new(@context, loop, messenger, request) + dumper.start_dump end end - class Dumper + class Dumper < AsyncCommand::AsyncHandler include Loggable def initialize(context, loop, messenger, request) @context = context - @loop = loop - @messenger = messenger - @request = request - end - - def start_dump - setup_forward_data - - forward("dump.start") - - dumper = Fiber.new do - dump_schema - dump_records - dump_indexes - forward("dump.end") - end - - on_error = lambda do |exception| - message = "failed to dump" - logger.exception(message, $!) - error("DumpFailure", message) - end - - timer = Coolio::TimerWatcher.new(0.1, true) - timer.on_timer do - begin - dumper.resume - rescue FiberError - timer.detach - logger.trace("start_dump: watcher detached on FiberError", - :watcher => timer) - rescue - timer.detach - logger.trace("start_dump: watcher detached on unexpected exception", - :watcher => timer) - on_error.call($!) - end - end - - @loop.attach(timer) - logger.trace("start_dump: new watcher attached", - :watcher => timer) + super(loop, messenger, request) end private - def setup_forward_data - @base_forward_message = { - "inReplyTo" => @request.id, - "dataset" => @request.dataset, - } - @forward_to =****@reque*****_to - @n_forwarded_messages = 0 - @messages_per_100msec =****@reque*****_per_seconds / 10 + def prefix + "dump" end - def error(name, message) - message = { - "statusCode" => ErrorMessages::InternalServerError::STATUS_CODE, - "body" => { - "name" => name, - "message" => message, - }, - } - error_message = @base_forward_message.merge(message) - @messenger.forward(error_message, - "to" => @forward_to, - "type" => "dump.error") + def error_name + "DumpFailure" end - def forward(type, body=nil) - forward_message = @base_forward_message - if body - forward_message = forward_message.merge("body" => body) - end - @messenger.forward(forward_message, - "to" => @forward_to, - "type" => type) + def error_message + "failed to dump" + end - @n_forwarded_messages += 1 - @n_forwarded_messages %= @messages_per_100msec - Fiber.yield if @n_forwarded_messages.zero? + def handle + dump_schema + dump_records + dump_indexes end def dump_schema @@ -183,7 +86,7 @@ module Droonga end def dump_table(table) - forward("dump.table", table_body(table)) + forward("#{prefix}.table", table_body(table)) columns = table.columns.sort_by(&:name) columns.each do |column| @@ -216,7 +119,7 @@ module Droonga end def dump_column(column) - forward("dump.column", column_body(column)) + forward("#{prefix}.column", column_body(column)) end def column_body(column) @@ -276,7 +179,7 @@ module Droonga "key" => record.key, "values" => values, } - forward("dump.record", body) + forward("#{prefix}.record", body) end end end -------------- next part -------------- HTML����������������������������...Download