YUKI Hiroshi
null+****@clear*****
Wed Apr 29 02:12:55 JST 2015
YUKI Hiroshi 2015-04-29 02:12:55 +0900 (Wed, 29 Apr 2015) New Revision: d03be413b99c8639aef99a27106adc47f8fffccd https://github.com/droonga/droonga-engine/commit/d03be413b99c8639aef99a27106adc47f8fffccd Message: Bounce message with "targetRole" to correct node if possible Modified files: lib/droonga/cluster.rb lib/droonga/dispatcher.rb lib/droonga/engine_node.rb lib/droonga/node_role.rb Modified: lib/droonga/cluster.rb (+30 -0) =================================================================== --- lib/droonga/cluster.rb 2015-04-29 02:09:53 +0900 (fd50750) +++ lib/droonga/cluster.rb 2015-04-29 02:12:55 +0900 (2585555) @@ -34,6 +34,12 @@ module Droonga class UnknownTarget < StandardError end + class NoAcceptableReceiver < StandardError + def initialize(message) + super(message.inspect) + end + end + class << self def load_state_file path = Path.cluster_state @@ -163,6 +169,30 @@ module Droonga raise UnknownTarget.new(receiver) end + def bounce(message) + role = message["targetRole"] + logger.info("bounce: trying to bounce message to another " + + "node with the role: #{role}") + raise NotStartedYet.new unless @engine_nodes + + acceptable_nodes = engine_nodes.select do |node| + node.role == role and + node.live? + end + receiver = acceptable_nodes.sample + if receiver + destination = { + "to" => receiver.name, + "type" => message["type"], + } + receiver.forward(message, destination) + else + logger.error("bounce: no available node with the role #{role}", + :message => message) + # raise NoAcceptableReceiver.new(message) + end + end + def engine_node_names @engine_node_names ||= engine_nodes.collect(&:name) end Modified: lib/droonga/dispatcher.rb (+10 -0) =================================================================== --- lib/droonga/dispatcher.rb 2015-04-29 02:09:53 +0900 (42824b3) +++ lib/droonga/dispatcher.rb 2015-04-29 02:12:55 +0900 (f7f9b64) @@ -350,7 +350,17 @@ module Droonga @engine_state.public_farm_path(route) end + def acceptable_role?(message) + message["targetRole"].nil? or + message["targetRole"] == NodeRole::ANY or + message["targetRole"] == NodeRole.mine + end + def process_input_message(message) + unless acceptable_role?(message) + @cluster.bounce(message) + return + end dataset = message["dataset"] adapter_runner = @adapter_runners[dataset] adapted_message = adapter_runner.adapt_input(message) Modified: lib/droonga/engine_node.rb (+4 -4) =================================================================== --- lib/droonga/engine_node.rb 2015-04-29 02:09:53 +0900 (7599425) +++ lib/droonga/engine_node.rb 2015-04-29 02:12:55 +0900 (5d57f1c) @@ -102,6 +102,10 @@ module Droonga end end + def live? + @state.nil? or @state["live"] + end + def forwardable? return false unless live? role == NodeRole.mine @@ -180,10 +184,6 @@ module Droonga end end - def live? - @state.nil? or @state["live"] - end - def have_unprocessed_messages? @state and @state["have_unprocessed_messages"] end Modified: lib/droonga/node_role.rb (+2 -0) =================================================================== --- lib/droonga/node_role.rb 2015-04-29 02:09:53 +0900 (690058a) +++ lib/droonga/node_role.rb 2015-04-29 02:12:55 +0900 (6eecb43) @@ -24,6 +24,8 @@ module Droonga ABSORB_SOURCE = "absorb-source" ABSORB_DESTINATION = "absorb-destination" + ANY = "any" + ROLES = [ SERVICE_PROVIDER, ABSORB_SOURCE, -------------- next part -------------- HTML����������������������������...Download