[Groonga-commit] droonga/droonga-engine at b3078ff [buffered-forward] Implement ForwardBuffer

Back to archive index

YUKI Hiroshi null+****@clear*****
Thu Dec 18 18:14:28 JST 2014


YUKI Hiroshi	2014-12-18 18:14:28 +0900 (Thu, 18 Dec 2014)

  New Revision: b3078ff2defb8f83637f7b68a346a43265e613bb
  https://github.com/droonga/droonga-engine/commit/b3078ff2defb8f83637f7b68a346a43265e613bb

  Message:
    Implement ForwardBuffer

  Added files:
    lib/droonga/forward_buffer.rb
  Modified files:
    lib/droonga/engine_state.rb
    lib/droonga/forwarder.rb

  Modified: lib/droonga/engine_state.rb (+17 -3)
===================================================================
--- lib/droonga/engine_state.rb    2014-12-18 17:20:47 +0900 (f11ccdb)
+++ lib/droonga/engine_state.rb    2014-12-18 18:14:28 +0900 (d7c2ee8)
@@ -41,7 +41,9 @@ module Droonga
       @internal_name = internal_name
       @sessions = {}
       @current_id = 0
-      @forwarder = Forwarder.new(@loop, :buffering => true)
+      @forwarder = Forwarder.new(@loop,
+                                 :buffering => true,
+                                 :engine_state => self)
       @replier = Replier.new(@forwarder)
       @on_ready = nil
       @on_finish = nil
@@ -65,6 +67,18 @@ module Droonga
       route.start_with?(@name) or route.start_with?(@internal_name)
     end
 
+    def unwritable_node?(node_name)
+      case node_status.role
+      when NodeStatus::Role::SERVICE_PROVIDER
+        absorb_source_nodes.include?(node_name) or
+          absorb_destination_nodes.include?(node_name)
+      when NodeStatus::Role::ABSORB_SOURCE
+        absorb_destination_nodes.include?(node_name)
+      else
+        false
+      end
+    end
+
     def farm_path(route)
       if /\A[^:]+:\d+\/[^.]+/ =~ route
         name = $MATCH
@@ -129,7 +143,7 @@ module Droonga
       if @live_nodes_list
         @live_nodes_list.absorb_source_nodes
       else
-        all_nodes
+        []
       end
     end
 
@@ -137,7 +151,7 @@ module Droonga
       if @live_nodes_list
         @live_nodes_list.absorb_destination_nodes
       else
-        all_nodes
+        []
       end
     end
 

  Added: lib/droonga/forward_buffer.rb (+87 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/forward_buffer.rb    2014-12-18 18:14:28 +0900 (d88a24f)
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+
+require "fileutils"
+require "json"
+require "pathname"
+
+require "droonga/loggable"
+require "droonga/path"
+require "droonga/safe_file_writer"
+
+module Droonga
+  class ForwardBuffer
+    include Loggable
+
+    SUFFIX = ".json"
+
+    def initialize(node_name, params)
+      @node_name = node_name
+      @forwarder = params[:forwarder]
+
+      @data_directory = Path.intentional_buffer + "#{@node_name}"
+      FileUtils.mkdir_p(@data_directory.to_s)
+    end
+
+    def add(receiver, message, command, arguments, options)
+      logger.trace("add: start")
+      buffered_message = {
+        "receiver"  => receiver,
+        "message"   => message,
+        "command"   => command,
+        "arguments" => arguments,
+        "options"   => options,
+      }
+      SafeFileWriter.write(file_path) do |output, file|
+        output.puts(JSON.generate(buffered_message))
+      end
+      logger.trace("add: done")
+    end
+
+    def resume
+      logger.trace("resume: start")
+      Pathname.glob("#{@data_directory}/*#{SUFFIX}").collect do |buffered_message_path|
+        output(buffered_message_path)
+      end
+      logger.trace("resume: done")
+    end
+
+    def empty?
+      @data_directory.children.empty?
+    end
+
+    private
+    def output(buffered_message_path)
+      file_contents = Pathname(buffered_message_path).read
+      buffered_message = JSON.parse(file_contents)
+      @forwarder.output(buffered_message["receiver"],
+                        buffered_message["message"],
+                        buffered_message["command"],
+                        buffered_message["arguments"],
+                        buffered_message["options"])
+      FileUtils.rm_f(buffered_message_path.to_s)
+    end
+
+    def file_path(timestamp=Time.now)
+      @data_directory + "#{time_stamp.iso8601(6)}.#{SUFFIX}"
+    end
+
+    def log_tag
+      "[#{Process.ppid}] forward-buffer"
+    end
+  end
+end

  Modified: lib/droonga/forwarder.rb (+35 -2)
===================================================================
--- lib/droonga/forwarder.rb    2014-12-18 17:20:47 +0900 (52e938b)
+++ lib/droonga/forwarder.rb    2014-12-18 18:14:28 +0900 (a3a95eb)
@@ -19,6 +19,7 @@ require "droonga/loggable"
 require "droonga/path"
 require "droonga/event_loop"
 require "droonga/buffered_tcp_socket"
+require "droonga/forward_buffer"
 require "droonga/fluent_message_sender"
 
 module Droonga
@@ -28,6 +29,8 @@ module Droonga
     def initialize(loop, options={})
       @loop = loop
       @buffering = options[:buffering]
+      @engine_state = options[:engine_state]
+      @buffers = {}
       @senders = {}
     end
 
@@ -50,11 +53,15 @@ module Droonga
       command = destination["type"]
       receiver = destination["to"]
       arguments = destination["arguments"]
-      output(receiver, message, command, arguments)
+      buffered_output(receiver, message, command, arguments)
       logger.trace("forward: done")
     end
 
     def resume
+      resume_from_accidents
+    end
+
+    def resume_from_accidents
       return unless Path.accidental_buffer.exist?
       Pathname.glob("#{Path.accidental_buffer}/*") do |path|
         next unless path.directory?
@@ -83,7 +90,6 @@ module Droonga
       end
     end
 
-    private
     def output(receiver, message, command, arguments, options={})
       logger.trace("output: start")
       if not receiver.is_a?(String) or not command.is_a?(String)
@@ -95,6 +101,7 @@ module Droonga
       unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/
         raise "format: hostname:port/tag(?params)"
       end
+
       host = $1
       port = $2
       tag  = $3
@@ -120,6 +127,27 @@ module Droonga
       logger.trace("output: done")
     end
 
+    private
+    def buffered_output(receiver, message, command, arguments, options={})
+      receiver_is_node = (receiver =~ /\A([^:]+:\d+\/[^\.]+)/)
+      node_name = $1
+      unless receiver_is_node
+        output(receiver, message, command, arguments, options)
+        return
+      end
+      
+      buffer = buffer_for(node_name)
+      if @engine_state and
+           @engine_state.unwritable_node?(node_name)
+        buffer.add(receiver, message, command, arguments, options)
+      elsif buffer.empty?
+        output(receiver, message, command, arguments, options)
+      else
+        buffer.add(receiver, message, command, arguments, options)
+        buffer.resume
+      end
+    end
+
     def find_sender(host, port, params)
       connection_id = extract_connection_id(params)
       destination = "#{host}:#{port}"
@@ -147,6 +175,11 @@ module Droonga
       sender
     end
 
+    def buffer_for(node_name)
+      @buffers[node_name] ||= ForwardBuffer.new(node_name,
+                                                :forwarder => self)
+    end
+
     def log_tag
       "[#{Process.ppid}] forwarder"
     end
-------------- next part --------------
HTML����������������������������...
Descargar 



More information about the Groonga-commit mailing list
Back to archive index