YUKI Hiroshi
null+****@clear*****
Wed Jan 29 17:49:15 JST 2014
YUKI Hiroshi 2014-01-29 17:49:15 +0900 (Wed, 29 Jan 2014) New Revision: 2b8b9560566fff1d686137d7469daa80a2539034 https://github.com/droonga/fluent-plugin-droonga/commit/2b8b9560566fff1d686137d7469daa80a2539034 Message: Build distributed command messages by the DistributedCommandPlanner Added files: lib/droonga/distributed_command_planner.rb Modified files: lib/droonga/distributor_plugin.rb lib/droonga/plugin/distributor/crud.rb Added: lib/droonga/distributed_command_planner.rb (+100 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/distributed_command_planner.rb 2014-01-29 17:49:15 +0900 (41cd9a8) @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2014 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +module Droonga + class DistributedCommandPlanner + attr_accessor :key + attr_reader :outputs + + def initialize(source_message) + @source_message = source_message + + @key = nil + @outputs = [] + + @reducers = [] + @gatherers = [] + @processors = [] + + plan_errors_handling + end + + def messages + @reducers + @gatherers + @processors + end + + def reduce(name, reducer) + @reducers << { + "type" => "reduce", + "body" => { + name => { + "#{name}_reduced" => reducer, + }, + }, + "inputs" => [name], + "outputs" => ["#{name}_reduced"], + } + + @gatherers << { + "type" => "gather", + "body" => { + "#{name}_reduced" => { + "output" => name, + }, + }, + "inputs" => ["#{name}_reduced"], + "post" => true, + } + end + + def scatter_all + raise MessageProcessingError.new("missing key") unless @key + @processors << { + "command" => @source_message["type"], + "dataset" => @source_message["dataset"], + "body" => @source_message["body"], + "key" => @key, + "type" => "scatter", + "outputs" => @outputs + ["errors"], + "replica" => "all", + "post" => true + } + end + + def broadcast_all + @processors << { + "command" => @source_message["type"], + "dataset" => @source_message["dataset"], + "body" => @source_message["body"], + "type" => "broadcast", + "outputs" => @outputs + ["errors"], + "replica" => "all", + "post" => true + } + end + + private + #XXX Now, we include definitions to merge errors in the body. + # However, this makes the term "errors" reserved, so plugins + # cannot use their custom "errors" in the body. + # This must be rewritten. + def plan_errors_handling + @outputs << "errors" + reduce("errors", "type" => "sum", "limit" => -1) + end + end +end Modified: lib/droonga/distributor_plugin.rb (+1 -69) =================================================================== --- lib/droonga/distributor_plugin.rb 2014-01-29 17:18:54 +0900 (ceb77bf) +++ lib/droonga/distributor_plugin.rb 2014-01-29 17:49:15 +0900 (b5c149d) @@ -16,6 +16,7 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA require "droonga/plugin" +require "droonga/distributed_command_planner" module Droonga class DistributorPlugin < Plugin @@ -30,16 +31,6 @@ module Droonga @distributor.distribute(messages) end - def scatter_all(message, key) - messages = [reducer(message), gatherer(message), scatterer(message, key)] - distribute(messages) - end - - def broadcast_all(message) - messages = [reducer(message), gatherer(message), broadcaster(message)] - distribute(messages) - end - private def process_error(command, error, arguments) if error.is_a?(MessageProcessingError) @@ -48,64 +39,5 @@ module Droonga super end end - - #XXX Now, default scatterer/broadcaster/reducer/gatherer includes - # definitions to merge errors in the body. However, this makes - # the term "errors" reserved, so plugins cannot use their custom - # "errors" in the body. This must be rewritten. - - def scatterer(message, key) - { - "command" => message["type"], - "dataset" => message["dataset"], - "body" => message["body"], - "key" => key, - "type" => "scatter", - "outputs" => ["errors"], - "replica" => "all", - "post" => true - } - end - - def broadcaster(message) - { - "command" => message["type"], - "dataset" => message["dataset"], - "body" => message["body"], - "type" => "broadcast", - "outputs" => ["errors"], - "replica" => "all", - "post" => true - } - end - - def reducer(message) - { - "type" => "reduce", - "body" => { - "errors" => { - "errors_reduced" => { - "type" => "sum", - "limit" => -1, - }, - }, - }, - "inputs" => ["errors"], - "outputs" => ["errors_reduced"], - } - end - - def gatherer(message) - { - "type" => "gather", - "body" => { - "errors_reduced" => { - "output" => "errors", - }, - }, - "inputs" => ["errors_reduced"], - "post" => true, - } - end end end Modified: lib/droonga/plugin/distributor/crud.rb (+10 -31) =================================================================== --- lib/droonga/plugin/distributor/crud.rb 2014-01-29 17:18:54 +0900 (f4c471e) +++ lib/droonga/plugin/distributor/crud.rb 2014-01-29 17:49:15 +0900 (127371c) @@ -23,49 +23,28 @@ module Droonga command :add def add(message) - key = message["body"]["key"] || rand.to_s - scatter_all(message, key) + scatter_all(message) end command :update def update(message) - key = message["body"]["key"] || rand.to_s - scatter_all(message, key) + scatter_all(message) end # TODO: What is this? command :reset def reset(message) - key = message["body"]["key"] || rand.to_s - scatter_all(message, key) + scatter_all(message) end private - def scatterer(message, key) - scatterer = super - scatterer["outputs"] << "success" - scatterer - end - - def reducer(message) - reducer = super - reducer["body"]["success"] = { - "success_reduced" => { - "type" => "and", - }, - } - reducer["inputs"] << "success" - reducer["outputs"] << "success_reduced" - reducer - end - - def gatherer(message) - gatherer = super - gatherer["body"]["success_reduced"] = { - "output" => "success", - } - gatherer["inputs"] << "success_reduced" - gatherer + def scatter_all(message) + planner = DistributedCommandPlanner.new(message) + planner.key = message["body"]["key"] || rand.to_s + planner.outputs << "success" + planner.reduce("success", "type" => "and") + planner.scatter_all + distribute(planner.messages) end end end -------------- next part -------------- HTML����������������������������... Descargar