[Groonga-commit] groonga/groonga at 732596f [support-arrow] Support dump and load Apache Arrow file format

Back to archive index

Kouhei Sutou null+****@clear*****
Mon May 15 11:35:16 JST 2017


Kouhei Sutou	2017-05-15 11:35:16 +0900 (Mon, 15 May 2017)

  New Revision: 732596f3caa9a65800771bdde969e8c61c8614a8
  https://github.com/groonga/groonga/commit/732596f3caa9a65800771bdde969e8c61c8614a8

  Message:
    Support dump and load Apache Arrow file format

  Added files:
    groonga-arrow.pc.in
  Copied files:
    include/groonga/arrow.h
      (from include/groonga.hpp)
  Modified files:
    .gitignore
    Makefile.am
    configure.ac
    include/groonga.h
    include/groonga.hpp
    include/groonga/Makefile.am
    include/groonga/arrow.hpp
    lib/arrow.cpp

  Modified: .gitignore (+1 -0)
===================================================================
--- .gitignore    2017-05-13 10:49:02 +0900 (04ccca2)
+++ .gitignore    2017-05-15 11:35:16 +0900 (7a1817b)
@@ -34,6 +34,7 @@ cmake_install.cmake
 /missing
 /test-driver
 /groonga.pc
+/groonga-arrow.pc
 /groonga-httpd-conf.sh
 /data/groonga-httpd.conf
 /data/logrotate.d/centos/groonga-httpd

  Modified: Makefile.am (+3 -0)
===================================================================
--- Makefile.am    2017-05-13 10:49:02 +0900 (0721be8)
+++ Makefile.am    2017-05-15 11:35:16 +0900 (1fc7028)
@@ -34,6 +34,9 @@ EXTRA_DIST =					\
 
 pkgconfigdir = $(libdir)/pkgconfig
 pkgconfig_DATA = groonga.pc
+if GRN_WITH_ARROW
+pkgconfig_DATA += groonga-arrow.pc
+endif
 
 .PHONY: FORCE
 

  Modified: configure.ac (+2 -0)
===================================================================
--- configure.ac    2017-05-13 10:49:02 +0900 (7724f82)
+++ configure.ac    2017-05-15 11:35:16 +0900 (5cafef0)
@@ -982,6 +982,7 @@ if test "x$enable_arrow" != "xno"; then
     fi
   fi
 fi
+AM_CONDITIONAL([GRN_WITH_ARROW], [test "$arrow_available" = "yes"])
 
 # MeCab
 # NOTE: MUST be checked last
