Kouhei Sutou
null+****@clear*****
Mon May 26 15:25:54 JST 2014
Kouhei Sutou 2014-05-26 15:25:54 +0900 (Mon, 26 May 2014) New Revision: 71840aeec748053e2f7d9bda06192255ec910a10 https://github.com/droonga/droonga-engine/commit/71840aeec748053e2f7d9bda06192255ec910a10 Message: Create a socket for internal connections It is needed for graceful restart. Added files: lib/droonga/internal_fluent_message_receiver.rb Modified files: lib/droonga/command/droonga_engine.rb lib/droonga/dispatcher.rb lib/droonga/engine.rb lib/droonga/engine_state.rb Modified: lib/droonga/command/droonga_engine.rb (+35 -3) =================================================================== --- lib/droonga/command/droonga_engine.rb 2014-05-26 14:50:07 +0900 (d2f313c) +++ lib/droonga/command/droonga_engine.rb 2014-05-26 15:25:54 +0900 (81ee8fc) @@ -24,6 +24,7 @@ require "droonga/engine" require "droonga/serf" require "droonga/event_loop" require "droonga/fluent_message_receiver" +require "droonga/internal_fluent_message_receiver" require "droonga/plugin_loader" module Droonga @@ -360,6 +361,8 @@ module Droonga end end + include Loggable + def initialize @engine_name = nil @listen_fd = nil @@ -373,6 +376,8 @@ module Droonga begin run_services + rescue + logger.exception("failed to run services", $!) ensure shutdown_services end @@ -408,12 +413,17 @@ module Droonga end end + def host + @engine_name.split(":", 2).first + end + def run_services @engine = nil @receiver = nil raw_loop = Coolio::Loop.default @loop = EventLoop.new(raw_loop) + run_internal_message_receiver run_engine run_receiver setup_signals @@ -424,12 +434,32 @@ module Droonga def shutdown_services shutdown_control_io shutdown_receiver + shutdown_internal_message_receiver shutdown_engine @loop = nil end + def run_internal_message_receiver + @internal_message_receiver = create_internal_message_receiver + host, port = @internal_message_receiver.start + tag = @engine_name.split("/", 2).last.split(".", 2).first + @internal_engine_name = "#{host}:#{port}/#{tag}" + end + + def create_internal_message_receiver + InternalFluentMessageReceiver.new(@loop, host) do |tag, time, record| + on_message(tag, time, record) + end + end + + def shutdown_internal_message_receiver + return if @internal_message_receiver.nil? + @internal_message_receiver, receiver = nil, @internal_message_receiver + receiver.shutdown + end + def run_engine - @engine = Engine.new(@loop, @engine_name) + @engine = Engine.new(@loop, @engine_name, @internal_engine_name) @engine.start end @@ -466,8 +496,6 @@ module Droonga def create_receiver options = { - :host => @host, - :port => @port, :listen_fd => @listen_fd, :heartbeat_fd => @heartbeat_fd, } @@ -518,6 +546,10 @@ module Droonga def stop_immediate shutdown_services end + + def log_tag + "service" + end end end end Modified: lib/droonga/dispatcher.rb (+1 -6) =================================================================== --- lib/droonga/dispatcher.rb 2014-05-26 14:50:07 +0900 (bb14fc7) +++ lib/droonga/dispatcher.rb 2014-05-26 15:25:54 +0900 (3ae7fd1) @@ -13,7 +13,6 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "English" require "tsort" require "droonga/loggable" @@ -213,11 +212,7 @@ module Droonga private def farm_path(route) - if route =~ /\A.*:\d+\/[^\.]+/ - $MATCH - else - route - end + @engine_state.farm_path(route) end def process_input_message(message) Modified: lib/droonga/engine.rb (+2 -2) =================================================================== --- lib/droonga/engine.rb 2014-05-26 14:50:07 +0900 (fe48f49) +++ lib/droonga/engine.rb 2014-05-26 15:25:54 +0900 (ef4c30a) @@ -31,8 +31,8 @@ module Droonga LAST_PROCESSED_TIMESTAMP = "last_processed_timestamp" EFFECTIVE_MESSAGE_TIMESTAMP = "effective_message_timestamp" - def initialize(loop, name) - @state = EngineState.new(loop, name) + def initialize(loop, name, internal_name) + @state = EngineState.new(loop, name, internal_name) @catalog_observer = Droonga::CatalogObserver.new(@state.loop) @catalog_observer.on_reload = lambda do |catalog| Modified: lib/droonga/engine_state.rb (+20 -3) =================================================================== --- lib/droonga/engine_state.rb 2014-05-26 14:50:07 +0900 (2f29ed4) +++ lib/droonga/engine_state.rb 2014-05-26 15:25:54 +0900 (985bd1c) @@ -13,6 +13,8 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +require "English" + require "coolio" require "droonga/loggable" @@ -26,11 +28,13 @@ module Droonga attr_reader :loop attr_reader :name + attr_reader :internal_name attr_reader :forwarder attr_reader :replier - def initialize(loop, name) + def initialize(loop, name, internal_name) @loop = loop @name = name + @internal_name = internal_name @sessions = {} @current_id = 0 @forwarder = Forwarder.new(@loop) @@ -50,13 +54,26 @@ module Droonga end def local_route?(route) - route.start_with?(@name) + route.start_with?(@name) or route.start_with?(@internal_name) + end + + def farm_path(route) + if /\A[^:]+:\d+\/[^.]+/ =~ route + name = $MATCH + if name == @internal_name + @name + else + name + end + else + route + end end def generate_id id = @current_id @current_id = id.succ - return [@name, id].join(".#") + return [@internal_name, id].join(".#") end def find_session(id) Added: lib/droonga/internal_fluent_message_receiver.rb (+99 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/internal_fluent_message_receiver.rb 2014-05-26 15:25:54 +0900 (e7a6bb0) @@ -0,0 +1,99 @@ +# Copyright (C) 2014 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "socket" + +require "droonga/fluent_message_receiver" + +module Droonga + class InternalFluentMessageReceiver + include Loggable + + def initialize(loop, host, &on_message) + @loop = loop + @host = host + @on_message = on_message + end + + def start + logger.trace("start: start") + start_listen_socket + start_heartbeat_socket + start_message_receiver + logger.trace("start: done") + + [@host, @port] + end + + def shutdown + logger.trace("shutdown: start") + shutdown_message_receiver + shutdown_heartbeat_socket + shutdown_listen_socket + logger.trace("shutdown: done") + end + + private + def start_listen_socket + logger.trace("start_listen_socket: start") + @listen_socket = TCPServer.new(@host, 0) + @port = @listen_socket.addr[1] + logger.trace("start_listen_socket: done") + end + + def shutdown_listen_socket + logger.trace("shutdown_listen_socket: start") + logger.trace("shutdown_listen_socket: done") + end + + def address_family + ip_address = IPAddr.new(IPSocket.getaddress(@host)) + ip_address.family + end + + def start_heartbeat_socket + logger.trace("start_heartbeat_socket: start") + @heartbeat_socket = UDPSocket.new(address_family) + @heartbeat_socket.bind(@host, @port) + logger.trace("start_heartbeat_socket: done") + end + + def shutdown_heartbeat_socket + logger.trace("shutdown_heartbeat_socket: start") + logger.trace("shutdown_heartbeat_socket: done") + end + + def start_message_receiver + logger.trace("start_heartbeat_socket: start") + options = { + :listen_fd => @listen_socket.fileno, + :heartbeat_fd => @heartbeat_socket.fileno, + } + @message_receiver = FluentMessageReceiver.new(@loop, options, &@on_message) + @message_receiver.start + logger.trace("start_heartbeat_socket: done") + end + + def shutdown_message_receiver + logger.trace("shutdown_message_receiver: start") + @message_receiver.shutdown + logger.trace("shutdown_message_receiver: done") + end + + def log_tag + "internal-fluent-message-receiver" + end + end +end -------------- next part -------------- HTML����������������������������... Download