[Groonga-commit] droonga/droonga-engine at 71eb380 [master] Isolate Droonga::DataAbsorber from external commands

Back to archive index

YUKI Hiroshi null+****@clear*****
Fri Apr 10 17:58:03 JST 2015


YUKI Hiroshi	2015-04-10 17:58:03 +0900 (Fri, 10 Apr 2015)

  New Revision: 71eb380b00fb3d79614a6c65ed841fa6d8de9924
  https://github.com/droonga/droonga-engine/commit/71eb380b00fb3d79614a6c65ed841fa6d8de9924

  Message:
    Isolate Droonga::DataAbsorber from external commands

  Modified files:
    lib/droonga/data_absorber.rb

  Modified: lib/droonga/data_absorber.rb (+52 -103)
===================================================================
--- lib/droonga/data_absorber.rb    2015-04-10 17:56:47 +0900 (5ec3b25)
+++ lib/droonga/data_absorber.rb    2015-04-10 17:58:03 +0900 (1269f41)
@@ -13,8 +13,6 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
-require "open3"
-
 require "droonga/loggable"
 require "droonga/client"
 require "droonga/catalog_generator"
@@ -43,18 +41,14 @@ module Droonga
 
     attr_reader :params
     attr_reader :dataset, :port, :tag, :messages_per_second
-    attr_reader :source_host, :destination_host, :receiver_host, :receiver_port
+    attr_reader :source_host, :destination_host
+    attr_reader :error_message
 
     def initialize(params)
       @params = params
 
       @messages_per_second = @params[:messages_per_second] || DEFAULT_MESSAGES_PER_SECOND
 
-      @drndump = @params[:drndump] || "drndump"
-      # We should use droonga-send instead of droonga-request,
-      # because droonga-request is too slow.
-      @client = @params[:client] || "droonga-send"
-
       @dataset = @params[:dataset] || CatalogGenerator::DEFAULT_DATASET
       @port    = @params[:port]    || CatalogGenerator::DEFAULT_PORT
       @tag     = @params[:tag]     || CatalogGenerator::DEFAULT_TAG
@@ -65,58 +59,75 @@ module Droonga
 
       @receiver_port = @params[:receiver_port]
 
+      @destination_client_options = @params[:client_options] || {}
+
+      @error_message = nil
+
       #XXX We must instantiate the number of total soruce records before absorbing,
       #    because parallel commands while doing "dump" can be timed out.
-      @required_time_in_seconds = calculate_required_time_in_seconds
+      @total_n_source_records = count_total_n_source_records
     end
 
-    MESSAGES_PER_SECOND_MATCHER = /(\d+(\.\d+)?) messages\/second/
-
-    def absorb
-      drndump_command_line = [@drndump] + drndump_options
-      client_command_line  = [@client] + client_options(@client)
-
-      start_time_in_seconds = Time.new.to_i
-      env = {}
-      Open3.pipeline_r([env, *drndump_command_line],
-                       [env, *client_command_line]) do |last_stdout, thread|
-        last_stdout.each do |output|
-          if block_given?
-            messages_per_second = nil
-            if output =~ MESSAGES_PER_SECOND_MATCHER
-              messages_per_second = $1.to_f
+    def run
+      n_absorbers = 0
+
+      absorb_message = {
+        "type" => "system.absorb-data",
+        "body" => {
+          "host"    => @source_host,
+          "port"    => @port,
+          "tag"     => @tag,
+          "dataset" => @dataset,
+          "messagesPerSecond" => @messages_per_second,
+        },
+      }
+      destination_client.subscribe(absorb_message) do |message|
+        case message
+        when Droonga::Client::Error
+          destination_client.close
+          @error_message = message.to_s
+        else
+          case message["type"]
+          when "system.absorb-data.result", "system.absorb-data.error"
+            if message["statusCode"] != 200
+              client.close
+              error = message["body"]
+              @error_message = "#{error['name']}: #{error['message']}"
             end
-            yield(:progress => report_progress(start_time_in_seconds),
-                  :output   => output,
-                  :messages_per_second => messages_per_second)
+          when "system.absorb-data.progress"
+            @n_prosessed_messages = message["body"]["count"]
+            yield(:n_processed_messages => @n_processed_messages,
+                  :percentage           => progress_percentage,
+                  :message              => progress_message)
+          when "system.absorb-data.start"
+            n_absorbers += 1
+          when "system.absorb-data.end"
+            n_absorbers -= 1
+            client.close if n_absorbers <= 0
           end
         end
       end
     end
 
