Kouhei Sutou
null+****@clear*****
Fri Dec 20 15:24:34 JST 2013
Kouhei Sutou 2013-12-20 15:24:34 +0900 (Fri, 20 Dec 2013) New Revision: 3e3c80d0231420195059b06e336843cd5bfbe7bf https://github.com/droonga/fluent-plugin-droonga/commit/3e3c80d0231420195059b06e336843cd5bfbe7bf Message: Extract task for the first collector creation There are many duplicated codes. I will reduce them. Modified files: lib/droonga/collector.rb lib/droonga/dispatcher.rb Modified: lib/droonga/collector.rb (+52 -2) =================================================================== --- lib/droonga/collector.rb 2013-12-20 11:41:26 +0900 (e238f7b) +++ lib/droonga/collector.rb 2013-12-20 15:24:34 +0900 (8018d04) @@ -32,14 +32,64 @@ module Droonga load_plugins(["basic"]) # TODO: make customizable end - def handle(name, value) + def start + tasks = @inputs[nil] + tasks.each do |task| + component = task["component"] + type = component["type"] + command = component["command"] + n_of_expects = component["n_of_expects"] + synchronous = nil + synchronous = true unless n_of_expects.zero? + # TODO: check if asynchronous execution is available. + message = { + "task"=>task, + "name"=>nil, + "value"=>nil, + } + unless synchronous + descendants = {} + component["descendants"].each do |name, indices| + descendants[name] = indices.collect do |index| + @components[index]["routes"].map do |route| + @dispatcher.farm_path(route) + end + end + end + message["descendants"] = descendants + message["id"] = @id + end + @dispatcher.deliver(@id, task["route"], message, command, synchronous) + if synchronous + result = task["values"] + post = component["post"] + @dispatcher.post(result, post) if post + component["descendants"].each do |name, indices| + message = { + "id" => @id, + "input" => name, + "value" => result[name] + } + indices.each do |index| + @components[index]["routes"].each do |route| + @dispatcher.dispatch(message, route) + end + end + end + end + @n_dones += 1 + @dispatcher.collectors.delete(@id) if @n_dones ==****@tasks***** + end + end + + def receive(name, value) tasks = @inputs[name] unless tasks #TODO: result arrived before its query return end tasks.each do |task| - task["n_of_inputs"] += 1 if name + task["n_of_inputs"] += 1 component = task["component"] type = component["type"] command = component["command"] || ("collector_" + type) Modified: lib/droonga/dispatcher.rb (+4 -2) =================================================================== --- lib/droonga/dispatcher.rb 2013-12-20 11:41:26 +0900 (24e42f9) +++ lib/droonga/dispatcher.rb 2013-12-20 15:24:34 +0900 (5f65d92) @@ -124,7 +124,9 @@ module Droonga def handle_internal_message(message) id = message["id"] collector = @collectors[id] - unless collector + if collector + collector.receive(message["input"], message["value"]) + else components = message["components"] if components planner = Planner.new(self, components) @@ -132,8 +134,8 @@ module Droonga else #todo: take cases receiving result before its query into account end + collector.start end - collector.handle(message["input"], message["value"]) end def dispatch(message, destination) -------------- next part -------------- HTML����������������������������... Download