[Groonga-commit] groonga/grnxx at feb1ac5 [master] Add grnxx::map::DoubleArrayImpl for defrag() and truncate().

Zurück zum Archiv-Index

susumu.yata null+****@clear*****
Wed Aug 7 15:37:15 JST 2013


susumu.yata	2013-08-07 15:37:15 +0900 (Wed, 07 Aug 2013)

  New Revision: feb1ac510bc5bfa958842c6534edf2fe0a19c084
  https://github.com/groonga/grnxx/commit/feb1ac510bc5bfa958842c6534edf2fe0a19c084

  Message:
    Add grnxx::map::DoubleArrayImpl for defrag() and truncate().

  Modified files:
    lib/grnxx/map/double_array.cpp
    lib/grnxx/map/double_array.hpp

  Modified: lib/grnxx/map/double_array.cpp (+370 -149)
===================================================================
--- lib/grnxx/map/double_array.cpp    2013-08-07 13:17:27 +0900 (9ab632c)
+++ lib/grnxx/map/double_array.cpp    2013-08-07 15:37:15 +0900 (7ad6f97)
@@ -19,6 +19,7 @@
 
 #include <new>
 
+#include "grnxx/array.hpp"
 #include "grnxx/exception.hpp"
 #include "grnxx/geo_point.hpp"
 #include "grnxx/intrinsic.hpp"
@@ -34,11 +35,11 @@ namespace {
 
 constexpr char FORMAT_STRING[] = "grnxx::map::DoubleArray";
 
-constexpr uint64_t BLOCK_MAX_FAILURE_COUNT  = 4;
-constexpr uint64_t BLOCK_MAX_LEVEL          = 5;
-constexpr uint64_t BLOCK_INVALID_ID         = (1ULL << 40) - 1;
-constexpr uint64_t BLOCK_SIZE               = 1ULL << 9;
-constexpr uint64_t BLOCK_MAX_COUNT          = 16;
+constexpr uint64_t BLOCK_MAX_FAILURE_COUNT = 4;
+constexpr uint64_t BLOCK_MAX_LEVEL         = 5;
+constexpr uint64_t BLOCK_INVALID_ID        = (1ULL << 40) - 1;
+constexpr uint64_t BLOCK_SIZE              = 1ULL << 9;
+constexpr uint64_t BLOCK_MAX_COUNT         = 16;
 
 constexpr uint64_t NODE_TERMINAL_LABEL = 0x100;
 constexpr uint64_t NODE_MAX_LABEL      = NODE_TERMINAL_LABEL;
@@ -47,32 +48,22 @@ constexpr uint64_t NODE_INVALID_OFFSET = 0;
 
 constexpr uint64_t ROOT_NODE_ID = 0;
 
-}  // namespace
-
-struct DoubleArrayHeader {
-  CommonHeader common_header;
+struct ImplHeader {
   uint32_t nodes_storage_node_id;
   uint32_t siblings_storage_node_id;
   uint32_t blocks_storage_node_id;
-  uint32_t pool_storage_node_id;
   uint64_t num_blocks;
   uint64_t num_phantoms;
   uint64_t num_zombies;
   uint64_t latest_blocks[BLOCK_MAX_LEVEL + 1];
 
-  // Initialize the member variables.
-  DoubleArrayHeader();
-
-  // Return true iff the header seems to be correct.
-  explicit operator bool() const;
+  ImplHeader();
 };
 
-DoubleArrayHeader::DoubleArrayHeader()
-    : common_header(FORMAT_STRING, MAP_DOUBLE_ARRAY),
-      nodes_storage_node_id(STORAGE_INVALID_NODE_ID),
+ImplHeader::ImplHeader()
+    : nodes_storage_node_id(STORAGE_INVALID_NODE_ID),
       siblings_storage_node_id(STORAGE_INVALID_NODE_ID),
       blocks_storage_node_id(STORAGE_INVALID_NODE_ID),
-      pool_storage_node_id(STORAGE_INVALID_NODE_ID),
       num_blocks(0),
       num_phantoms(0),
       num_zombies(0),
@@ -82,11 +73,7 @@ DoubleArrayHeader::DoubleArrayHeader()
   }
 }
 
-DoubleArrayHeader::operator bool() const {
-  return common_header.format() == FORMAT_STRING;
-}
-
-// The internal structure of DoubleArrayBlock is as follows:
+// The internal structure of Block is as follows:
 // - values_[0]
 //    0-15 (16): first_phantom
 //   16-23 ( 8): level