-    def can_report_remaining_time?
-      @required_time_in_seconds != Droonga::DataAbsorber::TIME_UNKNOWN and
-        @required_time_in_seconds > 0
-    end
-
     ONE_MINUTE_IN_SECONDS = 60
     ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60
 
-    def report_progress(start_time_in_seconds)
-      return nil unless can_report_remaining_time?
+    def progress_percentage
+      progress = @n_prosessed_messages / @total_n_source_records
+      [(progress * 100).to_i, 100].min
+    end
 
-      elapsed_time = Time.new.to_i - start_time_in_seconds
-      progress = elapsed_time.to_f / @required_time_in_seconds
-      progress = [(progress * 100).to_i, 100].min
+    def progress_message
+      n_remaining_records = [@total_n_source_records - @n_prosessed_messages, 0].max
 
-      remaining_seconds  = [@required_time_in_seconds - elapsed_time, 0].max
+      remaining_seconds  = n_remaining_records / @messages_per_second
       remaining_hours    = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor
       remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS
       remaining_minutes  = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor
       remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS
       remaining_time     = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds)
 
-      "#{progress}% done (maybe #{remaining_time} remaining)"
+      "#{progress_percentage}% done (maybe #{remaining_time} remaining)"
     end
 
     def source_client
@@ -139,7 +150,7 @@ module Droonga
         :progocol      => :droonga,
         :receiver_host => @receiver_host,
         :receiver_port => 0,
-      }
+      }.merge(@destination_client_options)
       @destination_client ||= Droonga::Client.new(options)
     end
 
@@ -148,68 +159,6 @@ module Droonga
     end
 
     private
-    def calculate_required_time_in_seconds
-      if @client.include?("droonga-send")
-        total_n_source_records / @messages_per_second
-      else
-        TIME_UNKNOWN
-      end
-    end
-
-    def drndump_options
-      options = []
-      options += ["--host", @source_host] if @source_host
-      options += ["--port", @port]
-      options += ["--tag", @tag]
-      options += ["--dataset", @dataset]
-      options += ["--receiver-host", @receiver_host]
-      options += ["--receiver-port", @receiver_port] if @receiver_port
-      options.collect(&:to_s)
-    end
-
-    def droonga_request_options
-      options = []
-      options += ["--host", @destination_host]
-      options += ["--port", @port]
-      options += ["--tag", @tag]
-      options += ["--receiver-host", @receiver_host]
-      options += ["--receiver-port", @receiver_port] if @receiver_port
-      options.collect(&:to_s)
-    end
-
-    def droonga_send_options
-      options = []
-  
-      #XXX Don't use round-robin with multiple endpoints
-      #    even if there are too much data.
-      #    Schema and indexes must be sent to just one endpoint
-      #    to keep their order, but currently there is no way to
-      #    extract only schema and indexes via drndump.
-      #    So, we always use just one endpoint for now,
-      #    even if there are too much data.
-      server = "droonga:#{params[:destination_host]}"
-      server = "#{server}:#{params[:port].to_s}"
-      server = "#{server}/#{params[:tag].to_s}"
-      options += ["--server", server]
-  
-      #XXX We should restrict the traffic to avoid overflowing!
-      options += ["--messages-per-second", @messages_per_second]
-
-      options += ["--report-throughput"]
-  
-      options.collect(&:to_s)
-    end
-
-    def client_options(client)
-      if client.include?("droonga-request")
-        droonga_request_options
-      elsif client.include?("droonga-send")
-        droonga_send_options
-      else
-        raise ArgumentError.new("Unknwon type client: #{client}")
-      end
-    end
-
     def source_tables
       response = source_client.request("dataset" => @dataset,
                                        "type"    => "table_list")
@@ -225,7 +174,7 @@ module Droonga
       end
     end
 
-    def total_n_source_records
+    def count_total_n_source_records
       queries = {}
       source_tables.each do |table|
         queries["n_records_of_#{table}"] = {
-------------- next part --------------
HTMLの添付ファイルを保管しました...
Descargar 



More information about the Groonga-commit mailing list
Back to archive index