YUKI Hiroshi
null+****@clear*****
Tue Apr 21 22:29:59 JST 2015
YUKI Hiroshi 2015-04-21 22:29:59 +0900 (Tue, 21 Apr 2015) New Revision: 8e0a473192c92af412ed9f65d740680f50e8f37c https://github.com/droonga/droonga-engine/commit/8e0a473192c92af412ed9f65d740680f50e8f37c Message: Report actual progress Modified files: lib/droonga/plugins/system/absorb_data.rb Modified: lib/droonga/plugins/system/absorb_data.rb (+109 -36) =================================================================== --- lib/droonga/plugins/system/absorb_data.rb 2015-04-21 22:05:09 +0900 (c0d8a13) +++ lib/droonga/plugins/system/absorb_data.rb 2015-04-21 22:29:59 +0900 (ac445ab) @@ -49,7 +49,7 @@ module Droonga on_start count_total_n_objects do |n_objects| - @initial_n_objects = n_objects + @n_initial_objects = n_objects do_absorb end @@ -60,29 +60,13 @@ module Droonga logger.trace("do_absorb: start") @dumper_error_message = nil - @dumper = Drndump::DumpClient.new(dumper_params) - @dumper.on_finish = lambda do - ensure_completely_restored do - on_finish - logger.trace("start: finish") - end - end - @dumper.on_progress = lambda do |message| - logger.trace("dump progress", - :message => message) - end - @dumper.on_error = lambda do |error| - if error.is_a?(Exception) - logger.exception("unexpected exception while dump", - error) - else - logger.error("unexpected error while dump", - :error => error) - end - end - + @n_restored_objects = 0 + @measure_start_time = Time.now + @previous_measure_time = @measure_start_time + @previous_n_restored_objects = 0.0 @previous_report_time = Time.now + @dumper = create_dumper begin logger.info("starting to absorb the source dataset") @dumper_error_message =****@dumpe*****(dump_options) do |message| @@ -92,12 +76,7 @@ module Droonga @messenger.forward(message, "to" => my_node_name, "type" => message["type"]) - now = Time.now - elapsed_seconds = (now - @previous_report_time).to_i - if elapsed_seconds >= progress_interval_seconds - @previous_report_time = now - report_progress - end + try_report_progress(:count_restored_objects => true) rescue Exception => exception @dumper_error_message = exception.to_s logger.exception("failed to process progress", @@ -134,11 +113,9 @@ module Droonga n_expected_objects =****@dumpe*****_forecasted_messages while not completely_restored count_total_n_objects do |count| - n_restored_objects = count - @initial_n_objects - logger.trace("ensure_completely_restored: check", - :current => n_restored_objects, - :forecasted => n_expected_objects) - completely_restored ||= n_restored_objects == n_expected_objects + @n_restored_objects = count - @n_initial_objects + completely_restored ||= @n_restored_objects == n_expected_objects + try_report_progress end Fiber.yield end @@ -211,6 +188,30 @@ module Droonga super end + def create_dumper + dumper = Drndump::DumpClient.new(dumper_params) + dumper.on_finish = lambda do + ensure_completely_restored do + on_finish + logger.trace("start: finish") + end + end + dumper.on_progress = lambda do |message| + logger.trace("dump progress", + :message => message) + end + dumper.on_error = lambda do |error| + if error.is_a?(Exception) + logger.exception("unexpected exception while dump", + error) + else + logger.error("unexpected error while dump", + :error => error) + end + end + dumper + end + def dumper_params { :host => source_host, @@ -231,15 +232,87 @@ module Droonga } end + def try_report_progress(options={}) + now = Time.now + elapsed_seconds = (now - @previous_report_time).to_i + if elapsed_seconds >= progress_interval_seconds + if options[:count_restored_objects] + count_total_n_objects do |count| + @previous_report_time = Time.now + @n_restored_objects = count - @n_initial_objects + try_report_progress + end + else + @previous_report_time = now + report_progress + end + end + end + def report_progress - message = "#{@dumper.progress_percentage}% done " + - "(maybe #{@dumper.formatted_remaining_time} remaining)" + message = "#{progress_percentage}% done " + + "(maybe #{formatted_remaining_time} remaining)" forward("#{prefix}.progress", "nProcessedMessages" => @dumper.n_received_messages, - "percentage" => @dumper.progress_percentage, + "nRestoredObjects" => @n_restored_objects, + "percentage" => progress_percentage, "message" => message) end + MIN_REPORTED_THROUGHPUT = 0.01 + + def recent_throughput + now = Time.now + n_objects = @n_restored_objects - @previous_n_restored_objects + + if now - @previous_measure_time < 1 + now = @previous_measure_time + n_objects = @previous_n_restored_objects + else + @previous_measure_time = now + @previous_n_restored_objects = n_objects.to_f + end + + if now == @measure_start_time + actual_throughput = 0 + else + elapsed_seconds = now - @measure_start_time + actual_throughput = n_objects / elapsed_seconds + end + + [actual_throughput, MIN_REPORTED_THROUGHPUT].max + end + + def n_remaining_objects + [@dumper.n_forecasted_messages - @n_restored_objects, 0].max + end + + def remaining_seconds + throughput = [recent_throughput, messages_per_second].min + remaining_seconds = n_remaining_objects.to_f / throughput + @dumper.remaining_seconds + remaining_seconds + end + + ONE_MINUTE_IN_SECONDS = 60 + ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60 + + def formatted_remaining_time + seconds = remaining_seconds + hours = (seconds / ONE_HOUR_IN_SECONDS).floor + seconds -= hours * ONE_HOUR_IN_SECONDS + minutes = (seconds / ONE_MINUTE_IN_SECONDS).floor + seconds -= minutes * ONE_MINUTE_IN_SECONDS + sprintf("%02i:%02i:%02i", hours, minutes, seconds) + end + + def progress_percentage + return 0 if****@dumpe*****_forecasted_messages.zero? + processed =****@dumpe*****_received_messages + @n_restored_objects + expected =****@dumpe*****_forecasted_messages * 2 + progress = processed / expected + [(progress * 100).to_i, 100].min + end + def myself @myself ||= NodeName.parse(my_node_name) end -------------- next part -------------- HTML����������������������������... Descargar