YUKI Hiroshi
null+****@clear*****
Thu Aug 28 18:45:15 JST 2014
YUKI Hiroshi 2014-08-28 18:45:15 +0900 (Thu, 28 Aug 2014) New Revision: c2987b0a64b61c9e4fbdf977d4408d0c2648aba9 https://github.com/droonga/droonga-engine/commit/c2987b0a64b61c9e4fbdf977d4408d0c2648aba9 Message: Implement remote commands in a separate file Copied files: lib/droonga/command/remote.rb (from lib/droonga/command/serf_event_handler.rb) Modified files: lib/droonga/command/serf_event_handler.rb Copied: lib/droonga/command/remote.rb (+40 -87) 78% =================================================================== --- lib/droonga/command/serf_event_handler.rb 2014-08-28 18:35:39 +0900 (22b49cb) +++ lib/droonga/command/remote.rb 2014-08-28 18:45:15 +0900 (cb79400) @@ -30,12 +30,12 @@ module Droonga class Base attr_reader :response - def initialize - @serf_name = ENV["SERF_SELF_NAME"] - @response = { + def initialize(serf_name, params) + @serf_name = serf_name + @params = params + @response = { "log" => [] } - @payload = JSON.parse($stdin.gets) end def process @@ -43,7 +43,7 @@ module Droonga end def should_process? - for_me? or****@paylo*****? or not****@paylo*****?("node") + for_me? or****@param*****? or not****@param*****?("node") end private @@ -56,7 +56,7 @@ module Droonga end def target_node - @payload && @payload["node"] + @params && @params["node"] end def for_me? @@ -70,13 +70,13 @@ module Droonga class ChangeRole < Base def process - NodeStatus.set(:role, @payload["role"]) + NodeStatus.set(:role, @params["role"]) end end class ReportStatus < Base def process - @response["value"] = NodeStatus.get(@payload["key"]) + @response["value"] = NodeStatus.get(@params["key"]) end end @@ -91,19 +91,19 @@ module Droonga private def type - @payload["type"] + @params["type"] end def source_node - @payload["source"] + @params["source"] end def joining_node - @payload["node"] + @params["node"] end def dataset_name - @payload["dataset"] + @params["dataset"] end def valid_params? @@ -146,7 +146,7 @@ module Droonga end def should_absorb_data? - @payload["copy"] + @params["copy"] end def join_as_replica @@ -154,23 +154,12 @@ module Droonga log("source_node = #{source_node}") - fetcher = CatalogFetcher.new(:host => source_host, - :port => port, - :tag => tag, - :receiver_host => joining_host) - catalog = fetcher.fetch(:dataset => dataset_name) - - generator = CatalogGenerator.new - generator.load(catalog) - dataset = generator.dataset_for_host(source_host) || - generator.dataset_for_host(host) - return unless dataset + other_hosts = replica_hosts + return if other_hosts.empty? # restart self with the fetched catalog. SafeFileWriter.write(Path.catalog, JSON.pretty_generate(catalog)) - other_hosts = dataset.replicas.hosts - absorb_data if should_absorb_data? log("joining to the cluster: update myself") @@ -181,6 +170,25 @@ module Droonga end end + def replica_hosts(catalog=nil) + catalog ||= fetch_catalog + + generator = CatalogGenerator.new + generator.load(catalog) + dataset = generator.dataset_for_host(source_host) || + generator.dataset_for_host(host) + return [] unless dataset + dataset.replicas.hosts + end + + def fetch_catalog + fetcher = CatalogFetcher.new(:host => source_host, + :port => port, + :tag => tag, + :receiver_host => joining_host) + fetcher.fetch(:dataset => dataset_name) + end + def absorb_data log("starting to copy data from #{source_host}") @@ -239,26 +247,26 @@ module Droonga private def source - @payload["source"] + @params["source"] end def dataset_name - @dataset_name ||= @payload["dataset"] + @dataset_name ||= @params["dataset"] end def port - @port ||= @payload["port"] + @port ||= @params["port"] end def tag - @tag ||= @payload["tag"] + @tag ||= @params["tag"] end end class ModifyReplicasBase < Base private def dataset - @payload["dataset"] + @params["dataset"] end def hosts @@ -266,7 +274,7 @@ module Droonga end def prepare_hosts - hosts = @payload["hosts"] + hosts = @params["hosts"] return nil unless hosts hosts = [hosts] if hosts.is_a?(String) hosts @@ -331,60 +339,5 @@ module Droonga end end end - - class SerfEventHandler - class << self - def run - new.run - end - end - - def run - command_class = detect_command_class - return true if command_class.nil? - - command = command_class.new - command.process if command.should_process? - output_response(command.response) - true - end - - private - def detect_command_class - case ENV["SERF_EVENT"] - when "user" - detect_command_class_from_custom_event(ENV["SERF_USER_EVENT"]) - when "query" - detect_command_class_from_custom_event(ENV["SERF_QUERY_NAME"]) - when "member-join", "member-leave", "member-update", "member-reap" - Remote::UpdateLiveNodes - end - end - - def detect_command_class_from_custom_event(event_name) - case event_name - when "change_role" - Remote::ChangeRole - when "report_status" - Remote::ReportStatus - when "join" - Remote::Join - when "set_replicas" - Remote::SetReplicas - when "add_replicas" - Remote::AddReplicas - when "remove_replicas" - Remote::RemoveReplicas - when "absorb_data" - Remote::AbsorbData - else - nil - end - end - - def output_response(response) - puts JSON.generate(response) - end - end end end Modified: lib/droonga/command/serf_event_handler.rb (+4 -315) =================================================================== --- lib/droonga/command/serf_event_handler.rb 2014-08-28 18:35:39 +0900 (22b49cb) +++ lib/droonga/command/serf_event_handler.rb 2014-08-28 18:45:15 +0900 (0b31954) @@ -15,323 +15,10 @@ require "json" -require "droonga/path" -require "droonga/serf" -require "droonga/node_status" -require "droonga/catalog_generator" -require "droonga/catalog_modifier" -require "droonga/catalog_fetcher" -require "droonga/data_absorber" -require "droonga/safe_file_writer" +require "droonga/command/remote" module Droonga module Command - module Remote - class Base - attr_reader :response - - def initialize - @serf_name = ENV["SERF_SELF_NAME"] - @response = { - "log" => [] - } - @payload = JSON.parse($stdin.gets) - end - - def process - # override me! - end - - def should_process? - for_me? or****@paylo*****? or not****@paylo*****?("node") - end - - private - def node - @serf_name - end - - def host - node.split(":").first - end - - def target_node - @payload && @payload["node"] - end - - def for_me? - target_node == @serf_name - end - - def log(message) - @response["log"] << message - end - end - - class ChangeRole < Base - def process - NodeStatus.set(:role, @payload["role"]) - end - end - - class ReportStatus < Base - def process - @response["value"] = NodeStatus.get(@payload["key"]) - end - end - - class Join < Base - def process - log("type = #{type}") - case type - when "replica" - join_as_replica - end - end - - private - def type - @payload["type"] - end - - def source_node - @payload["source"] - end - - def joining_node - @payload["node"] - end - - def dataset_name - @payload["dataset"] - end - - def valid_params? - have_required_params? and - valid_node?(source_node) and - valid_node?(joining_node) - end - - def have_required_params? - required_params = [ - source_node, - joining_node, - dataset_name, - ] - required_params.all? do |param| - not param.nil? - end - end - - NODE_PATTERN = /\A([^:]+):(\d+)\/(.+)\z/ - - def valid_node?(node) - node =~ NODE_PATTERN - end - - def source_host - @source_host ||= (source_node =~ NODE_PATTERN && $1) - end - - def joining_host - @source_host ||= (joining_node =~ NODE_PATTERN && $1) - end - - def port - @port ||= (source_node =~ NODE_PATTERN && $2 && $2.to_i) - end - - def tag - @tag ||= (source_node =~ NODE_PATTERN && $3) - end - - def should_absorb_data? - @payload["copy"] - end - - def join_as_replica - return unless valid_params? - - log("source_node = #{source_node}") - - fetcher = CatalogFetcher.new(:host => source_host, - :port => port, - :tag => tag, - :receiver_host => joining_host) - catalog = fetcher.fetch(:dataset => dataset_name) - - generator = CatalogGenerator.new - generator.load(catalog) - dataset = generator.dataset_for_host(source_host) || - generator.dataset_for_host(host) - return unless dataset - - # restart self with the fetched catalog. - SafeFileWriter.write(Path.catalog, JSON.pretty_generate(catalog)) - - other_hosts = dataset.replicas.hosts - - absorb_data if should_absorb_data? - - log("joining to the cluster: update myself") - - CatalogModifier.modify do |modifier| - modifier.datasets[dataset_name].replicas.hosts += other_hosts - modifier.datasets[dataset_name].replicas.hosts.uniq! - end - end - - def absorb_data - log("starting to copy data from #{source_host}") - - CatalogModifier.modify do |modifier| - modifier.datasets[dataset_name].replicas.hosts = [host] - end - sleep(5) #TODO: wait for restart. this should be done more safely, to avoid starting of absorbing with old catalog.json. - - status = NodeStatus.new - status.set(:absorbing, true) - DataAbsorber.absorb(:dataset => dataset_name, - :source_host => source_host, - :destination_host => joining_host, - :port => port, - :tag => tag) - status.delete(:absorbing) - sleep(1) - end - end - - class AbsorbData < Base - attr_writer :dataset_name, :port, :tag - - def process - return unless source - - log("start to absorb data from #{source}") - - if dataset_name.nil? or port.nil? or tag.nil? - current_catalog = JSON.parse(Path.catalog.read) - generator = CatalogGenerator.new - generator.load(current_catalog) - - dataset = generator.dataset_for_host(source) - return unless dataset - - self.dataset_name = dataset.name - self.port = dataset.replicas.port - self.tag = dataset.replicas.tag - end - - log("dataset = #{dataset_name}") - log("port = #{port}") - log("tag = #{tag}") - - status = NodeStatus.new - status.set(:absorbing, true) - DataAbsorber.absorb(:dataset => dataset_name, - :source_host => source, - :destination_host => host, - :port => port, - :tag => tag, - :client => "droonga-send") - status.delete(:absorbing) - end - - private - def source - @payload["source"] - end - - def dataset_name - @dataset_name ||= @payload["dataset"] - end - - def port - @port ||= @payload["port"] - end - - def tag - @tag ||= @payload["tag"] - end - end - - class ModifyReplicasBase < Base - private - def dataset - @payload["dataset"] - end - - def hosts - @hosts ||= prepare_hosts - end - - def prepare_hosts - hosts = @payload["hosts"] - return nil unless hosts - hosts = [hosts] if hosts.is_a?(String) - hosts - end - end - - class SetReplicas < ModifyReplicasBase - def process - return unless dataset - return unless hosts - - log("new replicas: #{hosts.join(",")}") - - CatalogModifier.modify do |modifier| - modifier.datasets[dataset].replicas.hosts = hosts - end - end - end - - class AddReplicas < ModifyReplicasBase - def process - return unless dataset - return unless hosts - - hosts -= [host] - return if hosts.empty? - - log("adding replicas: #{hosts.join(",")}") - - CatalogModifier.modify do |modifier| - modifier.datasets[dataset].replicas.hosts += hosts - modifier.datasets[dataset].replicas.hosts.uniq! - end - end - end - - class RemoveReplicas < ModifyReplicasBase - def process - return unless dataset - return unless hosts - - log("removing replicas: #{hosts.join(",")}") - - CatalogModifier.modify do |modifier| - modifier.datasets[dataset].replicas.hosts -= hosts - end - end - end - - class UpdateLiveNodes < Base - def process - def live_nodes - Serf.live_nodes(@serf_name) - end - - def output_live_nodes - path = Path.live_nodes - nodes = live_nodes - file_contents = JSON.pretty_generate(nodes) - SafeFileWriter.write(path, file_contents) - end - end - end - end - class SerfEventHandler class << self def run @@ -343,7 +30,9 @@ module Droonga command_class = detect_command_class return true if command_class.nil? - command = command_class.new + serf_name = ENV["SERF_SELF_NAME"] + payload = JSON.parse($stdin.gets) + command = command_class.new(serf_name, payload) command.process if command.should_process? output_response(command.response) true -------------- next part -------------- HTML����������������������������... Download