[Groonga-commit] droonga/droonga-engine at 71840ae [master] Create a socket for internal connections

Zurück zum Archiv-Index

Kouhei Sutou null+****@clear*****
Mon May 26 15:25:54 JST 2014


Kouhei Sutou	2014-05-26 15:25:54 +0900 (Mon, 26 May 2014)

  New Revision: 71840aeec748053e2f7d9bda06192255ec910a10
  https://github.com/droonga/droonga-engine/commit/71840aeec748053e2f7d9bda06192255ec910a10

  Message:
    Create a socket for internal connections
    
    It is needed for graceful restart.

  Added files:
    lib/droonga/internal_fluent_message_receiver.rb
  Modified files:
    lib/droonga/command/droonga_engine.rb
    lib/droonga/dispatcher.rb
    lib/droonga/engine.rb
    lib/droonga/engine_state.rb

  Modified: lib/droonga/command/droonga_engine.rb (+35 -3)
===================================================================
--- lib/droonga/command/droonga_engine.rb    2014-05-26 14:50:07 +0900 (d2f313c)
+++ lib/droonga/command/droonga_engine.rb    2014-05-26 15:25:54 +0900 (81ee8fc)
@@ -24,6 +24,7 @@ require "droonga/engine"
 require "droonga/serf"
 require "droonga/event_loop"
 require "droonga/fluent_message_receiver"
+require "droonga/internal_fluent_message_receiver"
 require "droonga/plugin_loader"
 
 module Droonga
@@ -360,6 +361,8 @@ module Droonga
           end
         end
 
+        include Loggable
+
         def initialize
           @engine_name = nil
           @listen_fd = nil
@@ -373,6 +376,8 @@ module Droonga
 
           begin
             run_services
+          rescue
+            logger.exception("failed to run services", $!)
           ensure
             shutdown_services
           end
@@ -408,12 +413,17 @@ module Droonga
           end
         end
 
+        def host
+          @engine_name.split(":", 2).first
+        end
+
         def run_services
           @engine = nil
           @receiver = nil
           raw_loop = Coolio::Loop.default
           @loop = EventLoop.new(raw_loop)
 
+          run_internal_message_receiver
           run_engine
           run_receiver
           setup_signals
@@ -424,12 +434,32 @@ module Droonga
         def shutdown_services
           shutdown_control_io
           shutdown_receiver
+          shutdown_internal_message_receiver
           shutdown_engine
           @loop = nil
         end
 
+        def run_internal_message_receiver
+          @internal_message_receiver = create_internal_message_receiver
+          host, port = @internal_message_receiver.start
+          tag = @engine_name.split("/", 2).last.split(".", 2).first
+          @internal_engine_name = "#{host}:#{port}/#{tag}"
+        end
+
+        def create_internal_message_receiver
+          InternalFluentMessageReceiver.new(@loop, host) do |tag, time, record|
+            on_message(tag, time, record)
+          end
+        end
+
+        def shutdown_internal_message_receiver
+          return if @internal_message_receiver.nil?
+          @internal_message_receiver, receiver = nil, @internal_message_receiver
+          receiver.shutdown
+        end
+
         def run_engine
-          @engine = Engine.new(@loop, @engine_name)
+          @engine = Engine.new(@loop, @engine_name, @internal_engine_name)
           @engine.start
         end
 
@@ -466,8 +496,6 @@ module Droonga
 
         def create_receiver
           options = {
-            :host => @host,
-            :port => @port,
             :listen_fd => @listen_fd,
             :heartbeat_fd => @heartbeat_fd,
           }
@@ -518,6 +546,10 @@ module Droonga
         def stop_immediate
           shutdown_services
         end
+
+        def log_tag
+          "service"
+        end
       end
     end
   end

  Modified: lib/droonga/dispatcher.rb (+1 -6)
===================================================================
--- lib/droonga/dispatcher.rb    2014-05-26 14:50:07 +0900 (bb14fc7)
+++ lib/droonga/dispatcher.rb    2014-05-26 15:25:54 +0900 (3ae7fd1)
@@ -13,7 +13,6 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "English"
 require "tsort"
 
 require "droonga/loggable"
@@ -213,11 +212,7 @@ module Droonga
 
     private
     def farm_path(route)
-      if route =~ /\A.*:\d+\/[^\.]+/
-        $MATCH
-      else
-        route
-      end
+      @engine_state.farm_path(route)
     end
 
     def process_input_message(message)

  Modified: lib/droonga/engine.rb (+2 -2)
