susumu.yata
null+****@clear*****
Tue Dec 16 10:45:02 JST 2014
susumu.yata 2014-11-24 16:32:09 +0900 (Mon, 24 Nov 2014) New Revision: 7110b254711fe8b67b65a3f29e765a03dae659ff https://github.com/groonga/grnxx/commit/7110b254711fe8b67b65a3f29e765a03dae659ff Message: Remove the old implementation of Pipeline. Removed files: lib/grnxx/pipeline-old.cpp Deleted: lib/grnxx/pipeline-old.cpp (+0 -390) 100644 =================================================================== --- lib/grnxx/pipeline-old.cpp 2014-11-24 16:31:08 +0900 (dc740ae) +++ /dev/null @@ -1,390 +0,0 @@ -#include "grnxx/pipeline.hpp" - -#include "grnxx/cursor.hpp" -#include "grnxx/expression.hpp" -#include "grnxx/merger.hpp" - -namespace grnxx { -namespace pipeline { - -// -- Node -- - -class Node { - public: - Node() {} - virtual ~Node() {} - - virtual Int read_next(Error *error, Array<Record> *records) = 0; - virtual Int read_all(Error *error, Array<Record> *records); -}; - -Int Node::read_all(Error *error, Array<Record> *records) { - Int total_count = 0; - for ( ; ; ) { - Int count = read_next(error, records); - if (count == -1) { - return -1; - } else if (count == 0) { - break; - } - total_count += count; - } - return total_count; -} - -// --- CursorNode --- - -class CursorNode : public Node { - public: - explicit CursorNode(unique_ptr<Cursor> &&cursor) - : Node(), - cursor_(std::move(cursor)) {} - ~CursorNode() {} - - Int read_next(Error *error, Array<Record> *records); - Int read_all(Error *error, Array<Record> *records); - - private: - unique_ptr<Cursor> cursor_; -}; - -Int CursorNode::read_next(Error *error, Array<Record> *records) { - // TODO: The following block size (1024) should be optimized. - auto result = cursor_->read(error, 1024, records); - if (!result.is_ok) { - return -1; - } - return result.count; -} - -Int CursorNode::read_all(Error *error, Array<Record> *records) { - auto result = cursor_->read_all(error, records); - if (!result.is_ok) { - return -1; - } - return result.count; -} - -// --- FilterNode --- - -class FilterNode : public Node { - public: - FilterNode(unique_ptr<Node> &&arg, - unique_ptr<Expression> &&expression, - Int offset, - Int limit) - : Node(), - arg_(std::move(arg)), - expression_(std::move(expression)), - offset_(offset), - limit_(limit) {} - ~FilterNode() {} - - Int read_next(Error *error, Array<Record> *records); - - private: - unique_ptr<Node> arg_; - unique_ptr<Expression> expression_; - Int offset_; - Int limit_; -}; - -Int FilterNode::read_next(Error *error, Array<Record> *records) { - // TODO: The following threshold (1024) should be optimized. - Int offset = records->size(); - while (limit_ > 0) { - Int count = arg_->read_next(error, records); - if (count == -1) { - return -1; - } else if (count == 0) { - break; - } - ArrayRef<Record> ref = records->ref(records->size() - count, count); - if (!expression_->filter(error, ref, &ref)) { - return -1; - } - if (offset_ > 0) { - if (offset_ >= ref.size()) { - offset_ -= ref.size(); - ref = ref.ref(0, 0); - } else { - for (Int i = offset_; i < ref.size(); ++i) { - ref.set(i - offset_, ref[i]); - } - ref = ref.ref(0, ref.size() - offset_); - offset_ = 0; - } - } - if (ref.size() > limit_) { - ref = ref.ref(0, limit_); - } - limit_ -= ref.size(); - if (!records->resize(error, records->size() - count + ref.size())) { - return -1; - } - if ((records->size() - offset) >= 1024) { - break; - } - } - return records->size() - offset; -} - -// --- AdjusterNode --- - -class AdjusterNode : public Node { - public: - explicit AdjusterNode(unique_ptr<Node> &&arg, - unique_ptr<Expression> &&expression) - : Node(), - arg_(std::move(arg)), - expression_(std::move(expression)) {} - ~AdjusterNode() {} - - Int read_next(Error *error, Array<Record> *records); - - private: - unique_ptr<Node> arg_; - unique_ptr<Expression> expression_; -}; - -Int AdjusterNode::read_next(Error *error, Array<Record> *records) { - Int offset = records->size(); - Int count = arg_->read_next(error, records); - if (count == -1) { - return -1; - } - if (!expression_->adjust(error, records, offset)) { - return -1; - } - return count; -} - -// --- SorterNode --- - -class SorterNode : public Node { - public: - explicit SorterNode(unique_ptr<Node> &&arg, - unique_ptr<Sorter> &&sorter) - : Node(), - arg_(std::move(arg)), - sorter_(std::move(sorter)) {} - ~SorterNode() {} - - Int read_next(Error *error, Array<Record> *records); - - private: - unique_ptr<Node> arg_; - unique_ptr<Sorter> sorter_; -}; - -Int SorterNode::read_next(Error *error, Array<Record> *records) { - Int count = arg_->read_all(error, records); - if (count == -1) { - return -1; - } else if (count == 0) { - return 0; - } - if (!sorter_->sort(error, records)) { - return -1; - } - return records->size(); -} - -// --- MergerNode --- - -class MergerNode : public Node { - public: - explicit MergerNode(unique_ptr<Node> &&arg1, - unique_ptr<Node> &&arg2, - unique_ptr<Merger> &&merger) - : Node(), - arg1_(std::move(arg1)), - arg2_(std::move(arg2)), - merger_(std::move(merger)) {} - ~MergerNode() {} - - Int read_next(Error *error, Array<Record> *records); - - private: - unique_ptr<Node> arg1_; - unique_ptr<Node> arg2_; - unique_ptr<Merger> merger_; -}; - -Int MergerNode::read_next(Error *error, Array<Record> *records) { - Array<Record> arg1_records; - Int count = arg1_->read_all(error, &arg1_records); - if (count == -1) { - return -1; - } - Array<Record> arg2_records; - count = arg2_->read_all(error, &arg2_records); - if (count == -1) { - return -1; - } - if ((arg1_records.size() == 0) && (arg2_records.size() == 0)) { - return 0; - } - if (!merger_->merge(error, &arg1_records, &arg2_records, records)) { - return -1; - } - return records->size(); -} - -} // namespace pipeline - -using namespace pipeline; - -Pipeline::~Pipeline() {} - -bool Pipeline::flush(Error *error, Array<Record> *records) { - return root_->read_all(error, records) >= 0; -} - -unique_ptr<Pipeline> Pipeline::create(Error *error, - const Table *table, - unique_ptr<PipelineNode> &&root, - const PipelineOptions &) { - unique_ptr<Pipeline> pipeline( - new (nothrow) Pipeline(table, std::move(root))); - if (!pipeline) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - return pipeline; -} - -Pipeline::Pipeline(const Table *table, - unique_ptr<PipelineNode> &&root) - : table_(table), - root_(std::move(root)) {} - -unique_ptr<PipelineBuilder> PipelineBuilder::create(Error *error, - const Table *table) { - unique_ptr<PipelineBuilder> builder(new (nothrow) PipelineBuilder); - if (!builder) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return nullptr; - } - builder->table_ = table; - return builder; -} - -PipelineBuilder::~PipelineBuilder() {} - -bool PipelineBuilder::push_cursor(Error *error, unique_ptr<Cursor> &&cursor) { - // Reserve a space for a new node. - if (!stack_.reserve(error, stack_.size() + 1)) { - return false; - } - unique_ptr<Node> node(new (nothrow) CursorNode(std::move(cursor))); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - // This push_back() must not fail because a space is already reserved. - stack_.push_back(nullptr, std::move(node)); - return true; -} - -bool PipelineBuilder::push_filter(Error *error, - unique_ptr<Expression> &&expression, - Int offset, Int limit) { - if (stack_.size() < 1) { - GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes"); - return false; - } - unique_ptr<Node> arg = std::move(stack_[stack_.size() - 1]); - stack_.resize(nullptr, stack_.size() - 1); - unique_ptr<Node> node( - new (nothrow) FilterNode(std::move(arg), std::move(expression), - offset, limit)); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - stack_.push_back(error, std::move(node)); - return true; -} - -bool PipelineBuilder::push_adjuster(Error *error, - unique_ptr<Expression> &&expression) { - if (stack_.size() < 1) { - GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes"); - return false; - } - unique_ptr<Node> arg = std::move(stack_[stack_.size() - 1]); - stack_.resize(nullptr, stack_.size() - 1); - unique_ptr<Node> node( - new (nothrow) AdjusterNode(std::move(arg), std::move(expression))); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - stack_.push_back(error, std::move(node)); - return true; -} - -bool PipelineBuilder::push_sorter(Error *error, unique_ptr<Sorter> &&sorter) { - if (stack_.size() < 1) { - GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes"); - return false; - } - unique_ptr<Node> arg = std::move(stack_[stack_.size() - 1]); - stack_.resize(nullptr, stack_.size() - 1); - unique_ptr<Node> node( - new (nothrow) SorterNode(std::move(arg), std::move(sorter))); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - stack_.push_back(error, std::move(node)); - return true; -} - -bool PipelineBuilder::push_merger(Error *error, const MergerOptions &options) { - if (stack_.size() < 2) { - GRNXX_ERROR_SET(error, INVALID_OPERAND, "Not enough nodes"); - return false; - } - auto merger = Merger::create(error, options); - if (!merger) { - return false; - } - unique_ptr<Node> arg2 = std::move(stack_[stack_.size() - 2]); - unique_ptr<Node> arg1 = std::move(stack_[stack_.size() - 1]); - stack_.resize(nullptr, stack_.size() - 2); - unique_ptr<Node> node(new (nothrow) MergerNode(std::move(arg1), - std::move(arg2), - std::move(merger))); - if (!node) { - GRNXX_ERROR_SET(error, NO_MEMORY, "Memory allocation failed"); - return false; - } - stack_.push_back(error, std::move(node)); - return true; - - - // TODO - GRNXX_ERROR_SET(error, NOT_SUPPORTED_YET, "Not supported yet"); - return false; -} - -void PipelineBuilder::clear() { - stack_.clear(); -} - -unique_ptr<Pipeline> PipelineBuilder::release(Error *error, - const PipelineOptions &options) { - if (stack_.size() != 1) { - GRNXX_ERROR_SET(error, INVALID_ARGUMENT, "Incomplete pipeline"); - return nullptr; - } - unique_ptr<PipelineNode> root = std::move(stack_[0]); - stack_.clear(); - return Pipeline::create(error, table_, std::move(root), options); -} - -PipelineBuilder::PipelineBuilder() : table_(nullptr), stack_() {} - -} // namespace grnxx -------------- next part -------------- HTML����������������������������...Download