@@ -1727,6 +1728,7 @@ AC_OUTPUT([
   packages/apt/env.sh
   packages/yum/env.sh
   groonga.pc
+  groonga-arrow.pc
   config.sh
   groonga-httpd-conf.sh
   data/groonga-httpd.conf

  Added: groonga-arrow.pc.in (+4 -0) 100644
===================================================================
--- /dev/null
+++ groonga-arrow.pc.in    2017-05-15 11:35:16 +0900 (d0b22f6)
@@ -0,0 +1,4 @@
+Name: Groonga Arrow
+Description: Apache Arrow support for Groonga
+Version: @VERSION@
+Requires: groonga arrow

  Modified: include/groonga.h (+1 -0)
===================================================================
--- include/groonga.h    2017-05-13 10:49:02 +0900 (9df6247)
+++ include/groonga.h    2017-05-15 11:35:16 +0900 (6e1be29)
@@ -23,6 +23,7 @@
 
 #include "groonga/accessor.h"
 #include "groonga/array.h"
+#include "groonga/arrow.h"
 #include "groonga/cache.h"
 #include "groonga/column.h"
 #include "groonga/config.h"

  Modified: include/groonga.hpp (+0 -2)
===================================================================
--- include/groonga.hpp    2017-05-13 10:49:02 +0900 (d847d03)
+++ include/groonga.hpp    2017-05-15 11:35:16 +0900 (3d8313b)
@@ -19,5 +19,3 @@
 #pragma once
 
 #include "groonga.h"
-
-#include "groonga/arrow.hpp"

  Modified: include/groonga/Makefile.am (+1 -0)
===================================================================
--- include/groonga/Makefile.am    2017-05-13 10:49:02 +0900 (034036a)
+++ include/groonga/Makefile.am    2017-05-15 11:35:16 +0900 (7cc4d56)
@@ -3,6 +3,7 @@ groonga_include_HEADERS =			\
 	accessor.h				\
 	array.h					\
 	arrow.h					\
+	arrow.hpp				\
 	cache.h					\
 	column.h				\
 	command.h				\

  Copied: include/groonga/arrow.h (+13 -2) 69%
===================================================================
--- include/groonga.hpp    2017-05-13 10:49:02 +0900 (d847d03)
+++ include/groonga/arrow.h    2017-05-15 11:35:16 +0900 (57f3abf)
@@ -18,6 +18,17 @@
 
 #pragma once
 
-#include "groonga.h"
+#ifdef  __cplusplus
+extern "C" {
+#endif
 
-#include "groonga/arrow.hpp"
+GRN_API grn_rc grn_arrow_load(grn_ctx *ctx,
+                              grn_obj *table,
+                              const char *path);
+GRN_API grn_rc grn_arrow_dump(grn_ctx *ctx,
+                              grn_obj *table,
+                              const char *path);
+
+#ifdef __cplusplus
+}
+#endif

  Modified: include/groonga/arrow.hpp (+2 -0)
===================================================================
--- include/groonga/arrow.hpp    2017-05-13 10:49:02 +0900 (ba9bc35)
+++ include/groonga/arrow.hpp    2017-05-15 11:35:16 +0900 (a35a4ab)
@@ -17,3 +17,5 @@
 */
 
 #pragma once
+
+#include <groonga.hpp>

  Modified: lib/arrow.cpp (+710 -50)
===================================================================
--- lib/arrow.cpp    2017-05-13 10:49:02 +0900 (92b1015)
+++ lib/arrow.cpp    2017-05-15 11:35:16 +0900 (cbefa18)
@@ -17,67 +17,343 @@
 */
 
 #include "grn.h"
-#include <groonga.hpp>
+
+#ifdef GRN_WITH_ARROW
+#include "grn_db.h"
+
+#include <groonga/arrow.hpp>
 
 #include <arrow/api.h>
 #include <arrow/io/file.h>
 #include <arrow/ipc/api.h>
 
+#include <sstream>
+
 namespace grnarrow {
-  class ColumnImportVisitor : public arrow::ArrayVisitor {
+  grn_rc status_to_rc(arrow::Status &status) {
+    switch (status.code()) {
+    case arrow::StatusCode::OK:
+      return GRN_SUCCESS;
+    case arrow::StatusCode::OutOfMemory:
+      return GRN_NO_MEMORY_AVAILABLE;
+    case arrow::StatusCode::KeyError:
+      return GRN_INVALID_ARGUMENT; // TODO
+    case arrow::StatusCode::TypeError:
+      return GRN_INVALID_ARGUMENT; // TODO
+    case arrow::StatusCode::Invalid:
+      return GRN_INVALID_ARGUMENT;
+    case arrow::StatusCode::IOError:
+      return GRN_INPUT_OUTPUT_ERROR;
+    case arrow::StatusCode::UnknownError:
+      return GRN_UNKNOWN_ERROR;
+    case arrow::StatusCode::NotImplemented:
+      return GRN_FUNCTION_NOT_IMPLEMENTED;
+    default:
+      return GRN_UNKNOWN_ERROR;
+    }
+  }
+
+  grn_bool check_status(grn_ctx *ctx,
+                        arrow::Status &status,
+                        const char *context) {
+    if (status.ok()) {
+      return GRN_TRUE;
+    } else {
+      auto rc = status_to_rc(status);
+      auto message = status.ToString();
+      ERR(rc, "%s: %s", context, message.c_str());
+      return GRN_FALSE;
+    }
+  }
+
+  grn_bool check_status(grn_ctx *ctx,
+                        arrow::Status &status,
+                        std::ostream &output) {
+    return check_status(ctx,
+                        status,
+                        static_cast<std::stringstream &>(output).str().c_str());
+  }
+
+  class ColumnLoadVisitor : public arrow::ArrayVisitor {
   public:
-    ColumnImportVisitor(grn_ctx *ctx,
-                        const grn_id *ids,
-                        grn_obj *grn_column,
-                        std::shared_ptr<arrow::Array> &arrow_array)
+    ColumnLoadVisitor(grn_ctx *ctx,
+                      grn_obj *grn_table,
+                      std::shared_ptr<arrow::Column> &arrow_column,
+                      const grn_id *ids)
       : ctx_(ctx),
+        grn_table_(grn_table),
         ids_(ids),
-        grn_column_(grn_column),
-        arrow_array_(arrow_array) {
-      switch (grn_column_->header.type) {
-      case GRN_DB_BOOL :
-        GRN_BOOL_INIT(&buffer_, 0);
+        time_unit_(arrow::TimeUnit::SECOND) {
+      auto column_name = arrow_column->name();
+      grn_column_ = grn_obj_column(ctx_, grn_table_,
+                                   column_name.data(),
+                                   column_name.size());
+
+      auto arrow_type = arrow_column->type();
+      grn_id type_id;
+      switch (arrow_type->id()) {
+      case arrow::Type::BOOL :
+        type_id = GRN_DB_BOOL;
+        break;
+      case arrow::Type::UINT8 :
+        type_id = GRN_DB_UINT8;
+        break;
+      case arrow::Type::INT8 :
+        type_id = GRN_DB_INT8;
+        break;
+      case arrow::Type::UINT16 :
+        type_id = GRN_DB_UINT16;
+        break;
+      case arrow::Type::INT16 :
+        type_id = GRN_DB_INT16;
+        break;
+      case arrow::Type::UINT32 :
+        type_id = GRN_DB_UINT32;
+        break;
+      case arrow::Type::INT32 :
+        type_id = GRN_DB_INT32;
+        break;
+      case arrow::Type::UINT64 :
+        type_id = GRN_DB_UINT64;
+        break;
+      case arrow::Type::INT64 :
+        type_id = GRN_DB_INT64;
+        break;
+      case arrow::Type::HALF_FLOAT :
+      case arrow::Type::FLOAT :
+      case arrow::Type::DOUBLE :
+        type_id = GRN_DB_FLOAT;
+        break;
+      case arrow::Type::STRING :
+        type_id = GRN_DB_TEXT;
+        break;
+      case arrow::Type::DATE64 :
+        type_id = GRN_DB_TIME;
+        break;
+      case arrow::Type::TIMESTAMP :
+        type_id = GRN_DB_TIME;
+        {
+          auto arrow_timestamp_type =
+            std::static_pointer_cast<arrow::TimestampType>(arrow_type);
+          time_unit_ = arrow_timestamp_type->unit();
+        }
+        break;
       default :
-        GRN_VOID_INIT(&buffer_);
+        type_id = GRN_DB_VOID;
         break;
       }
+
+      if (type_id == GRN_DB_VOID) {
+        // TODO
+        return;
+      }
+
+      if (!grn_column_) {
+        grn_column_ = grn_column_create(ctx_,
+                                        grn_table_,
+                                        column_name.data(),
+                                        column_name.size(),
+                                        NULL,
+                                        GRN_OBJ_COLUMN_SCALAR,
+                                        grn_ctx_at(ctx_, type_id));
+      }
+      if (type_id == GRN_DB_TEXT) {
+        GRN_TEXT_INIT(&buffer_, GRN_OBJ_DO_SHALLOW_COPY);
+      } else {
+        GRN_VALUE_FIX_SIZE_INIT(&buffer_, 0, type_id);
+      }
     }
 
-    ~ColumnImportVisitor() {
+    ~ColumnLoadVisitor() {
+      if (grn_obj_is_accessor(ctx_, grn_column_)) {
+        grn_obj_unlink(ctx_, grn_column_);
+      }
       GRN_OBJ_FIN(ctx_, &buffer_);
     }
 
     arrow::Status Visit(const arrow::BooleanArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Int8Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::UInt8Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Int16Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::UInt16Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Int32Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::UInt32Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Int64Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::UInt64Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::HalfFloatArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::FloatArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::DoubleArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::StringArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Date64Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::TimestampArray &array) {
+      return set_values(array);
+    }
+
+  private:
+    grn_ctx *ctx_;
+    grn_obj *grn_table_;
+    const grn_id *ids_;
+    arrow::TimeUnit::type time_unit_;
+    grn_obj *grn_column_;
+    grn_obj buffer_;
+
+    template <typename T>
+    arrow::Status set_values(const T &array) {
       int64_t n_rows = array.length();
       for (int i = 0; i < n_rows; ++i) {
         auto id = ids_[i];
         GRN_BULK_REWIND(&buffer_);
-        GRN_BOOL_SET(ctx_, &buffer_, array.Value(i));
+        get_value(array, i);
         grn_obj_set_value(ctx_, grn_column_, id, &buffer_, GRN_OBJ_SET);
       }
       return arrow::Status::OK();
     }
 
-  private:
-    grn_ctx *ctx_;
-    const grn_id *ids_;
-    grn_obj *grn_column_;
-    std::shared_ptr<arrow::Array> arrow_array_;
-    grn_obj buffer_;
+    void
+    get_value(const arrow::BooleanArray &array, int i) {
+      GRN_BOOL_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::UInt8Array &array, int i) {
+      GRN_UINT8_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::Int8Array &array, int i) {
+      GRN_INT8_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::UInt16Array &array, int i) {
+      GRN_UINT16_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::Int16Array &array, int i) {
+      GRN_INT16_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::UInt32Array &array, int i) {
+      GRN_UINT32_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::Int32Array &array, int i) {
+      GRN_INT32_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::UInt64Array &array, int i) {
+      GRN_UINT64_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::Int64Array &array, int i) {
+      GRN_INT64_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::HalfFloatArray &array, int i) {
+      GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::FloatArray &array, int i) {
+      GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::DoubleArray &array, int i) {
+      GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::StringArray &array, int i) {
+      int32_t size;
+      const auto data = array.GetValue(i, &size);
+      GRN_TEXT_SET(ctx_, &buffer_, data, size);
+    }
+
+    void
+    get_value(const arrow::Date64Array &array, int i) {
+      GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::TimestampArray &array, int i) {
+      switch (time_unit_) {
+      case arrow::TimeUnit::SECOND :
+        GRN_TIME_SET(ctx_, &buffer_, GRN_TIME_PACK(array.Value(i), 0));
+        break;
+      case arrow::TimeUnit::MILLI :
+        GRN_TIME_SET(ctx_, &buffer_, array.Value(i) * 1000);
+        break;
+      case arrow::TimeUnit::MICRO :
+        GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
+        break;
+      case arrow::TimeUnit::NANO :
+        GRN_TIME_SET(ctx_, &buffer_, array.Value(i) / 1000);
+        break;
+      }
+    }
   };
 
-  class Importer {
+  class FileLoader {
   public:
-    Importer(grn_ctx *ctx, grn_obj *grn_table)
+    FileLoader(grn_ctx *ctx, grn_obj *grn_table)
       : ctx_(ctx),
         grn_table_(grn_table),
-        key_column_name_(nullptr) {
+        key_column_name_("") {
     }
 
-    ~Importer() {
+    ~FileLoader() {
     }
 
-    grn_rc import_table(const std::shared_ptr<arrow::Table> &arrow_table) {
+    grn_rc load_table(const std::shared_ptr<arrow::Table> &arrow_table) {
       int n_columns = arrow_table->num_columns();
 
       if (key_column_name_.empty()) {
@@ -91,39 +367,39 @@ namespace grnarrow {
         for (int i = 0; i < n_columns; ++i) {
           int64_t offset = 0;
           auto arrow_column = arrow_table->column(i);
-          auto column_name = arrow_column->name();
-          auto grn_column = grn_obj_column(ctx_, grn_table_,
-                                           column_name.data(),
-                                           column_name.size());
           auto arrow_chunked_data = arrow_column->data();
           for (auto arrow_array : arrow_chunked_data->chunks()) {
             grn_id *sub_ids =
               reinterpret_cast<grn_id *>(GRN_BULK_HEAD(&ids)) + offset;
-            ColumnImportVisitor visitor(ctx_,
-                                        sub_ids,
-                                        grn_column,
-                                        arrow_array);
+            ColumnLoadVisitor visitor(ctx_,
+                                      grn_table_,
+                                      arrow_column,
+                                      sub_ids);
             arrow_array->Accept(&visitor);
             offset += arrow_array->length();
           }
-          if (grn_obj_is_accessor(ctx_, grn_column)) {
-            grn_obj_unlink(ctx_, grn_column);
-          }
         }
         GRN_OBJ_FIN(ctx_, &ids);
       } else {
+        auto status = arrow::Status::NotImplemented("_key isn't supported yet");
+        check_status(ctx_, status, "[arrow][load]");
       }
       return ctx_->rc;
     };
 
-    grn_rc import_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) {
+    grn_rc load_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) {
       std::shared_ptr<arrow::Table> arrow_table;
       std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches(1);
       arrow_record_batches[0] = arrow_record_batch;
       auto status =
         arrow::Table::FromRecordBatches(arrow_record_batches, &arrow_table);
-      // TODO: check status
-      return import_table(arrow_table);
+      if (!check_status(ctx_,
+                        status,
+                        "[arrow][load] "
+                        "failed to convert record batch to table")) {
+        return ctx_->rc;
+      }
+      return load_table(arrow_table);
     };
 
   private:
@@ -131,34 +407,418 @@ namespace grnarrow {
     grn_obj *grn_table_;
     std::string key_column_name_;
   };
+
+  class FileDumper {
+  public:
+    FileDumper(grn_ctx *ctx, grn_obj *grn_table)
+      : ctx_(ctx),
+        grn_table_(grn_table) {
+      grn_columns_ = grn_hash_create(ctx_,
+                                     NULL,
+                                     sizeof(grn_id),
+                                     0,
+                                     GRN_OBJ_TABLE_HASH_KEY | GRN_HASH_TINY);
+      grn_table_columns(ctx_,
+                        grn_table_,
+                        "", 0,
+                        reinterpret_cast<grn_obj *>(grn_columns_));
+    }
+
+    ~FileDumper() {
+      grn_hash_close(ctx_, grn_columns_);
+    }
+
+    grn_rc dump(arrow::io::OutputStream *output) {
+      std::vector<std::shared_ptr<arrow::Field>> fields;
+      GRN_HASH_EACH_BEGIN(ctx_, grn_columns_, cursor, id) {
+        void *key;
+        grn_hash_cursor_get_key(ctx_, cursor, &key);
+        auto column_id = static_cast<grn_id *>(key);
+        auto column = grn_ctx_at(ctx_, *column_id);
+
+        char column_name[GRN_TABLE_MAX_KEY_SIZE];
+        int column_name_size;
+        column_name_size =
+          grn_column_name(ctx_, column, column_name, GRN_TABLE_MAX_KEY_SIZE);
+        std::string field_name(column_name, column_name_size);
+        std::shared_ptr<arrow::DataType> field_type;
+        switch (grn_obj_get_range(ctx_, column)) {
+        case GRN_DB_BOOL :
+          field_type = arrow::boolean();
+          break;
+        case GRN_DB_UINT8 :
+          field_type = arrow::uint8();
+          break;
+        case GRN_DB_INT8 :
+          field_type = arrow::int8();
+          break;
+        case GRN_DB_UINT16 :
+          field_type = arrow::uint16();
+          break;
+        case GRN_DB_INT16 :
+          field_type = arrow::int16();
+          break;
+        case GRN_DB_UINT32 :
+          field_type = arrow::uint32();
+          break;
+        case GRN_DB_INT32 :
+          field_type = arrow::int32();
+          break;
+        case GRN_DB_UINT64 :
+          field_type = arrow::uint64();
+          break;
+        case GRN_DB_INT64 :
+          field_type = arrow::int64();
+          break;
+        case GRN_DB_FLOAT :
+          field_type = arrow::float64();
+          break;
+        case GRN_DB_TIME :
+          field_type =
+            std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
+          break;
+        case GRN_DB_SHORT_TEXT :
+        case GRN_DB_TEXT :
+        case GRN_DB_LONG_TEXT :
+          field_type = arrow::utf8();
+          break;
+        default :
+          break;
+        }
+        if (!field_type) {
+          continue;
+        }
+
+        auto field = std::make_shared<arrow::Field>(field_name,
+                                                    field_type,
+                                                    false);
+        fields.push_back(field);
+      } GRN_HASH_EACH_END(ctx_, cursor);
+
+      auto schema = std::make_shared<arrow::Schema>(fields);
+
+      std::shared_ptr<arrow::ipc::RecordBatchFileWriter> writer;
+      auto status =
+        arrow::ipc::RecordBatchFileWriter::Open(output, schema, &writer);
+      if (!check_status(ctx_,
+                        status,
+                        "[arrow][dump] failed to create file format writer")) {
+        return ctx_->rc;
+      }
+
+      std::vector<grn_id> ids;
+      int n_records_per_batch = 1000;
+      GRN_TABLE_EACH_BEGIN(ctx_, grn_table_, table_cursor, record_id) {
+        ids.push_back(record_id);
+        if (ids.size() == n_records_per_batch) {
+          write_record_batch(ids, schema, writer);
+          ids.clear();
+        }
+      } GRN_TABLE_EACH_END(ctx_, table_cursor);
+      if (!ids.empty()) {
+        write_record_batch(ids, schema, writer);
+      }
+      writer->Close();
+
+      return ctx_->rc;
+    }
+
+  private:
+    grn_ctx *ctx_;
+    grn_obj *grn_table_;
+    grn_hash *grn_columns_;
+
+    void write_record_batch(std::vector<grn_id> &ids,
+                            std::shared_ptr<arrow::Schema> &schema,
+                            std::shared_ptr<arrow::ipc::RecordBatchFileWriter> &writer) {
+      std::vector<std::shared_ptr<arrow::Array>> columns;
+      GRN_HASH_EACH_BEGIN(ctx_, grn_columns_, cursor, id) {
+        void *key;
+        grn_hash_cursor_get_key(ctx_, cursor, &key);
+        auto grn_column_id = static_cast<grn_id *>(key);
+        auto grn_column = grn_ctx_at(ctx_, *grn_column_id);
+
+        arrow::Status status;
+        std::shared_ptr<arrow::Array> column;
+
+        switch (grn_obj_get_range(ctx_, grn_column)) {
+        case GRN_DB_BOOL :
+          status = build_boolean_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_UINT8 :
+          status = build_uint8_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_INT8 :
+          status = build_int8_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_UINT16 :
+          status = build_uint16_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_INT16 :
+          status = build_int16_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_UINT32 :
+          status = build_uint32_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_INT32 :
+          status = build_int32_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_UINT64 :
+          status = build_uint64_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_INT64 :
+          status = build_int64_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_FLOAT :
+          status = build_double_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_TIME :
+          status = build_timestamp_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_SHORT_TEXT :
+        case GRN_DB_TEXT :
+        case GRN_DB_LONG_TEXT :
+          status = build_utf8_array(ids, grn_column, &column);
+          break;
+        default :
+          status =
+            arrow::Status::NotImplemented("[arrow][dumper] not supported type: TODO");
+          break;
+        }
+        if (!status.ok()) {
+          continue;
+        }
+        columns.push_back(column);
+      } GRN_HASH_EACH_END(ctx_, cursor);
+
+      arrow::RecordBatch record_batch(schema, ids.size(), columns);
+      writer->WriteRecordBatch(record_batch);
+    }
+
+    arrow::Status build_boolean_array(std::vector<grn_id> &ids,
+                                      grn_obj *grn_column,
+                                      std::shared_ptr<arrow::Array> *array) {
+      arrow::BooleanBuilder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const grn_bool *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_uint8_array(std::vector<grn_id> &ids,
+                                    grn_obj *grn_column,
+                                    std::shared_ptr<arrow::Array> *array) {
+      arrow::UInt8Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const uint8_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_int8_array(std::vector<grn_id> &ids,
+                                   grn_obj *grn_column,
+                                   std::shared_ptr<arrow::Array> *array) {
+      arrow::Int8Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const int8_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_uint16_array(std::vector<grn_id> &ids,
+                                     grn_obj *grn_column,
+                                     std::shared_ptr<arrow::Array> *array) {
+      arrow::UInt16Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const uint16_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_int16_array(std::vector<grn_id> &ids,
+                                    grn_obj *grn_column,
+                                    std::shared_ptr<arrow::Array> *array) {
+      arrow::Int16Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const int16_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_uint32_array(std::vector<grn_id> &ids,
+                                     grn_obj *grn_column,
+                                     std::shared_ptr<arrow::Array> *array) {
+      arrow::UInt32Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const uint32_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_int32_array(std::vector<grn_id> &ids,
+                                    grn_obj *grn_column,
+                                    std::shared_ptr<arrow::Array> *array) {
+      arrow::Int32Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const int32_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+    arrow::Status build_uint64_array(std::vector<grn_id> &ids,
+                                     grn_obj *grn_column,
+                                     std::shared_ptr<arrow::Array> *array) {
+      arrow::UInt64Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const uint64_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_int64_array(std::vector<grn_id> &ids,
+                                    grn_obj *grn_column,
+                                    std::shared_ptr<arrow::Array> *array) {
+      arrow::Int64Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const int64_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_double_array(std::vector<grn_id> &ids,
+                                     grn_obj *grn_column,
+                                     std::shared_ptr<arrow::Array> *array) {
+      arrow::DoubleBuilder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const double *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_timestamp_array(std::vector<grn_id> &ids,
+                                        grn_obj *grn_column,
+                                        std::shared_ptr<arrow::Array> *array) {
+      auto timestamp_ns_data_type =
+        std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
+      arrow::TimestampBuilder builder(arrow::default_memory_pool(),
+                                      timestamp_ns_data_type);
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        auto timestamp_ns = *(reinterpret_cast<const int64_t *>(data));
+        builder.Append(timestamp_ns);
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_utf8_array(std::vector<grn_id> &ids,
+                                   grn_obj *grn_column,
+                                   std::shared_ptr<arrow::Array> *array) {
+      arrow::StringBuilder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(data, size);
+      }
+      return builder.Finish(array);
+    }
+  };
 }
+#endif /* GRN_WITH_ARROW */
 
 extern "C" {
 grn_rc
-grn_arrow_import_from_path(grn_ctx *ctx,
-                           grn_obj *table,
-                           const char *path)
+grn_arrow_load(grn_ctx *ctx,
+               grn_obj *table,
+               const char *path)
 {
+  GRN_API_ENTER;
+#ifdef GRN_WITH_ARROW
   std::shared_ptr<arrow::io::MemoryMappedFile> input;
   auto status =
     arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ, &input);
-  // TODO: check status
-  std::shared_ptr<arrow::ipc::FileReader> reader;
-  status = arrow::ipc::FileReader::Open(input, &reader);
-  // TODO: check status
+  if (!grnarrow::check_status(ctx,
+                              status,
+                              std::ostringstream() <<
+                              "[arrow][load] failed to open path: " <<
+                              "<" << path << ">")) {
+    GRN_API_RETURN(ctx->rc);
+  }
+  std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
+  status = arrow::ipc::RecordBatchFileReader::Open(input, &reader);
+  if (!grnarrow::check_status(ctx,
+                              status,
+                              "[arrow][load] "
+                              "failed to create file format reader")) {
+    GRN_API_RETURN(ctx->rc);
+  }
 
-  grnarrow::Importer importer(ctx, table);
+  grnarrow::FileLoader loader(ctx, table);
   int n_record_batches = reader->num_record_batches();
   for (int i = 0; i < n_record_batches; ++i) {
     std::shared_ptr<arrow::RecordBatch> record_batch;
     status = reader->GetRecordBatch(i, &record_batch);
-    // TODO: check status
-    importer.import_record_batch(record_batch);
+    if (!grnarrow::check_status(ctx,
+                                status,
+                                std::ostringstream("") <<
+                                "[arrow][load] failed to get " <<
+                                "the " << i << "-th " << "record")) {
+      break;
+    }
+    loader.load_record_batch(record_batch);
     if (ctx->rc != GRN_SUCCESS) {
       break;
     }
   }
+#else /* GRN_WITH_ARROW */
+  ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
+      "[arrow][load] Apache Arrow support isn't enabled");
+#endif /* GRN_WITH_ARROW */
+  GRN_API_RETURN(ctx->rc);
+}
+
+grn_rc
+grn_arrow_dump(grn_ctx *ctx,
+               grn_obj *table,
+               const char *path)
+{
+  GRN_API_ENTER;
+#ifdef GRN_WITH_ARROW
+  std::shared_ptr<arrow::io::FileOutputStream> output;
+  auto status = arrow::io::FileOutputStream::Open(path, &output);
+  if (!grnarrow::check_status(ctx,
+                              status,
+                              std::stringstream() <<
+                              "[arrow][dump] failed to open path: " <<
+                              "<" << path << ">")) {
+    GRN_API_RETURN(ctx->rc);
+  }
 
-  return ctx->rc;
+  grnarrow::FileDumper dumper(ctx, table);
+  dumper.dump(output.get());
+#else /* GRN_WITH_ARROW */
+  ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
+      "[arrow][dump] Apache Arrow support isn't enabled");
+#endif /* GRN_WITH_ARROW */
+  GRN_API_RETURN(ctx->rc);
 }
 }
-------------- next part --------------
HTML����������������������������...
Descargar 



More information about the Groonga-commit mailing list
Back to archive index