===================================================================
--- lib/droonga/engine.rb    2014-05-26 14:50:07 +0900 (fe48f49)
+++ lib/droonga/engine.rb    2014-05-26 15:25:54 +0900 (ef4c30a)
@@ -31,8 +31,8 @@ module Droonga
     LAST_PROCESSED_TIMESTAMP = "last_processed_timestamp"
     EFFECTIVE_MESSAGE_TIMESTAMP = "effective_message_timestamp"
 
-    def initialize(loop, name)
-      @state = EngineState.new(loop, name)
+    def initialize(loop, name, internal_name)
+      @state = EngineState.new(loop, name, internal_name)
 
       @catalog_observer = Droonga::CatalogObserver.new(@state.loop)
       @catalog_observer.on_reload = lambda do |catalog|

  Modified: lib/droonga/engine_state.rb (+20 -3)
===================================================================
--- lib/droonga/engine_state.rb    2014-05-26 14:50:07 +0900 (2f29ed4)
+++ lib/droonga/engine_state.rb    2014-05-26 15:25:54 +0900 (985bd1c)
@@ -13,6 +13,8 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
+require "English"
+
 require "coolio"
 
 require "droonga/loggable"
@@ -26,11 +28,13 @@ module Droonga
 
     attr_reader :loop
     attr_reader :name
+    attr_reader :internal_name
     attr_reader :forwarder
     attr_reader :replier
-    def initialize(loop, name)
+    def initialize(loop, name, internal_name)
       @loop = loop
       @name = name
+      @internal_name = internal_name
       @sessions = {}
       @current_id = 0
       @forwarder = Forwarder.new(@loop)
@@ -50,13 +54,26 @@ module Droonga
     end
 
     def local_route?(route)
-      route.start_with?(@name)
+      route.start_with?(@name) or route.start_with?(@internal_name)
+    end
+
+    def farm_path(route)
+      if /\A[^:]+:\d+\/[^.]+/ =~ route
+        name = $MATCH
+        if name == @internal_name
+          @name
+        else
+          name
+        end
+      else
+        route
+      end
     end
 
     def generate_id
       id = @current_id
       @current_id = id.succ
-      return [@name, id].join(".#")
+      return [@internal_name, id].join(".#")
     end
 
     def find_session(id)

  Added: lib/droonga/internal_fluent_message_receiver.rb (+99 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/internal_fluent_message_receiver.rb    2014-05-26 15:25:54 +0900 (e7a6bb0)
@@ -0,0 +1,99 @@
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "socket"
+
+require "droonga/fluent_message_receiver"
+
+module Droonga
+  class InternalFluentMessageReceiver
+    include Loggable
+
+    def initialize(loop, host, &on_message)
+      @loop = loop
+      @host = host
+      @on_message = on_message
+    end
+
+    def start
+      logger.trace("start: start")
+      start_listen_socket
+      start_heartbeat_socket
+      start_message_receiver
+      logger.trace("start: done")
+
+      [@host, @port]
+    end
+
+    def shutdown
+      logger.trace("shutdown: start")
+      shutdown_message_receiver
+      shutdown_heartbeat_socket
+      shutdown_listen_socket
+      logger.trace("shutdown: done")
+    end
+
+    private
+    def start_listen_socket
+      logger.trace("start_listen_socket: start")
+      @listen_socket = TCPServer.new(@host, 0)
+      @port = @listen_socket.addr[1]
+      logger.trace("start_listen_socket: done")
+    end
+
+    def shutdown_listen_socket
+      logger.trace("shutdown_listen_socket: start")
+      logger.trace("shutdown_listen_socket: done")
+    end
+
+    def address_family
+      ip_address = IPAddr.new(IPSocket.getaddress(@host))
+      ip_address.family
+    end
+
+    def start_heartbeat_socket
+      logger.trace("start_heartbeat_socket: start")
+      @heartbeat_socket = UDPSocket.new(address_family)
+      @heartbeat_socket.bind(@host, @port)
+      logger.trace("start_heartbeat_socket: done")
+    end
+
+    def shutdown_heartbeat_socket
+      logger.trace("shutdown_heartbeat_socket: start")
+      logger.trace("shutdown_heartbeat_socket: done")
+    end
+
+    def start_message_receiver
+      logger.trace("start_heartbeat_socket: start")
+      options = {
+        :listen_fd    => @listen_socket.fileno,
+        :heartbeat_fd => @heartbeat_socket.fileno,
+      }
+      @message_receiver = FluentMessageReceiver.new(@loop, options, &@on_message)
+      @message_receiver.start
+      logger.trace("start_heartbeat_socket: done")
+    end
+
+    def shutdown_message_receiver
+      logger.trace("shutdown_message_receiver: start")
+      @message_receiver.shutdown
+      logger.trace("shutdown_message_receiver: done")
+    end
+
+    def log_tag
+      "internal-fluent-message-receiver"
+    end
+  end
+end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Zurück zum Archiv-Index