[Groonga-commit] droonga/droonga-engine at e7fad61 [master] Extract common codes to implement asynchronous handler from the dump command

Zurück zum Archiv-Index

YUKI Hiroshi null+****@clear*****
Fri Apr 10 16:40:17 JST 2015


YUKI Hiroshi	2015-04-10 16:40:17 +0900 (Fri, 10 Apr 2015)

  New Revision: e7fad615cc1bd906f3a29ce94bc4dc0fd6d9e6f6
  https://github.com/droonga/droonga-engine/commit/e7fad615cc1bd906f3a29ce94bc4dc0fd6d9e6f6

  Message:
    Extract common codes to implement asynchronous handler from the dump command

  Added files:
    lib/droonga/plugin/async_command.rb
  Modified files:
    lib/droonga/plugins/dump.rb

  Added: lib/droonga/plugin/async_command.rb (+173 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/plugin/async_command.rb    2015-04-10 16:40:17 +0900 (6c9e6d4)
@@ -0,0 +1,173 @@
+# Copyright (C) 2014-2015 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+
+require "coolio"
+
+require "droonga/handler"
+require "droonga/error_messages"
+
+module Droonga
+  module Plugins
+    module AsyncCommand
+      class Request
+        def initialize(message)
+          @message = message
+        end
+
+        def need_start?
+          reply_to
+        end
+
+        def id
+          @message["id"]
+        end
+
+        def dataset
+          @message.raw["dataset"]
+        end
+
+        def reply_to
+          (@message.raw["replyTo"] || {})["to"]
+        end
+
+        def messages_per_seconds
+          request = (@message.request || {})
+          minimum_messages_per_seconds = 10
+          [
+            minimum_messages_per_seconds,
+            (request["messagesPerSecond"] || 10000).to_i,
+          ].max
+        end
+      end
+
+      class Handler < Droonga::Handler
+        def handle(message)
+          request = Request.new(message)
+          if request.need_start?
+            start(request)
+            true
+          else
+            false
+          end
+        end
+
+        def start(request)
+          #XXX override me!
+          # handler = MyAsyncHandler.new(loop, messenger, request)
+          # handler.start
+        end
+      end
+
+      class AsyncHandler
+        def initialize(loop, messenger, request)
+          @loop = loop
+          @messenger = messenger
+          @request = request
+        end
+
+        def start
+          setup_forward_data
+
+          forward("#{prefix}.start")
+
+          runner = Fiber.new do
+            handle
+            forward("#{prefix}.end")
+          end
+
+          timer = Coolio::TimerWatcher.new(0.1, true)
+          timer.on_timer do
+            begin
+              runner.resume
+            rescue FiberError
+              timer.detach
+              logger.trace("start: watcher detached on FiberError",
+                           :watcher => timer)
+            rescue
+              timer.detach
+              logger.trace("start: watcher detached on unexpected exception",
+                           :watcher => timer)
+              logger.exception(error_message, $!)
+              error(error_name, error_message)
+            end
+          end
+
+          @loop.attach(timer)
+          logger.trace("start: new watcher attached",
+                       :watcher => timer)
+        end
+
+        private
+        def prefix
+          "" #XXX override me!!
+        end
+
+        def handle
+          #XXX override me!!
+        end
+
+        def setup_forward_data
+          @base_forward_message = {
+            "inReplyTo" => @request.id,
+            "dataset"   => @request.dataset,
+          }
+          @forward_to =****@reque*****_to
+          @n_forwarded_messages = 0
+          @messages_per_100msec =****@reque*****_per_seconds / 10
+        end
+
+        def error_name
+          "Failure" #XXX override me!!
+        end
+
+        def error_message
+          "failed to do" #XXX override me!!
+        end
+
+        def error(name, message)
+          message = {
+            "statusCode" => ErrorMessages::InternalServerError::STATUS_CODE,
+            "body" => {
+              "name"    => name,
+              "message" => message,
+            },
+          }
+          error_message = @base_forward_message.merge(message)
+          @messenger.forward(error_message,
+                             "to"   => @forward_to,
+                             "type" => "#{prefix}.error")
+        end
+
+        def forward(type, body=nil)
+          forward_message = @base_forward_message
+          if body
+            forward_message = forward_message.merge("body" => body)
+          end
+          @messenger.forward(forward_message,
+                             "to"   => @forward_to,
+                             "type" => type)
+
+          @n_forwarded_messages += 1
+          @n_forwarded_messages %= @messages_per_100msec
+          Fiber.yield if @n_forwarded_messages.zero?
+        end
+
+        def log_tag
+          "[#{Process.ppid}] async-handler"
+        end
+      end
+    end
+  end
+end

  Modified: lib/droonga/plugins/dump.rb (+21 -118)
===================================================================
--- lib/droonga/plugins/dump.rb    2015-04-10 15:35:09 +0900 (63bdbd5)
+++ lib/droonga/plugins/dump.rb    2015-04-10 16:40:17 +0900 (8985ec6)
@@ -16,6 +16,7 @@
 require "groonga"
 
 require "droonga/plugin"
+require "droonga/plugin/async_command"
 require "droonga/error_messages"
 
 module Droonga
@@ -35,136 +36,38 @@ module Droonga
       extend Plugin
       register("dump")
 
-      class Handler < Droonga::Handler
-        def handle(message)
-          request = Request.new(message)
-          if request.need_dump?
-            dumper = Dumper.new(@context, loop, messenger, request)
-            dumper.start_dump
-            true
-          else
-            false
-          end
-        end
-      end
-
-      class Request
-        def initialize(message)
-          @message = message
-        end
-
-        def need_dump?
-          reply_to
-        end
-
-        def id
-          @message["id"]
-        end
-
-        def dataset
-          @message.raw["dataset"]
-        end
-
-        def reply_to
-          (@message.raw["replyTo"] || {})["to"]
-        end
-
-        def messages_per_seconds
-          request = (@message.request || {})
-          minimum_messages_per_seconds = 10
-          [
-            minimum_messages_per_seconds,
-            (request["messagesPerSecond"] || 10000).to_i,
-          ].max
+      class Handler < AsyncCommand::Handler
+        def start(request)
+          dumper = Dumper.new(@context, loop, messenger, request)
+          dumper.start_dump
         end
       end
 
-      class Dumper
+      class Dumper < AsyncCommand::AsyncHandler
         include Loggable
 
         def initialize(context, loop, messenger, request)
           @context = context
-          @loop = loop
-          @messenger = messenger
-          @request = request
-        end
-
-        def start_dump
-          setup_forward_data
-
-          forward("dump.start")
-
-          dumper = Fiber.new do
-            dump_schema
-            dump_records
-            dump_indexes
-            forward("dump.end")
-          end
-
-          on_error = lambda do |exception|
-            message = "failed to dump"
-            logger.exception(message, $!)
-            error("DumpFailure", message)
-          end
-
-          timer = Coolio::TimerWatcher.new(0.1, true)
-          timer.on_timer do
-            begin
-              dumper.resume
-            rescue FiberError
-              timer.detach
-              logger.trace("start_dump: watcher detached on FiberError",
-                           :watcher => timer)
-            rescue
-              timer.detach
-              logger.trace("start_dump: watcher detached on unexpected exception",
-                           :watcher => timer)
-              on_error.call($!)
-            end
-          end
-
-          @loop.attach(timer)
-          logger.trace("start_dump: new watcher attached",
-                       :watcher => timer)
+          super(loop, messenger, request)
         end
 
         private
-        def setup_forward_data
-          @base_forward_message = {
-            "inReplyTo" => @request.id,
-            "dataset"   => @request.dataset,
-          }
-          @forward_to =****@reque*****_to
-          @n_forwarded_messages = 0
-          @messages_per_100msec =****@reque*****_per_seconds / 10
+        def prefix
+          "dump"
         end
 
-        def error(name, message)
-          message = {
-            "statusCode" => ErrorMessages::InternalServerError::STATUS_CODE,
-            "body" => {
-              "name"    => name,
-              "message" => message,
-            },
-          }
-          error_message = @base_forward_message.merge(message)
-          @messenger.forward(error_message,
-                             "to"   => @forward_to,
-                             "type" => "dump.error")
+        def error_name
+          "DumpFailure"
         end
 
-        def forward(type, body=nil)
-          forward_message = @base_forward_message
-          if body
-            forward_message = forward_message.merge("body" => body)
-          end
-          @messenger.forward(forward_message,
-                             "to"   => @forward_to,
-                             "type" => type)
+        def error_message
+          "failed to dump"
+        end
 
-          @n_forwarded_messages += 1
-          @n_forwarded_messages %= @messages_per_100msec
-          Fiber.yield if @n_forwarded_messages.zero?
+        def handle
+          dump_schema
+          dump_records
+          dump_indexes
         end
 
         def dump_schema
@@ -183,7 +86,7 @@ module Droonga
         end
 
         def dump_table(table)
-          forward("dump.table", table_body(table))
+          forward("#{prefix}.table", table_body(table))
 
           columns = table.columns.sort_by(&:name)
           columns.each do |column|
@@ -216,7 +119,7 @@ module Droonga
         end
 
         def dump_column(column)
-          forward("dump.column", column_body(column))
+          forward("#{prefix}.column", column_body(column))
         end
 
         def column_body(column)
@@ -276,7 +179,7 @@ module Droonga
                 "key"    => record.key,
                 "values" => values,
               }
-              forward("dump.record", body)
+              forward("#{prefix}.record", body)
             end
           end
         end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Zurück zum Archiv-Index