[Groonga-commit] droonga/droonga-engine at 8e0a473 [master] Report actual progress

Back to archive index

YUKI Hiroshi null+****@clear*****
Tue Apr 21 22:29:59 JST 2015


YUKI Hiroshi	2015-04-21 22:29:59 +0900 (Tue, 21 Apr 2015)

  New Revision: 8e0a473192c92af412ed9f65d740680f50e8f37c
  https://github.com/droonga/droonga-engine/commit/8e0a473192c92af412ed9f65d740680f50e8f37c

  Message:
    Report actual progress

  Modified files:
    lib/droonga/plugins/system/absorb_data.rb

  Modified: lib/droonga/plugins/system/absorb_data.rb (+109 -36)
===================================================================
--- lib/droonga/plugins/system/absorb_data.rb    2015-04-21 22:05:09 +0900 (c0d8a13)
+++ lib/droonga/plugins/system/absorb_data.rb    2015-04-21 22:29:59 +0900 (ac445ab)
@@ -49,7 +49,7 @@ module Droonga
             on_start
 
             count_total_n_objects do |n_objects|
-              @initial_n_objects = n_objects
+              @n_initial_objects = n_objects
               do_absorb
             end
 
@@ -60,29 +60,13 @@ module Droonga
             logger.trace("do_absorb: start")
             @dumper_error_message = nil
 
-            @dumper = Drndump::DumpClient.new(dumper_params)
-            @dumper.on_finish = lambda do
-              ensure_completely_restored do
-                on_finish
-                logger.trace("start: finish")
-              end
-            end
-            @dumper.on_progress = lambda do |message|
-              logger.trace("dump progress",
-                           :message => message)
-            end
-            @dumper.on_error = lambda do |error|
-              if error.is_a?(Exception)
-                logger.exception("unexpected exception while dump",
-                                 error)
-              else
-                logger.error("unexpected error while dump",
-                             :error => error)
-              end
-            end
-
+            @n_restored_objects = 0
+            @measure_start_time = Time.now
+            @previous_measure_time = @measure_start_time
+            @previous_n_restored_objects = 0.0
             @previous_report_time = Time.now
 
+            @dumper = create_dumper
             begin
               logger.info("starting to absorb the source dataset")
               @dumper_error_message =****@dumpe*****(dump_options) do |message|
@@ -92,12 +76,7 @@ module Droonga
                   @messenger.forward(message,
                                      "to"   => my_node_name,
                                      "type" => message["type"])
-                  now = Time.now
-                  elapsed_seconds = (now - @previous_report_time).to_i
-                  if elapsed_seconds >= progress_interval_seconds
-                    @previous_report_time = now
-                    report_progress
-                  end
+                  try_report_progress(:count_restored_objects => true)
                 rescue Exception => exception
                   @dumper_error_message = exception.to_s
                   logger.exception("failed to process progress",
@@ -134,11 +113,9 @@ module Droonga
               n_expected_objects =****@dumpe*****_forecasted_messages
               while not completely_restored
                 count_total_n_objects do |count|
-                  n_restored_objects = count - @initial_n_objects
-                  logger.trace("ensure_completely_restored: check",
-                               :current    => n_restored_objects,
-                               :forecasted => n_expected_objects)
-                  completely_restored ||= n_restored_objects == n_expected_objects
+                  @n_restored_objects = count - @n_initial_objects
+                  completely_restored ||= @n_restored_objects == n_expected_objects
+                  try_report_progress
                 end
                 Fiber.yield
               end
@@ -211,6 +188,30 @@ module Droonga
             super
           end
 
+          def create_dumper
+            dumper = Drndump::DumpClient.new(dumper_params)
+            dumper.on_finish = lambda do
+              ensure_completely_restored do
+                on_finish
+                logger.trace("start: finish")
+              end
+            end
+            dumper.on_progress = lambda do |message|
+              logger.trace("dump progress",
+                           :message => message)
+            end
+            dumper.on_error = lambda do |error|
+              if error.is_a?(Exception)
+                logger.exception("unexpected exception while dump",
+                                 error)
+              else
+                logger.error("unexpected error while dump",
+                             :error => error)
+              end
+            end
+            dumper
+          end
+
           def dumper_params
             {
               :host    => source_host,
@@ -231,15 +232,87 @@ module Droonga
             }
           end
 
+          def try_report_progress(options={})
+            now = Time.now
+            elapsed_seconds = (now - @previous_report_time).to_i
+            if elapsed_seconds >= progress_interval_seconds
+              if options[:count_restored_objects]
+                count_total_n_objects do |count|
+                  @previous_report_time = Time.now
+                  @n_restored_objects = count - @n_initial_objects
+                  try_report_progress
+                end
+              else
+                @previous_report_time = now
+                report_progress
+              end
+            end
+          end
+
           def report_progress
-            message = "#{@dumper.progress_percentage}% done " +
-                        "(maybe #{@dumper.formatted_remaining_time} remaining)"
+            message = "#{progress_percentage}% done " +
+                        "(maybe #{formatted_remaining_time} remaining)"
             forward("#{prefix}.progress",
                     "nProcessedMessages" => @dumper.n_received_messages,
-                    "percentage"         => @dumper.progress_percentage,
+                    "nRestoredObjects"   => @n_restored_objects,
+                    "percentage"         => progress_percentage,
                     "message"            => message)
           end
 
+          MIN_REPORTED_THROUGHPUT = 0.01
+
+          def recent_throughput
+            now = Time.now
+            n_objects = @n_restored_objects - @previous_n_restored_objects
+
+            if now - @previous_measure_time < 1
+              now = @previous_measure_time
+              n_objects = @previous_n_restored_objects
+            else
+              @previous_measure_time = now
+              @previous_n_restored_objects = n_objects.to_f
+            end
+
+            if now == @measure_start_time
+              actual_throughput = 0
+            else
+              elapsed_seconds = now - @measure_start_time
+              actual_throughput = n_objects / elapsed_seconds
+            end
+
+            [actual_throughput, MIN_REPORTED_THROUGHPUT].max
+          end
+
+          def n_remaining_objects
+            [@dumper.n_forecasted_messages - @n_restored_objects, 0].max
+          end
+
+          def remaining_seconds
+            throughput = [recent_throughput, messages_per_second].min
+            remaining_seconds = n_remaining_objects.to_f / throughput
+            @dumper.remaining_seconds + remaining_seconds
+          end
+
+          ONE_MINUTE_IN_SECONDS = 60
+          ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60
+
+          def formatted_remaining_time
+            seconds  = remaining_seconds
+            hours    = (seconds / ONE_HOUR_IN_SECONDS).floor
+            seconds -= hours * ONE_HOUR_IN_SECONDS
+            minutes  = (seconds / ONE_MINUTE_IN_SECONDS).floor
+            seconds -= minutes * ONE_MINUTE_IN_SECONDS
+            sprintf("%02i:%02i:%02i", hours, minutes, seconds)
+          end
+
+          def progress_percentage
+            return 0 if****@dumpe*****_forecasted_messages.zero?
+            processed =****@dumpe*****_received_messages + @n_restored_objects
+            expected  =****@dumpe*****_forecasted_messages * 2
+            progress  = processed / expected
+            [(progress * 100).to_i, 100].min
+          end
+
           def myself
             @myself ||= NodeName.parse(my_node_name)
           end
-------------- next part --------------
HTML����������������������������...
Descargar 



More information about the Groonga-commit mailing list
Back to archive index