YUKI Hiroshi
null+****@clear*****
Fri Apr 10 17:58:03 JST 2015
YUKI Hiroshi 2015-04-10 17:58:03 +0900 (Fri, 10 Apr 2015) New Revision: 71eb380b00fb3d79614a6c65ed841fa6d8de9924 https://github.com/droonga/droonga-engine/commit/71eb380b00fb3d79614a6c65ed841fa6d8de9924 Message: Isolate Droonga::DataAbsorber from external commands Modified files: lib/droonga/data_absorber.rb Modified: lib/droonga/data_absorber.rb (+52 -103) =================================================================== --- lib/droonga/data_absorber.rb 2015-04-10 17:56:47 +0900 (5ec3b25) +++ lib/droonga/data_absorber.rb 2015-04-10 17:58:03 +0900 (1269f41) @@ -13,8 +13,6 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -require "open3" - require "droonga/loggable" require "droonga/client" require "droonga/catalog_generator" @@ -43,18 +41,14 @@ module Droonga attr_reader :params attr_reader :dataset, :port, :tag, :messages_per_second - attr_reader :source_host, :destination_host, :receiver_host, :receiver_port + attr_reader :source_host, :destination_host + attr_reader :error_message def initialize(params) @params = params @messages_per_second = @params[:messages_per_second] || DEFAULT_MESSAGES_PER_SECOND - @drndump = @params[:drndump] || "drndump" - # We should use droonga-send instead of droonga-request, - # because droonga-request is too slow. - @client = @params[:client] || "droonga-send" - @dataset = @params[:dataset] || CatalogGenerator::DEFAULT_DATASET @port = @params[:port] || CatalogGenerator::DEFAULT_PORT @tag = @params[:tag] || CatalogGenerator::DEFAULT_TAG @@ -65,58 +59,75 @@ module Droonga @receiver_port = @params[:receiver_port] + @destination_client_options = @params[:client_options] || {} + + @error_message = nil + #XXX We must instantiate the number of total soruce records before absorbing, # because parallel commands while doing "dump" can be timed out. 
- @required_time_in_seconds = calculate_required_time_in_seconds + @total_n_source_records = count_total_n_source_records end - MESSAGES_PER_SECOND_MATCHER = /(\d+(\.\d+)?) messages\/second/ - - def absorb - drndump_command_line = [@drndump] + drndump_options - client_command_line = [@client] + client_options(@client) - - start_time_in_seconds = Time.new.to_i - env = {} - Open3.pipeline_r([env, *drndump_command_line], - [env, *client_command_line]) do |last_stdout, thread| - last_stdout.each do |output| - if block_given? - messages_per_second = nil - if output =~ MESSAGES_PER_SECOND_MATCHER - messages_per_second = $1.to_f + def run + n_absorbers = 0 + + absorb_message = { + "type" => "system.absorb-data", + "body" => { + "host" => @source_host, + "port" => @port, + "tag" => @tag, + "dataset" => @dataset, + "messagesPerSecond" => @messages_per_second, + }, + } + destination_client.subscribe(absorb_message) do |message| + case message + when Droonga::Client::Error + destination_client.close + @error_message = message.to_s + else + case message["type"] + when "system.absorb-data.result", "system.absorb-data.error" + if message["statusCode"] != 200 + client.close + error = message["body"] + @error_message = "#{error['name']}: #{error['message']}" end - yield(:progress => report_progress(start_time_in_seconds), - :output => output, - :messages_per_second => messages_per_second) + when "system.absorb-data.progress" + @n_prosessed_messages = message["body"]["count"] + yield(:n_processed_messages => @n_processed_messages, + :percentage => progress_percentage, + :message => progress_message) + when "system.absorb-data.start" + n_absorbers += 1 + when "system.absorb-data.end" + n_absorbers -= 1 + client.close if n_absorbers <= 0 end end end end - def can_report_remaining_time? 
- @required_time_in_seconds != Droonga::DataAbsorber::TIME_UNKNOWN and - @required_time_in_seconds > 0 - end - ONE_MINUTE_IN_SECONDS = 60 ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60 - def report_progress(start_time_in_seconds) - return nil unless can_report_remaining_time? + def progress_percentage + progress = @n_prosessed_messages / @total_n_source_records + [(progress * 100).to_i, 100].min + end - elapsed_time = Time.new.to_i - start_time_in_seconds - progress = elapsed_time.to_f / @required_time_in_seconds - progress = [(progress * 100).to_i, 100].min + def progress_message + n_remaining_records = [@total_n_source_records - @n_prosessed_messages, 0].max - remaining_seconds = [@required_time_in_seconds - elapsed_time, 0].max + remaining_seconds = n_remaining_records / @messages_per_second remaining_hours = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS remaining_minutes = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS remaining_time = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds) - "#{progress}% done (maybe #{remaining_time} remaining)" + "#{progress_percentage}% done (maybe #{remaining_time} remaining)" end def source_client @@ -139,7 +150,7 @@ module Droonga :progocol => :droonga, :receiver_host => @receiver_host, :receiver_port => 0, - } + }.merge(@destination_client_options) @destination_client ||= Droonga::Client.new(options) end @@ -148,68 +159,6 @@ module Droonga end private - def calculate_required_time_in_seconds - if @client.include?("droonga-send") - total_n_source_records / @messages_per_second - else - TIME_UNKNOWN - end - end - - def drndump_options - options = [] - options += ["--host", @source_host] if @source_host - options += ["--port", @port] - options += ["--tag", @tag] - options += ["--dataset", @dataset] - options += ["--receiver-host", @receiver_host] - options +=
["--receiver-port", @receiver_port] if @receiver_port - options.collect(&:to_s) - end - - def droonga_request_options - options = [] - options += ["--host", @destination_host] - options += ["--port", @port] - options += ["--tag", @tag] - options += ["--receiver-host", @receiver_host] - options += ["--receiver-port", @receiver_port] if @receiver_port - options.collect(&:to_s) - end - - def droonga_send_options - options = [] - - #XXX Don't use round-robin with multiple endpoints - # even if there are too much data. - # Schema and indexes must be sent to just one endpoint - # to keep their order, but currently there is no way to - # extract only schema and indexes via drndump. - # So, we always use just one endpoint for now, - # even if there are too much data. - server = "droonga:#{params[:destination_host]}" - server = "#{server}:#{params[:port].to_s}" - server = "#{server}/#{params[:tag].to_s}" - options += ["--server", server] - - #XXX We should restrict the traffic to avoid overflowing! - options += ["--messages-per-second", @messages_per_second] - - options += ["--report-throughput"] - - options.collect(&:to_s) - end - - def client_options(client) - if client.include?("droonga-request") - droonga_request_options - elsif client.include?("droonga-send") - droonga_send_options - else - raise ArgumentError.new("Unknwon type client: #{client}") - end - end - def source_tables response = source_client.request("dataset" => @dataset, "type" => "table_list") @@ -225,7 +174,7 @@ module Droonga end end - def total_n_source_records + def count_total_n_source_records queries = {} source_tables.each do |table| queries["n_records_of_#{table}"] = { -------------- next part -------------- HTMLの添付ファイルを保管しました... Descargar