YUKI Hiroshi
null+****@clear*****
Wed Jan 29 19:40:05 JST 2014
YUKI Hiroshi 2014-01-29 19:40:05 +0900 (Wed, 29 Jan 2014) New Revision: 0e319fe6747cac75be40890305955a06060fdcdb https://github.com/droonga/fluent-plugin-droonga/commit/0e319fe6747cac75be40890305955a06060fdcdb Message: Re-implement DistributedSearchPlanner based on DistributedCommandPlanner Modified files: lib/droonga/distributed_command_planner.rb lib/droonga/plugin/distributor/distributed_search_planner.rb lib/droonga/plugin/distributor/search.rb Modified: lib/droonga/distributed_command_planner.rb (+2 -2) =================================================================== --- lib/droonga/distributed_command_planner.rb 2014-01-29 19:35:30 +0900 (33eaf6d) +++ lib/droonga/distributed_command_planner.rb 2014-01-29 19:40:05 +0900 (9430f7c) @@ -125,13 +125,13 @@ module Droonga } end - def gatherer_message(command, name) + def gatherer_message(command, name, gatherer={}) { "type" => command, "body" => { output_name(name) => { "output" => name, - }, + }.merge(gatherer), }, "inputs" => [output_name(name)], "post" => true, Modified: lib/droonga/plugin/distributor/distributed_search_planner.rb (+16 -54) =================================================================== --- lib/droonga/plugin/distributor/distributed_search_planner.rb 2014-01-29 19:35:30 +0900 (c10c7ea) +++ lib/droonga/plugin/distributor/distributed_search_planner.rb 2014-01-29 19:40:05 +0900 (12c508a) @@ -16,32 +16,16 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA module Droonga - class DistributedSearchPlanner - attr_reader :reducers, :gatherer, :searcher - + class DistributedSearchPlanner < DistributedCommandPlanner def initialize(search_request_message) - @source_message = search_request_message + super + @request = @source_message["body"] @queries = @request["queries"] - @input_names = [] - @output_names = [] - @output_mappers = {} - - @reducers = [] - @gatherer = nil - @searcher = nil - - build_messages - end - - def messages - @reducers + [@gatherer, @searcher] + @query_outputs = [] end - private - UNLIMITED = -1 - def build_messages Searcher::QuerySorter.validate_dependencies(@queries) @@ -51,23 +35,13 @@ module Droonga transform_query(input_name, query) end - @gatherer = { - "type" => "search_gather", - "body" => @output_mappers, - "inputs" => @output_names, # XXX should be placed in the "body"? - "post" => true, # XXX should be placed in the "body"? - } - - @searcher = { - "type" => "broadcast", - "command" => "search", # XXX should be placed in the "body"? - "dataset" => @source_message["dataset"] || @request["dataset"], - "body" => @request, - "outputs" => @input_names, # XXX should be placed in the "body"? - "replica" => "random", # XXX should be placed in the "body"? - } + @dataset = @source_message["dataset"] || @request["dataset"] + broadcast_at_random(@request) end + private + UNLIMITED = -1 + def ensure_unifiable! @queries.each do |name, query| if unifiable?(name) && query["output"] @@ -95,28 +69,16 @@ module Droonga return end - @input_names << input_name - output_name = input_name + "_reduced" - @output_names << output_name + @query_outputs << output_name(input_name) transformer = QueryTransformer.new(query) - reducer = { - "type" => "search_reduce", - "body" => { - input_name => { - output_name => transformer.reducers, - }, - }, - "inputs" => [input_name], # XXX should be placed in the "body"? - "outputs" => [output_name], # XXX should be placed in the "body"? - } - @reducers << reducer - - @output_mappers[output_name] = { - "output" => input_name, - "elements" => transformer.mappers, - } + @reducer << reducer_message("search_reduce", + input_name, + transformer.reducers) + @gatherers << gatherer_message("search_gather", + input_name, + "elements" => transformer.mappers) end class QueryTransformer Modified: lib/droonga/plugin/distributor/search.rb (+2 -6) =================================================================== --- lib/droonga/plugin/distributor/search.rb 2014-01-29 19:35:30 +0900 (c32b528) +++ lib/droonga/plugin/distributor/search.rb 2014-01-29 19:40:05 +0900 (02139ca) @@ -25,12 +25,8 @@ module Droonga command :search def search(message) planner = DistributedSearchPlanner.new(message) - - #XXX This is just a temporary solution. We should handle errors by the framework itself. - planner.searcher["outputs"] << "errors" - messages = planner.messages + [reducer(message), gatherer(message)] - - distribute(messages) + planner.build_messages + distribute(planner.messages) end end end -------------- next part -------------- HTML����������������������������... Descargar