@@ -96,7 +83,7 @@ DoubleArrayHeader::operator bool() const {
 //   16-23 ( 8): failure_count
 //   24-63 (40): prev
 // where 0 is the LSB and 63 is the MSB.
-class DoubleArrayBlock {
+class Block {
   // For values_[0].
   static constexpr uint64_t FIRST_PHANTOM_MASK  = (1ULL << 16) - 1;
   static constexpr uint8_t  FIRST_PHANTOM_SHIFT = 0;
@@ -114,8 +101,8 @@ class DoubleArrayBlock {
   static constexpr uint8_t  PREV_SHIFT          = 24;
 
  public:
-  static DoubleArrayBlock empty_block() {
-    return DoubleArrayBlock(0, BLOCK_SIZE << NUM_PHANTOMS_SHIFT);
+  static Block empty_block() {
+    return Block(0, BLOCK_SIZE << NUM_PHANTOMS_SHIFT);
   }
 
   // Return the first phantom node.
@@ -173,8 +160,7 @@ class DoubleArrayBlock {
  private:
   uint64_t values_[2];
 
-  DoubleArrayBlock(uint64_t value_0, uint64_t value_1)
-       : values_{ value_0, value_1 } {}
+  Block(uint64_t value_0, uint64_t value_1) : values_{ value_0, value_1 } {}
 };
 
 // The internal structure of DoubleArray is as follows:
@@ -196,7 +182,7 @@ class DoubleArrayBlock {
 //    9-17 ( 9): child
 //   18-59 (42): offset
 // where 0 is the LSB and 63 is the MSB.
-class DoubleArrayNode {
+class Node {
   static constexpr uint64_t IS_PHANTOM_FLAG  = 1ULL << 62;
   static constexpr uint64_t IS_ORIGIN_FLAG   = 1ULL << 63;
 
@@ -220,10 +206,9 @@ class DoubleArrayNode {
 
  public:
   // Create a phantom node.
-  static DoubleArrayNode phantom_node(uint64_t next, uint64_t prev) {
-    return DoubleArrayNode(IS_PHANTOM_FLAG |
-                           ((next & NEXT_MASK) << NEXT_SHIFT) |
-                           ((prev & PREV_MASK) << PREV_SHIFT));
+  static Node phantom_node(uint64_t next, uint64_t prev) {
+    return Node(IS_PHANTOM_FLAG | ((next & NEXT_MASK) << NEXT_SHIFT) |
+                                  ((prev & PREV_MASK) << PREV_SHIFT));
   }
 
   // Return true iff this node is a phantom node.
@@ -336,83 +321,146 @@ class DoubleArrayNode {
  private:
   uint64_t value_;
 
-  explicit DoubleArrayNode(uint64_t value) : value_(value) {}
+  explicit Node(uint64_t value) : value_(value) {}
 };
 
-template <typename T>
-Map<T> *DoubleArray<T>::create(Storage *, uint32_t, const MapOptions &) {
-  GRNXX_ERROR() << "unsupported type";
-  throw LogicError();
-}
+}  // namespace
 
-template <typename T>
-Map<T> *DoubleArray<T>::open(Storage *, uint32_t) {
-  GRNXX_ERROR() << "unsupported type";
-  throw LogicError();
-}
+class DoubleArrayImpl {
+  using Header       = ImplHeader;
+  using NodeArray    = Array<Node,     65536, 8192>;  // 42-bit
+  using SiblingArray = Array<uint8_t, 262144, 4096>;  // 42-bit
+  using BlockArray   = Array<Block,     8192, 1024>;  // 33-bit
+  using Pool         = KeyPool<Bytes>;
 
-template class DoubleArray<int8_t>;
-template class DoubleArray<uint8_t>;
-template class DoubleArray<int16_t>;
-template class DoubleArray<uint16_t>;
-template class DoubleArray<int32_t>;
-template class DoubleArray<uint32_t>;
-template class DoubleArray<int64_t>;
-template class DoubleArray<uint64_t>;
-template class DoubleArray<double>;
-template class DoubleArray<GeoPoint>;
+  static constexpr uint64_t NODE_ARRAY_SIZE    = 1ULL << 42;
+  static constexpr uint64_t SIBLING_ARRAY_SIZE = 1ULL << 42;
+  static constexpr uint64_t BLOCK_ARRAY_SIZE   = 1ULL << 33;
 
-DoubleArray<Bytes>::DoubleArray()
+ public:
+  using Key = typename Map<Bytes>::Key;
+  using KeyArg = typename Map<Bytes>::KeyArg;
+  using Cursor = typename Map<Bytes>::Cursor;
+
+  DoubleArrayImpl();
+  ~DoubleArrayImpl();
+
+  static DoubleArrayImpl *create(Storage *storage, uint32_t storage_node_id,
+                                 const MapOptions &options);
+  static DoubleArrayImpl *open(Storage *storage, uint32_t storage_node_id);
+
+  void set_pool(Pool *pool) {
+    pool_ = pool;
+  }
+
+  uint32_t storage_node_id() const {
+    return storage_node_id_;
+  }
+
+  int64_t max_key_id() const {
+    return pool_->max_key_id();
+  }
+  uint64_t num_keys() const {
+    return pool_->num_keys();
+  }
+
+  bool get(int64_t key_id, Key *key = nullptr);
+  bool unset(int64_t key_id);
+  bool reset(int64_t key_id, KeyArg dest_key);
+
+  bool find(KeyArg key, int64_t *key_id = nullptr);
+  bool add(KeyArg key, int64_t *key_id = nullptr);
+  bool remove(KeyArg key);
+  bool replace(KeyArg src_key, KeyArg dest_key, int64_t *key_id = nullptr);
+
+  void defrag(double usage_rate_threshold);
+
+  void truncate();
+
+  bool find_longest_prefix_match(KeyArg query,
+                                 int64_t *key_id = nullptr,
+                                 Key *key = nullptr);
+
+//  Cursor *create_cursor(
+//      MapCursorAllKeys<Bytes> query,
+//      const MapCursorOptions &options = MapCursorOptions());
+//  Cursor *create_cursor(
+//      const MapCursorKeyIDRange<Bytes> &query,
+//      const MapCursorOptions &options = MapCursorOptions());
+//  Cursor *create_cursor(
+//      const MapCursorKeyRange<Bytes> &query,
+//      const MapCursorOptions &options = MapCursorOptions());
+
+ private:
+  Storage *storage_;
+  uint32_t storage_node_id_;
+  Header *header_;
+  std::unique_ptr<NodeArray> nodes_;
+  std::unique_ptr<SiblingArray> siblings_;
+  std::unique_ptr<BlockArray> blocks_;
+  Pool *pool_;
+
+  void create_impl(Storage *storage, uint32_t storage_node_id,
+                   const MapOptions &options);
+  void open_impl(Storage *storage, uint32_t storage_node_id);
+
+  bool replace_key(int64_t key_id, KeyArg src_key, KeyArg dest_key);
+
+  bool find_leaf(KeyArg key, Node **leaf_node, uint64_t *leaf_key_pos);
+  bool insert_leaf(KeyArg key, Node *node, uint64_t key_pos, Node **leaf_node);
+
+  Node *insert_node(Node *node, uint64_t label);
+  Node *separate(Node *node, uint64_t labels[2]);
+
+  void resolve(Node *node, uint64_t label);
+  void migrate_nodes(Node *node, uint64_t dest_offset,
+                     const uint64_t *labels, uint64_t num_labels);
+
+  uint64_t find_offset(const uint64_t *labels, uint64_t num_labels);
+
+  Node *reserve_node(uint64_t node_id);
+  Block *reserve_block(uint64_t block_id);
+
+  void update_block_level(uint64_t block_id, Block *block, uint64_t level);
+  void set_block_level(uint64_t block_id, Block *block, uint64_t level);
+  void unset_block_level(uint64_t block_id, Block *block);
+};
+
+DoubleArrayImpl::DoubleArrayImpl()
     : storage_(nullptr),
       storage_node_id_(STORAGE_INVALID_NODE_ID),
       header_(nullptr),
       nodes_(),
       siblings_(),
       blocks_(),
-      pool_() {}
+      pool_(nullptr) {}
 
-DoubleArray<Bytes>::~DoubleArray() {}
+DoubleArrayImpl::~DoubleArrayImpl() {}
 
-DoubleArray<Bytes> *DoubleArray<Bytes>::create(Storage *storage,
-                                               uint32_t storage_node_id,
-                                               const MapOptions &options) {
-  std::unique_ptr<DoubleArray> map(new (std::nothrow) DoubleArray);
-  if (!map) {
-    GRNXX_ERROR() << "new grnxx::map::DoubleArray failed";
+DoubleArrayImpl *DoubleArrayImpl::create(Storage *storage,
+                                         uint32_t storage_node_id,
+                                         const MapOptions &options) {
+  std::unique_ptr<DoubleArrayImpl> impl(new (std::nothrow) DoubleArrayImpl);
+  if (!impl) {
+    GRNXX_ERROR() << "new grnxx::map::DoubleArrayImpl failed";
     throw MemoryError();
   }
-  map->create_map(storage, storage_node_id, options);
-  return map.release();
+  impl->create_impl(storage, storage_node_id, options);
+  return impl.release();
 }
 
-DoubleArray<Bytes> *DoubleArray<Bytes>::open(Storage *storage,
-                                             uint32_t storage_node_id) {
-  std::unique_ptr<DoubleArray> map(new (std::nothrow) DoubleArray);
-  if (!map) {
-    GRNXX_ERROR() << "new grnxx::map::DoubleArray failed";
+DoubleArrayImpl *DoubleArrayImpl::open(Storage *storage,
+                                       uint32_t storage_node_id) {
+  std::unique_ptr<DoubleArrayImpl> impl(new (std::nothrow) DoubleArrayImpl);
+  if (!impl) {
+    GRNXX_ERROR() << "new grnxx::map::DoubleArrayImpl failed";
     throw MemoryError();
   }
-  map->open_map(storage, storage_node_id);
-  return map.release();
-}
-
-uint32_t DoubleArray<Bytes>::storage_node_id() const {
-  return storage_node_id_;
-}
-
-MapType DoubleArray<Bytes>::type() const {
-  return MAP_DOUBLE_ARRAY;
-}
-
-int64_t DoubleArray<Bytes>::max_key_id() const {
-  return pool_->max_key_id();
-}
-
-uint64_t DoubleArray<Bytes>::num_keys() const {
-  return pool_->num_keys();
+  impl->open_impl(storage, storage_node_id);
+  return impl.release();
 }
 
-bool DoubleArray<Bytes>::get(int64_t key_id, Key *key) {
+bool DoubleArrayImpl::get(int64_t key_id, Key *key) {
   if ((key_id < MAP_MIN_KEY_ID) || (key_id > max_key_id())) {
     // Out of range.
     return false;
@@ -420,7 +468,7 @@ bool DoubleArray<Bytes>::get(int64_t key_id, Key *key) {
   return pool_->get(key_id, key);
 }
 
-bool DoubleArray<Bytes>::unset(int64_t key_id) {
+bool DoubleArrayImpl::unset(int64_t key_id) {
   Key key;
   if (!get(key_id, &key)) {
     // Not found.
@@ -429,7 +477,7 @@ bool DoubleArray<Bytes>::unset(int64_t key_id) {
   return remove(key);
 }
 
-bool DoubleArray<Bytes>::reset(int64_t key_id, KeyArg dest_key) {
+bool DoubleArrayImpl::reset(int64_t key_id, KeyArg dest_key) {
   Key src_key;
   if (!get(key_id, &src_key)) {
     // Not found.
@@ -438,7 +486,7 @@ bool DoubleArray<Bytes>::reset(int64_t key_id, KeyArg dest_key) {
   return replace(src_key, dest_key);
 }
 
-bool DoubleArray<Bytes>::find(KeyArg key, int64_t *key_id) {
+bool DoubleArrayImpl::find(KeyArg key, int64_t *key_id) {
   uint64_t node_id = ROOT_NODE_ID;
   Node node = nodes_->get(node_id);
   uint64_t key_pos;
@@ -481,7 +529,7 @@ bool DoubleArray<Bytes>::find(KeyArg key, int64_t *key_id) {
   return true;
 }
 
-bool DoubleArray<Bytes>::add(KeyArg key, int64_t *key_id) {
+bool DoubleArrayImpl::add(KeyArg key, int64_t *key_id) {
   Node *node;
   uint64_t key_pos;
   find_leaf(key, &node, &key_pos);
@@ -499,7 +547,7 @@ bool DoubleArray<Bytes>::add(KeyArg key, int64_t *key_id) {
   return true;
 }
 
-bool DoubleArray<Bytes>::remove(KeyArg key) {
+bool DoubleArrayImpl::remove(KeyArg key) {
   Node *node;
   uint64_t key_pos;
   if (!find_leaf(key, &node, &key_pos)) {
@@ -520,7 +568,7 @@ bool DoubleArray<Bytes>::remove(KeyArg key) {
   return true;
 }
 
-bool DoubleArray<Bytes>::replace(KeyArg src_key, KeyArg dest_key,
+bool DoubleArrayImpl::replace(KeyArg src_key, KeyArg dest_key,
                                  int64_t *key_id) {
   int64_t src_key_id;
   if (!find(src_key, &src_key_id)) {
@@ -537,12 +585,12 @@ bool DoubleArray<Bytes>::replace(KeyArg src_key, KeyArg dest_key,
   return true;
 }
 
-void DoubleArray<Bytes>::defrag(double usage_rate_threshold) {
+void DoubleArrayImpl::defrag(double usage_rate_threshold) {
   // TODO
   pool_->defrag(usage_rate_threshold);
 }
 
-void DoubleArray<Bytes>::truncate() {
+void DoubleArrayImpl::truncate() {
   // TODO: How to recycle nodes.
   Node * const node = &nodes_->get_value(ROOT_NODE_ID);
   node->set_child(NODE_INVALID_LABEL);
@@ -550,7 +598,7 @@ void DoubleArray<Bytes>::truncate() {
   pool_->truncate();
 }
 
-bool DoubleArray<Bytes>::find_longest_prefix_match(KeyArg query,
+bool DoubleArrayImpl::find_longest_prefix_match(KeyArg query,
                                                    int64_t *key_id,
                                                    Key *key) {
   bool found = false;
@@ -620,26 +668,8 @@ bool DoubleArray<Bytes>::find_longest_prefix_match(KeyArg query,
   return found;
 }
 
-//MapCursor<Bytes> *DoubleArray<Bytes>::create_cursor(
-//    MapCursorAllKeys<Bytes> query, const MapCursorOptions &options) {
-//  // TODO
-//  return nullptr;
-//}
-
-//MapCursor<Bytes> *DoubleArray<Bytes>::create_cursor(
-//    const MapCursorKeyIDRange<Bytes> &query, const MapCursorOptions &options) {
-//  // TODO
-//  return nullptr;
-//}
-
-//MapCursor<Bytes> *DoubleArray<Bytes>::create_cursor(
-//    const MapCursorKeyRange<Bytes> &query, const MapCursorOptions &options) {
-//  // TODO
-//  return nullptr;
-//}
-
-void DoubleArray<Bytes>::create_map(Storage *storage, uint32_t storage_node_id,
-                                    const MapOptions &) {
+void DoubleArrayImpl::create_impl(Storage *storage, uint32_t storage_node_id,
+                                  const MapOptions &) {
   storage_ = storage;
   StorageNode storage_node =
       storage->create_node(storage_node_id, sizeof(Header));
@@ -653,11 +683,9 @@ void DoubleArray<Bytes>::create_map(Storage *storage, uint32_t storage_node_id,
                                          SIBLING_ARRAY_SIZE));
     blocks_.reset(BlockArray::create(storage, storage_node_id_,
                                      BLOCK_ARRAY_SIZE));
-    pool_.reset(KeyPool<Bytes>::create(storage, storage_node_id_));
     header_->nodes_storage_node_id = nodes_->storage_node_id();
     header_->siblings_storage_node_id = siblings_->storage_node_id();
     header_->blocks_storage_node_id = blocks_->storage_node_id();
-    header_->pool_storage_node_id = pool_->storage_node_id();
     Node * const root_node = reserve_node(ROOT_NODE_ID);
     root_node[NODE_INVALID_OFFSET - ROOT_NODE_ID].set_is_origin(true);
   } catch (...) {
@@ -666,7 +694,7 @@ void DoubleArray<Bytes>::create_map(Storage *storage, uint32_t storage_node_id,
   }
 }
 
-void DoubleArray<Bytes>::open_map(Storage *storage, uint32_t storage_node_id) {
+void DoubleArrayImpl::open_impl(Storage *storage, uint32_t storage_node_id) {
   storage_ = storage;
   storage_node_id_ = storage_node_id;
   StorageNode storage_node = storage->open_node(storage_node_id_);
@@ -676,20 +704,14 @@ void DoubleArray<Bytes>::open_map(Storage *storage, uint32_t storage_node_id) {
     throw LogicError();
   }
   header_ = static_cast<Header *>(storage_node.body());
-  if (!*header_) {
-    GRNXX_ERROR() << "wrong format: expected = " << FORMAT_STRING
-                  << ", actual = " << header_->common_header.format();
-    throw LogicError();
-  }
   nodes_.reset(NodeArray::open(storage, header_->nodes_storage_node_id));
   siblings_.reset(
       SiblingArray::open(storage, header_->siblings_storage_node_id));
   blocks_.reset(BlockArray::open(storage, header_->blocks_storage_node_id));
-  pool_.reset(KeyPool<Bytes>::open(storage, header_->pool_storage_node_id));
 }
 
-bool DoubleArray<Bytes>::replace_key(int64_t key_id, KeyArg src_key,
-                                     KeyArg dest_key) {
+bool DoubleArrayImpl::replace_key(int64_t key_id, KeyArg src_key,
+                                  KeyArg dest_key) {
   Node *dest_node;
   uint64_t key_pos;
   find_leaf(dest_key, &dest_node, &key_pos);
@@ -708,8 +730,8 @@ bool DoubleArray<Bytes>::replace_key(int64_t key_id, KeyArg src_key,
   return true;
 }
 
-bool DoubleArray<Bytes>::find_leaf(KeyArg key, Node **leaf_node,
-                                   uint64_t *leaf_key_pos) {
+bool DoubleArrayImpl::find_leaf(KeyArg key, Node **leaf_node,
+                                uint64_t *leaf_key_pos) {
   Node *node = &nodes_->get_value(ROOT_NODE_ID);
   uint64_t key_pos;
   for (key_pos = 0; key_pos < key.size(); ++key_pos) {
@@ -745,8 +767,8 @@ bool DoubleArray<Bytes>::find_leaf(KeyArg key, Node **leaf_node,
   return node->is_leaf();
 }
 
-bool DoubleArray<Bytes>::insert_leaf(KeyArg key, Node *node,
-                                     uint64_t key_pos, Node **leaf_node) {
+bool DoubleArrayImpl::insert_leaf(KeyArg key, Node *node,
+                                  uint64_t key_pos, Node **leaf_node) {
   if (node->is_leaf()) {
     Key stored_key;
     if (!pool_->get(node->key_id(), &stored_key)) {
@@ -785,7 +807,7 @@ bool DoubleArray<Bytes>::insert_leaf(KeyArg key, Node *node,
   }
 }
 
-auto DoubleArray<Bytes>::insert_node(Node *node, uint64_t label) -> Node * {
+Node *DoubleArrayImpl::insert_node(Node *node, uint64_t label) {
   uint64_t offset = node->offset();
   if (node->is_leaf() || (offset == NODE_INVALID_OFFSET)) {
     offset = find_offset(&label, 1);
@@ -833,7 +855,7 @@ auto DoubleArray<Bytes>::insert_node(Node *node, uint64_t label) -> Node * {
   return next_node;
 }
 
-auto DoubleArray<Bytes>::separate(Node *node, uint64_t labels[2]) -> Node * {
+Node *DoubleArrayImpl::separate(Node *node, uint64_t labels[2]) {
   const uint64_t offset = find_offset(labels, 2);
   uint64_t node_ids[2] = { offset ^ labels[0], offset ^ labels[1] };
   Node *nodes[2];
@@ -859,7 +881,7 @@ auto DoubleArray<Bytes>::separate(Node *node, uint64_t labels[2]) -> Node * {
   return nodes[1];
 }
 
-void DoubleArray<Bytes>::resolve(Node *node, uint64_t label) {
+void DoubleArrayImpl::resolve(Node *node, uint64_t label) {
   uint64_t offset = node->offset();
   if (offset == NODE_INVALID_OFFSET) {
     return;
@@ -889,9 +911,9 @@ void DoubleArray<Bytes>::resolve(Node *node, uint64_t label) {
   migrate_nodes(node, offset, labels, num_labels);
 }
 
-void DoubleArray<Bytes>::migrate_nodes(Node *node, uint64_t dest_offset,
-                                       const uint64_t *labels,
-                                       uint64_t num_labels) {
+void DoubleArrayImpl::migrate_nodes(Node *node, uint64_t dest_offset,
+                                    const uint64_t *labels,
+                                    uint64_t num_labels) {
   const uint64_t src_offset = node->offset();
   Node * const src_node_block =
       &nodes_->get_value(src_offset & ~(BLOCK_SIZE - 1));
@@ -919,8 +941,8 @@ void DoubleArray<Bytes>::migrate_nodes(Node *node, uint64_t dest_offset,
   node->set_offset(dest_offset);
 }
 
-uint64_t DoubleArray<Bytes>::find_offset(const uint64_t *labels,
-                                         uint64_t num_labels) {
+uint64_t DoubleArrayImpl::find_offset(const uint64_t *labels,
+                                      uint64_t num_labels) {
   // Blocks are tested in descending order of level.
   // Generally, a lower level contains more phantom nodes.
   uint64_t level = bit_scan_reverse(num_labels) + 1;
@@ -979,7 +1001,7 @@ uint64_t DoubleArray<Bytes>::find_offset(const uint64_t *labels,
   return (header_->num_blocks * BLOCK_SIZE) ^ labels[0];
 }
 
-auto DoubleArray<Bytes>::reserve_node(uint64_t node_id) -> Node * {
+Node *DoubleArrayImpl::reserve_node(uint64_t node_id) {
   const uint64_t block_id = node_id / BLOCK_SIZE;
   Block *block;
   if (node_id >= (header_->num_blocks * BLOCK_SIZE)) {
@@ -1009,7 +1031,7 @@ auto DoubleArray<Bytes>::reserve_node(uint64_t node_id) -> Node * {
   return node;
 }
 
-auto DoubleArray<Bytes>::reserve_block(uint64_t block_id) -> Block * {
+Block *DoubleArrayImpl::reserve_block(uint64_t block_id) {
   if (block_id >= blocks_->size()) {
     GRNXX_ERROR() << "too many blocks: block_id = " << block_id
                   << ", max_block_id = " << (blocks_->size() - 1);
@@ -1028,15 +1050,15 @@ auto DoubleArray<Bytes>::reserve_block(uint64_t block_id) -> Block * {
   return block;
 }
 
-void DoubleArray<Bytes>::update_block_level(uint64_t block_id, Block *block,
-                                            uint64_t level) {
+void DoubleArrayImpl::update_block_level(uint64_t block_id, Block *block,
+                                         uint64_t level) {
   // FIXME: If set_block_level() fails, the block gets lost.
   unset_block_level(block_id, block);
   set_block_level(block_id, block, level);
 }
 
-void DoubleArray<Bytes>::set_block_level(uint64_t block_id, Block *block,
-                                         uint64_t level) {
+void DoubleArrayImpl::set_block_level(uint64_t block_id, Block *block,
+                                      uint64_t level) {
   if (header_->latest_blocks[level] == BLOCK_INVALID_ID) {
     // The block becomes the only one member of the level group.
     block->set_next(block_id);
@@ -1057,7 +1079,7 @@ void DoubleArray<Bytes>::set_block_level(uint64_t block_id, Block *block,
   block->set_failure_count(0);
 }
 
-void DoubleArray<Bytes>::unset_block_level(uint64_t block_id, Block *block) {
+void DoubleArrayImpl::unset_block_level(uint64_t block_id, Block *block) {
   const uint64_t level = block->level();
   const uint64_t next_block_id = block->next();
   const uint64_t prev_block_id = block->prev();
@@ -1076,5 +1098,204 @@ void DoubleArray<Bytes>::unset_block_level(uint64_t block_id, Block *block) {
   }
 }
 
+struct DoubleArrayHeader {
+  CommonHeader common_header;
+  uint32_t impl_storage_node_id;
+  uint32_t pool_storage_node_id;
+
+  // Initialize the member variables.
+  DoubleArrayHeader();
+
+  // Return true iff the header seems to be correct.
+  explicit operator bool() const;
+};
+
+DoubleArrayHeader::DoubleArrayHeader()
+    : common_header(FORMAT_STRING, MAP_DOUBLE_ARRAY),
+      impl_storage_node_id(STORAGE_INVALID_NODE_ID),
+      pool_storage_node_id(STORAGE_INVALID_NODE_ID) {}
+
+DoubleArrayHeader::operator bool() const {
+  return common_header.format() == FORMAT_STRING;
+}
+
+template <typename T>
+Map<T> *DoubleArray<T>::create(Storage *, uint32_t, const MapOptions &) {
+  GRNXX_ERROR() << "unsupported type";
+  throw LogicError();
+}
+
+template <typename T>
+Map<T> *DoubleArray<T>::open(Storage *, uint32_t) {
+  GRNXX_ERROR() << "unsupported type";
+  throw LogicError();
+}
+
+template class DoubleArray<int8_t>;
+template class DoubleArray<uint8_t>;
+template class DoubleArray<int16_t>;
+template class DoubleArray<uint16_t>;
+template class DoubleArray<int32_t>;
+template class DoubleArray<uint32_t>;
+template class DoubleArray<int64_t>;
+template class DoubleArray<uint64_t>;
+template class DoubleArray<double>;
+template class DoubleArray<GeoPoint>;
+
+DoubleArray<Bytes>::DoubleArray()
+    : storage_(nullptr),
+      storage_node_id_(STORAGE_INVALID_NODE_ID),
+      header_(nullptr),
+      impl_(),
+      pool_() {}
+
+DoubleArray<Bytes>::~DoubleArray() {}
+
+DoubleArray<Bytes> *DoubleArray<Bytes>::create(Storage *storage,
+                                               uint32_t storage_node_id,
+                                               const MapOptions &options) {
+  std::unique_ptr<DoubleArray> map(new (std::nothrow) DoubleArray);
+  if (!map) {
+    GRNXX_ERROR() << "new grnxx::map::DoubleArray failed";
+    throw MemoryError();
+  }
+  map->create_map(storage, storage_node_id, options);
+  return map.release();
+}
+
+DoubleArray<Bytes> *DoubleArray<Bytes>::open(Storage *storage,
+                                             uint32_t storage_node_id) {
+  std::unique_ptr<DoubleArray> map(new (std::nothrow) DoubleArray);
+  if (!map) {
+    GRNXX_ERROR() << "new grnxx::map::DoubleArray failed";
+    throw MemoryError();
+  }
+  map->open_map(storage, storage_node_id);
+  return map.release();
+}
+
+uint32_t DoubleArray<Bytes>::storage_node_id() const {
+  return storage_node_id_;
+}
+
+MapType DoubleArray<Bytes>::type() const {
+  return MAP_DOUBLE_ARRAY;
+}
+
+int64_t DoubleArray<Bytes>::max_key_id() const {
+  return pool_->max_key_id();
+}
+
+uint64_t DoubleArray<Bytes>::num_keys() const {
+  return pool_->num_keys();
+}
+
+bool DoubleArray<Bytes>::get(int64_t key_id, Key *key) {
+  if ((key_id < MAP_MIN_KEY_ID) || (key_id > max_key_id())) {
+    // Out of range.
+    return false;
+  }
+  return pool_->get(key_id, key);
+}
+
+bool DoubleArray<Bytes>::unset(int64_t key_id) {
+  return impl_->unset(key_id);
+}
+
+bool DoubleArray<Bytes>::reset(int64_t key_id, KeyArg dest_key) {
+  return impl_->reset(key_id, dest_key);
+}
+
+bool DoubleArray<Bytes>::find(KeyArg key, int64_t *key_id) {
+  return impl_->find(key, key_id);
+}
+
+bool DoubleArray<Bytes>::add(KeyArg key, int64_t *key_id) {
+  return impl_->add(key, key_id);
+}
+
+bool DoubleArray<Bytes>::remove(KeyArg key) {
+  return impl_->remove(key);
+}
+
+bool DoubleArray<Bytes>::replace(KeyArg src_key, KeyArg dest_key,
+                                 int64_t *key_id) {
+  return impl_->replace(src_key, dest_key, key_id);
+}
+
+void DoubleArray<Bytes>::defrag(double usage_rate_threshold) {
+  // TODO
+  impl_->defrag(usage_rate_threshold);
+}
+
+void DoubleArray<Bytes>::truncate() {
+  // TODO
+  impl_->truncate();
+}
+
+bool DoubleArray<Bytes>::find_longest_prefix_match(KeyArg query,
+                                                   int64_t *key_id,
+                                                   Key *key) {
+  return impl_->find_longest_prefix_match(query, key_id, key);
+}
+
+//MapCursor<Bytes> *DoubleArray<Bytes>::create_cursor(
+//    MapCursorAllKeys<Bytes> query, const MapCursorOptions &options) {
+//  // TODO
+//  return nullptr;
+//}
+
+//MapCursor<Bytes> *DoubleArray<Bytes>::create_cursor(
+//    const MapCursorKeyIDRange<Bytes> &query, const MapCursorOptions &options) {
+//  // TODO
+//  return nullptr;
+//}
+
+//MapCursor<Bytes> *DoubleArray<Bytes>::create_cursor(
+//    const MapCursorKeyRange<Bytes> &query, const MapCursorOptions &options) {
+//  // TODO
+//  return nullptr;
+//}
+
+void DoubleArray<Bytes>::create_map(Storage *storage, uint32_t storage_node_id,
+                                    const MapOptions &options) {
+  storage_ = storage;
+  StorageNode storage_node =
+      storage->create_node(storage_node_id, sizeof(Header));
+  storage_node_id_ = storage_node.id();
+  try {
+    header_ = static_cast<Header *>(storage_node.body());
+    *header_ = Header();
+    impl_.reset(Impl::create(storage, storage_node_id_, options));
+    pool_.reset(Pool::create(storage, storage_node_id_));
+    impl_->set_pool(pool_.get());
+    header_->impl_storage_node_id = impl_->storage_node_id();
+    header_->pool_storage_node_id = pool_->storage_node_id();
+  } catch (...) {
+    storage->unlink_node(storage_node_id_);
+    throw;
+  }
+}
+
+void DoubleArray<Bytes>::open_map(Storage *storage, uint32_t storage_node_id) {
+  storage_ = storage;
+  storage_node_id_ = storage_node_id;
+  StorageNode storage_node = storage->open_node(storage_node_id_);
+  if (storage_node.size() < sizeof(Header)) {
+    GRNXX_ERROR() << "invalid format: size = " << storage_node.size()
+                  << ", header_size = " << sizeof(Header);
+    throw LogicError();
+  }
+  header_ = static_cast<Header *>(storage_node.body());
+  if (!*header_) {
+    GRNXX_ERROR() << "wrong format: expected = " << FORMAT_STRING
+                  << ", actual = " << header_->common_header.format();
+    throw LogicError();
+  }
+  impl_.reset(Impl::open(storage, header_->impl_storage_node_id));
+  pool_.reset(Pool::open(storage, header_->pool_storage_node_id));
+  impl_->set_pool(pool_.get());
+}
+
 }  // namespace map
 }  // namespace grnxx

  Modified: lib/grnxx/map/double_array.hpp (+5 -38)
===================================================================
--- lib/grnxx/map/double_array.hpp    2013-08-07 13:17:27 +0900 (545efda)
+++ lib/grnxx/map/double_array.hpp    2013-08-07 15:37:15 +0900 (fa679c0)
@@ -22,7 +22,6 @@
 
 #include <memory>
 
-#include "grnxx/array.hpp"
 #include "grnxx/bytes.hpp"
 #include "grnxx/map.hpp"
 #include "grnxx/map_cursor.hpp"
@@ -38,8 +37,7 @@ namespace map {
 template <typename T> class KeyPool;
 
 struct DoubleArrayHeader;
-class DoubleArrayBlock;
-class DoubleArrayNode;
+class DoubleArrayImpl;
 
 template <typename T>
 class DoubleArray {
@@ -52,16 +50,8 @@ class DoubleArray {
 template <>
 class DoubleArray<Bytes> : public Map<Bytes> {
   using Header = DoubleArrayHeader;
-  using Node = DoubleArrayNode;
-  using Block = DoubleArrayBlock;
-
-  using NodeArray    = Array<Node,     65536, 8192>;  // 42-bit
-  using SiblingArray = Array<uint8_t, 262144, 4096>;  // 42-bit
-  using BlockArray   = Array<Block,     8192, 1024>;  // 33-bit
-
-  static constexpr uint64_t NODE_ARRAY_SIZE    = 1ULL << 42;
-  static constexpr uint64_t SIBLING_ARRAY_SIZE = 1ULL << 42;
-  static constexpr uint64_t BLOCK_ARRAY_SIZE   = 1ULL << 33;
+  using Impl   = DoubleArrayImpl;
+  using Pool   = KeyPool<Bytes>;
 
  public:
   using Key = typename Map<Bytes>::Key;
@@ -112,35 +102,12 @@ class DoubleArray<Bytes> : public Map<Bytes> {
   Storage *storage_;
   uint32_t storage_node_id_;
   Header *header_;
-  std::unique_ptr<NodeArray> nodes_;
-  std::unique_ptr<SiblingArray> siblings_;
-  std::unique_ptr<BlockArray> blocks_;
-  std::unique_ptr<KeyPool<Bytes>> pool_;
+  std::unique_ptr<Impl> impl_;
+  std::unique_ptr<Pool> pool_;
 
   void create_map(Storage *storage, uint32_t storage_node_id,
                   const MapOptions &options);
   void open_map(Storage *storage, uint32_t storage_node_id);
-
-  bool replace_key(int64_t key_id, KeyArg src_key, KeyArg dest_key);
-
-  bool find_leaf(KeyArg key, Node **leaf_node, uint64_t *leaf_key_pos);
-  bool insert_leaf(KeyArg key, Node *node, uint64_t key_pos, Node **leaf_node);
-
-  Node *insert_node(Node *node, uint64_t label);
-  Node *separate(Node *node, uint64_t labels[2]);
-
-  void resolve(Node *node, uint64_t label);
-  void migrate_nodes(Node *node, uint64_t dest_offset,
-                     const uint64_t *labels, uint64_t num_labels);
-
-  uint64_t find_offset(const uint64_t *labels, uint64_t num_labels);
-
-  Node *reserve_node(uint64_t node_id);
-  Block *reserve_block(uint64_t block_id);
-
-  void update_block_level(uint64_t block_id, Block *block, uint64_t level);
-  void set_block_level(uint64_t block_id, Block *block, uint64_t level);
-  void unset_block_level(uint64_t block_id, Block *block);
 };
 
 }  // namespace map
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Zurück zum Archiv-Index