From f0a8be253e2a16f22acfaf13b7f7bc9988742e21 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 10 Jul 2014 09:31:42 -0700 Subject: [PATCH] JSON (Document) API sketch Summary: This is a rough sketch of our new document API. Would like to get some thoughts and comments about the high-level architecture and API. I didn't optimize for performance at all. Leaving some low-hanging fruit so that we can be happy when we fix them! :) Currently, bunch of features are not supported at all. Indexes can be only specified when creating database. There is no query planner whatsoever. This will all be added in due time. Test Plan: Added a simple unit test Reviewers: haobo, yhchiang, dhruba, sdong, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D18747 --- Makefile | 4 + db/write_batch.cc | 17 + db/write_batch_internal.h | 3 + include/rocksdb/slice.h | 1 + include/rocksdb/write_batch.h | 4 + include/utilities/document_db.h | 149 ++++ include/utilities/json_document.h | 2 + utilities/document/document_db.cc | 1141 ++++++++++++++++++++++++ utilities/document/document_db_test.cc | 262 ++++++ utilities/document/json_document.cc | 54 ++ 10 files changed, 1637 insertions(+) create mode 100644 include/utilities/document_db.h create mode 100644 utilities/document/document_db.cc create mode 100644 utilities/document/document_db_test.cc diff --git a/Makefile b/Makefile index 606a92c1e..451b69057 100644 --- a/Makefile +++ b/Makefile @@ -104,6 +104,7 @@ TESTS = \ stringappend_test \ ttl_test \ backupable_db_test \ + document_db_test \ json_document_test \ version_edit_test \ version_set_test \ @@ -345,6 +346,9 @@ prefix_test: db/prefix_test.o $(LIBOBJECTS) $(TESTHARNESS) backupable_db_test: utilities/backupable/backupable_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) utilities/backupable/backupable_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +document_db_test: utilities/document/document_db_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) utilities/document/document_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + json_document_test: utilities/document/json_document_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) utilities/document/json_document_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/db/write_batch.cc b/db/write_batch.cc index 734d1e376..113f8a218 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -236,6 +236,23 @@ void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); } +void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast(kTypeDeletion)); + } else { + b->rep_.push_back(static_cast(kTypeColumnFamilyDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSliceParts(&b->rep_, key); +} + +void WriteBatch::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); +} + void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 85e85b33d..9a191f4cb 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -71,6 +71,9 @@ class WriteBatchInternal { static void Put(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key, const SliceParts& value); + static void Delete(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key); + static void Delete(WriteBatch* batch, uint32_t column_family_id, const Slice& key); diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index 225371571..406a8abb9 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -107,6 +107,7 @@ class Slice { struct SliceParts { SliceParts(const Slice* _parts, int _num_parts) : parts(_parts), num_parts(_num_parts) { } + SliceParts() : parts(nullptr), num_parts(0) {} const Slice* parts; int num_parts; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 74ee2ad16..3272fd2f9 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -67,6 +67,10 @@ class WriteBatch { void Delete(ColumnFamilyHandle* column_family, const Slice& key); void Delete(const Slice& key) { Delete(nullptr, key); } + // variant that takes SliceParts + void Delete(ColumnFamilyHandle* column_family, const SliceParts& key); + void Delete(const SliceParts& key) { Delete(nullptr, key); } + // Append a blob of arbitrary size to the records in this batch. The blob will // be stored in the transaction log but not in any other file. In particular, // it will not be persisted to the SST files. When iterating over this diff --git a/include/utilities/document_db.h b/include/utilities/document_db.h new file mode 100644 index 000000000..8e072ad29 --- /dev/null +++ b/include/utilities/document_db.h @@ -0,0 +1,149 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE + +#include +#include + +#include "utilities/stackable_db.h" +#include "utilities/json_document.h" +#include "rocksdb/db.h" + +namespace rocksdb { + +// IMPORTANT: DocumentDB is a work in progress. It is unstable and we might +// change the API without warning. Talk to RocksDB team before using this in +// production ;) + +// DocumentDB is a layer on top of RocksDB that provides a very simple JSON API. +// When creating a DB, you specify a list of indexes you want to keep on your +// data. You can insert a JSON document to the DB, which is automatically +// indexed. Every document added to the DB needs to have "_id" field which is +// automatically indexed and is an unique primary key. All other indexes are +// non-unique. + +// NOTE: field names in the JSON are NOT allowed to start with '$' or +// contain '.'. We don't currently enforce that rule, but will start behaving +// badly. + +// Cursor is what you get as a result of executing query. To get all +// results from a query, call Next() on a Cursor while Valid() returns true +class Cursor { + public: + Cursor() = default; + virtual ~Cursor() {} + + virtual bool Valid() const = 0; + virtual void Next() = 0; + // Lifecycle of the returned JSONDocument is until the next Next() call + virtual const JSONDocument& document() const = 0; + virtual Status status() const = 0; + + private: + // No copying allowed + Cursor(const Cursor&); + void operator=(const Cursor&); +}; + +struct DocumentDBOptions { + int background_threads = 4; + uint64_t memtable_size = 128 * 1024 * 1024; // 128 MB + uint64_t cache_size = 1 * 1024 * 1024 * 1024; // 1 GB +}; + +// TODO(icanadi) Add `JSONDocument* info` parameter to all calls that can be +// used by the caller to get more information about the call execution (number +// of dropped records, number of updated records, etc.) +class DocumentDB : public StackableDB { + public: + struct IndexDescriptor { + // Currently, you can only define an index on a single field. To specify an + // index on a field X, set index description to JSON "{X: 1}" + // Currently the value needs to be 1, which means ascending. + // In the future, we plan to also support indexes on multiple keys, where + // you could mix ascending sorting (1) with descending sorting indexes (-1) + JSONDocument* description; + std::string name; + }; + + // Open DocumentDB with specified indexes. The list of indexes has to be + // complete, i.e. include all indexes present in the DB, except the primary + // key index. + // Otherwise, Open() will return an error + static Status Open(const DocumentDBOptions& options, const std::string& name, + const std::vector& indexes, + DocumentDB** db, bool read_only = false); + + explicit DocumentDB(DB* db) : StackableDB(db) {} + + // Create a new index. It will stop all writes for the duration of the call. + // All current documents in the DB are scanned and corresponding index entries + // are created + virtual Status CreateIndex(const WriteOptions& write_options, + const IndexDescriptor& index) = 0; + + // Drop an index. Client is responsible to make sure that index is not being + // used by currently executing queries + virtual Status DropIndex(const std::string& name) = 0; + + // Insert a document to the DB. The document needs to have a primary key "_id" + // which can either be a string or an integer. Otherwise the write will fail + // with InvalidArgument. + virtual Status Insert(const WriteOptions& options, + const JSONDocument& document) = 0; + + // Deletes all documents matching a filter atomically + virtual Status Remove(const ReadOptions& read_options, + const WriteOptions& write_options, + const JSONDocument& query) = 0; + + // Does this sequence of operations: + // 1. Find all documents matching a filter + // 2. For all documents, atomically: + // 2.1. apply the update operators + // 2.2. update the secondary indexes + // + // Currently only $set update operator is supported. + // Syntax is: {$set: {key1: value1, key2: value2, etc...}} + // This operator will change a document's key1 field to value1, key2 to + // value2, etc. New values will be set even if a document didn't have an entry + // for the specified key. + // + // You can not change a primary key of a document. + // + // Update example: Update({id: {$gt: 5}, $index: id}, {$set: {enabled: true}}) + virtual Status Update(const ReadOptions& read_options, + const WriteOptions& write_options, + const JSONDocument& filter, + const JSONDocument& updates) = 0; + + // query has to be an array in which every element is an operator. Currently + // only $filter operator is supported. Syntax of $filter operator is: + // {$filter: {key1: condition1, key2: condition2, etc.}} where conditions can + // be either: + // 1) a single value in which case the condition is equality condition, or + // 2) a defined operators, like {$gt: 4}, which will match all documents that + // have key greater than 4. + // + // Supported operators are: + // 1) $gt -- greater than + // 2) $gte -- greater than or equal + // 3) $lt -- less than + // 4) $lte -- less than or equal + // If you want the filter to use an index, you need to specify it like this: + // {$filter: {...(conditions)..., $index: index_name}} + // + // Example query: + // * [{$filter: {name: John, age: {$gte: 18}, $index: age}}] + // will return all Johns whose age is greater or equal to 18 and it will use + // index "age" to satisfy the query. + virtual Cursor* Query(const ReadOptions& read_options, + const JSONDocument& query) = 0; +}; + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/include/utilities/json_document.h b/include/utilities/json_document.h index daac5a64f..ceb058cf9 100644 --- a/include/utilities/json_document.h +++ b/include/utilities/json_document.h @@ -99,6 +99,8 @@ class JSONDocument { bool operator==(const JSONDocument& rhs) const; + std::string DebugString() const; + private: class ItemsIteratorGenerator; diff --git a/utilities/document/document_db.cc b/utilities/document/document_db.cc new file mode 100644 index 000000000..b30bdf830 --- /dev/null +++ b/utilities/document/document_db.cc @@ -0,0 +1,1141 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include "utilities/document_db.h" + +#include "rocksdb/cache.h" +#include "rocksdb/filter_policy.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "util/mutexlock.h" +#include "port/port.h" +#include "utilities/json_document.h" + +namespace rocksdb { + +// IMPORTANT NOTE: Secondary index column families should be very small and +// generally fit in memory. Assume that accessing secondary index column +// families is much faster than accessing primary index (data heap) column +// family. Accessing a key (i.e. checking for existance) from a column family in +// RocksDB is not much faster than accessing both key and value since they are +// kept together and loaded from storage together. + +namespace { +// < 0 <=> lhs < rhs +// == 0 <=> lhs == rhs +// > 0 <=> lhs == rhs +// TODO(icanadi) move this to JSONDocument? +int DocumentCompare(const JSONDocument& lhs, const JSONDocument& rhs) { + assert(rhs.IsObject() == false && rhs.IsObject() == false && + lhs.type() == rhs.type()); + + switch (lhs.type()) { + case JSONDocument::kNull: + return 0; + case JSONDocument::kBool: + return static_cast(lhs.GetBool()) - static_cast(rhs.GetBool()); + case JSONDocument::kDouble: { + double res = lhs.GetDouble() - rhs.GetDouble(); + return res == 0.0 ? 0 : (res < 0.0 ? -1 : 1); + } + case JSONDocument::kInt64: { + int64_t res = lhs.GetInt64() - rhs.GetInt64(); + return res == 0 ? 0 : (res < 0 ? -1 : 1); + } + case JSONDocument::kString: + return Slice(lhs.GetString()).compare(Slice(rhs.GetString())); + default: + assert(false); + } + return 0; +} +} // namespace + +class Filter { + public: + // returns nullptr on parse failure + static Filter* ParseFilter(const JSONDocument& filter); + + struct Interval { + const JSONDocument* upper_bound; + const JSONDocument* lower_bound; + bool upper_inclusive; + bool lower_inclusive; + Interval() + : upper_bound(nullptr), + lower_bound(nullptr), + upper_inclusive(false), + lower_inclusive(false) {} + Interval(const JSONDocument* ub, const JSONDocument* lb, bool ui, bool li) + : upper_bound(ub), + lower_bound(lb), + upper_inclusive(ui), + lower_inclusive(li) {} + void UpdateUpperBound(const JSONDocument* ub, bool inclusive); + void UpdateLowerBound(const JSONDocument* lb, bool inclusive); + }; + + bool SatisfiesFilter(const JSONDocument& document) const; + const Interval* GetInterval(const std::string& field) const; + + private: + explicit Filter(const JSONDocument& filter) : filter_(filter) {} + + // copied from the parameter + const JSONDocument filter_; + // upper_bound and lower_bound point to JSONDocuments in filter_, so no need + // to free them + // constant after construction + std::unordered_map intervals_; +}; + +void Filter::Interval::UpdateUpperBound(const JSONDocument* ub, + bool inclusive) { + bool update = (upper_bound == nullptr); + if (!update) { + int cmp = DocumentCompare(*upper_bound, *ub); + update = (cmp > 0) || (cmp == 0 && !inclusive); + } + if (update) { + upper_bound = ub; + upper_inclusive = inclusive; + } +} + +void Filter::Interval::UpdateLowerBound(const JSONDocument* lb, + bool inclusive) { + bool update = (lower_bound == nullptr); + if (!update) { + int cmp = DocumentCompare(*lower_bound, *lb); + update = (cmp < 0) || (cmp == 0 && !inclusive); + } + if (update) { + lower_bound = lb; + lower_inclusive = inclusive; + } +} + +Filter* Filter::ParseFilter(const JSONDocument& filter) { + if (filter.IsObject() == false) { + return nullptr; + } + + std::unique_ptr f(new Filter(filter)); + + for (const auto& items : f->filter_.Items()) { + if (items.first.size() && items.first[0] == '$') { + // fields starting with '$' are commands + continue; + } + assert(f->intervals_.find(items.first) == f->intervals_.end()); + if (items.second->IsObject()) { + if (items.second->Count() == 0) { + // uhm...? + return nullptr; + } + Interval interval; + for (const auto& condition : items.second->Items()) { + if (condition.second->IsObject() || condition.second->IsArray()) { + // comparison operators not defined on objects. invalid array + return nullptr; + } + // comparison operators: + if (condition.first == "$gt") { + interval.UpdateLowerBound(condition.second, false); + } else if (condition.first == "$gte") { + interval.UpdateLowerBound(condition.second, true); + } else if (condition.first == "$lt") { + interval.UpdateUpperBound(condition.second, false); + } else if (condition.first == "$lte") { + interval.UpdateUpperBound(condition.second, true); + } else { + // TODO(icanadi) more logical operators + return nullptr; + } + } + f->intervals_.insert({items.first, interval}); + } else { + // equality + f->intervals_.insert( + {items.first, Interval(items.second, items.second, true, true)}); + } + } + + return f.release(); +} + +const Filter::Interval* Filter::GetInterval(const std::string& field) const { + auto itr = intervals_.find(field); + if (itr == intervals_.end()) { + return nullptr; + } + // we can do that since intervals_ is constant after construction + return &itr->second; +} + +bool Filter::SatisfiesFilter(const JSONDocument& document) const { + for (const auto& interval : intervals_) { + auto value = document.Get(interval.first); + if (value == nullptr) { + // doesn't have the value, doesn't satisfy the filter + // (we don't support null queries yet) + return false; + } + if (interval.second.upper_bound != nullptr) { + if (value->type() != interval.second.upper_bound->type()) { + // no cross-type queries yet + // TODO(icanadi) do this at least for numbers! + return false; + } + int cmp = DocumentCompare(*interval.second.upper_bound, *value); + if (cmp < 0 || (cmp == 0 && interval.second.upper_inclusive == false)) { + // bigger (or equal) than upper bound + return false; + } + } + if (interval.second.lower_bound != nullptr) { + if (value->type() != interval.second.lower_bound->type()) { + // no cross-type queries yet + return false; + } + int cmp = DocumentCompare(*interval.second.lower_bound, *value); + if (cmp > 0 || (cmp == 0 && interval.second.lower_inclusive == false)) { + // smaller (or equal) than the lower bound + return false; + } + } + } + return true; +} + +class Index { + public: + Index() = default; + virtual ~Index() {} + + virtual const char* Name() const = 0; + + // Functions that are executed during write time + // --------------------------------------------- + // GetIndexKey() generates a key that will be used to index document and + // returns the key though the second std::string* parameter + virtual void GetIndexKey(const JSONDocument& document, + std::string* key) const = 0; + // Keys generated with GetIndexKey() will be compared using this comparator. + // It should be assumed that there will be a suffix added to the index key + // according to IndexKey implementation + virtual const Comparator* GetComparator() const = 0; + + // Functions that are executed during query time + // --------------------------------------------- + enum Direction { + kForwards, + kBackwards, + }; + // Returns true if this index can provide some optimization for satisfying + // filter. False otherwise + virtual bool UsefulIndex(const Filter& filter) const = 0; + // For every filter (assuming UsefulIndex()) there is a continuous interval of + // keys in the index that satisfy the index conditions. That interval can be + // three things: + // * [A, B] + // * [A, infinity> + // * <-infinity, B] + // + // Query engine that uses this Index for optimization will access the interval + // by first calling Position() and then iterating in the Direction (returned + // by Position()) while ShouldContinueLooking() is true. + // * For [A, B] interval Position() will Seek() to A and return kForwards. + // ShouldContinueLooking() will be true until the iterator value gets beyond B + // -- then it will return false + // * For [A, infinity> Position() will Seek() to A and return kForwards. + // ShouldContinueLooking() will always return true + // * For <-infinity, B] Position() will Seek() to B and return kBackwards. + // ShouldContinueLooking() will always return true (given that iterator is + // advanced by calling Prev()) + virtual Direction Position(const Filter& filter, + Iterator* iterator) const = 0; + virtual bool ShouldContinueLooking(const Filter& filter, + const Slice& secondary_key, + Direction direction) const = 0; + + // Static function that is executed when Index is created + // --------------------------------------------- + // Create Index from user-supplied description. Return nullptr on parse + // failure. + static Index* CreateIndexFromDescription(const JSONDocument& description, + const std::string& name); + + private: + // No copying allowed + Index(const Index&); + void operator=(const Index&); +}; + +// Encoding helper function +namespace { +std::string InternalSecondaryIndexName(const std::string& user_name) { + return "index_" + user_name; +} + +// Don't change these, they are persisted in secondary indexes +enum JSONPrimitivesEncoding : char { + kNull = 0x1, + kBool = 0x2, + kDouble = 0x3, + kInt64 = 0x4, + kString = 0x5, +}; + +// encodes simple JSON members (meaning string, integer, etc) +// the end result of this will be lexicographically compared to each other +bool EncodeJSONPrimitive(const JSONDocument& json, std::string* dst) { + // TODO(icanadi) revise this at some point, have a custom comparator + switch (json.type()) { + case JSONDocument::kNull: + dst->push_back(kNull); + break; + case JSONDocument::kBool: + dst->push_back(kBool); + dst->push_back(static_cast(json.GetBool())); + break; + case JSONDocument::kDouble: + dst->push_back(kDouble); + PutFixed64(dst, static_cast(json.GetDouble())); + break; + case JSONDocument::kInt64: + dst->push_back(kInt64); + // TODO(icanadi) oops, this will not work correctly for negative numbers + PutFixed64(dst, static_cast(json.GetInt64())); + break; + case JSONDocument::kString: + dst->push_back(kString); + dst->append(json.GetString()); + break; + default: + return false; + } + return true; +} + +} // namespace + +// format of the secondary key is: +// +class IndexKey { + public: + IndexKey() : ok_(false) {} + explicit IndexKey(const Slice& slice) { + if (slice.size() < sizeof(uint32_t)) { + ok_ = false; + return; + } + uint32_t primary_key_offset = + DecodeFixed32(slice.data() + slice.size() - sizeof(uint32_t)); + if (primary_key_offset >= slice.size() - sizeof(uint32_t)) { + ok_ = false; + return; + } + parts_[0] = Slice(slice.data(), primary_key_offset); + parts_[1] = Slice(slice.data() + primary_key_offset, + slice.size() - primary_key_offset - sizeof(uint32_t)); + ok_ = true; + } + IndexKey(const Slice& secondary_key, const Slice& primary_key) : ok_(true) { + parts_[0] = secondary_key; + parts_[1] = primary_key; + } + + SliceParts GetSliceParts() { + uint32_t primary_key_offset = static_cast(parts_[0].size()); + EncodeFixed32(primary_key_offset_buf_, primary_key_offset); + parts_[2] = Slice(primary_key_offset_buf_, sizeof(uint32_t)); + return SliceParts(parts_, 3); + } + + const Slice& GetPrimaryKey() const { return parts_[1]; } + const Slice& GetSecondaryKey() const { return parts_[0]; } + + bool ok() const { return ok_; } + + private: + bool ok_; + // 0 -- secondary key + // 1 -- primary key + // 2 -- primary key offset + Slice parts_[3]; + char primary_key_offset_buf_[sizeof(uint32_t)]; +}; + +class SimpleSortedIndex : public Index { + public: + SimpleSortedIndex(const std::string field, const std::string& name) + : field_(field), name_(name) {} + + virtual const char* Name() const override { return name_.c_str(); } + + virtual void GetIndexKey(const JSONDocument& document, std::string* key) const + override { + auto value = document.Get(field_); + if (value == nullptr) { + // null + if (!EncodeJSONPrimitive(JSONDocument(JSONDocument::kNull), key)) { + assert(false); + } + } + if (!EncodeJSONPrimitive(*value, key)) { + assert(false); + } + } + virtual const Comparator* GetComparator() const override { + return BytewiseComparator(); + } + + virtual bool UsefulIndex(const Filter& filter) const { + return filter.GetInterval(field_) != nullptr; + } + // REQUIRES: UsefulIndex(filter) == true + virtual Direction Position(const Filter& filter, Iterator* iterator) const { + auto interval = filter.GetInterval(field_); + assert(interval != nullptr); // because index is useful + Direction direction; + + std::string op; + const JSONDocument* limit; + if (interval->lower_bound != nullptr) { + limit = interval->lower_bound; + direction = kForwards; + } else { + limit = interval->upper_bound; + direction = kBackwards; + } + + std::string encoded_limit; + if (!EncodeJSONPrimitive(*limit, &encoded_limit)) { + assert(false); + } + iterator->Seek(Slice(encoded_limit)); + + return direction; + } + // REQUIRES: UsefulIndex(filter) == true + virtual bool ShouldContinueLooking(const Filter& filter, + const Slice& secondary_key, + Index::Direction direction) const { + auto interval = filter.GetInterval(field_); + assert(interval != nullptr); // because index is useful + + if (direction == kForwards) { + if (interval->upper_bound == nullptr) { + // continue looking, no upper bound + return true; + } + std::string encoded_upper_bound; + if (!EncodeJSONPrimitive(*interval->upper_bound, &encoded_upper_bound)) { + // uhm...? + // TODO(icanadi) store encoded upper and lower bounds in Filter*? + assert(false); + } + // TODO(icanadi) we need to somehow decode this and use DocumentCompare() + int compare = secondary_key.compare(Slice(encoded_upper_bound)); + // if (current key is bigger than upper bound) OR (current key is equal to + // upper bound, but inclusive is false) THEN stop looking. otherwise, + // continue + return (compare > 0 || + (compare == 0 && interval->upper_inclusive == false)) + ? false + : true; + } else { + assert(direction == kBackwards); + if (interval->lower_bound == nullptr) { + // continue looking, no lower bound + return true; + } + std::string encoded_lower_bound; + if (!EncodeJSONPrimitive(*interval->lower_bound, &encoded_lower_bound)) { + // uhm...? + // TODO(icanadi) store encoded upper and lower bounds in Filter*? + assert(false); + } + // TODO(icanadi) we need to somehow decode this and use DocumentCompare() + int compare = secondary_key.compare(Slice(encoded_lower_bound)); + // if (current key is smaller than lower bound) OR (current key is equal + // to lower bound, but inclusive is false) THEN stop looking. otherwise, + // continue + return (compare < 0 || + (compare == 0 && interval->lower_inclusive == false)) + ? false + : true; + } + + assert(false); + // this is here just so compiler doesn't complain + return false; + } + + private: + std::string field_; + std::string name_; +}; + +Index* Index::CreateIndexFromDescription(const JSONDocument& description, + const std::string& name) { + if (!description.IsObject() || description.Count() != 1) { + // not supported yet + return nullptr; + } + const auto& field = *description.Items().begin(); + if (field.second->IsInt64() == false || field.second->GetInt64() != 1) { + // not supported yet + return nullptr; + } + return new SimpleSortedIndex(field.first, name); +} + +class CursorWithFilterIndexed : public Cursor { + public: + CursorWithFilterIndexed(Iterator* primary_index_iter, + Iterator* secondary_index_iter, const Index* index, + const Filter* filter) + : primary_index_iter_(primary_index_iter), + secondary_index_iter_(secondary_index_iter), + index_(index), + filter_(filter), + valid_(true), + current_json_document_(nullptr) { + assert(filter_.get() != nullptr); + direction_ = index->Position(*filter_.get(), secondary_index_iter_.get()); + UpdateIndexKey(); + AdvanceUntilSatisfies(); + } + + virtual bool Valid() const override { + return valid_ && secondary_index_iter_->Valid(); + } + virtual void Next() { + assert(Valid()); + Advance(); + AdvanceUntilSatisfies(); + } + // temporary object. copy it if you want to use it + virtual const JSONDocument& document() const { + assert(Valid()); + return *current_json_document_; + } + virtual Status status() const { + if (!status_.ok()) { + return status_; + } + if (!primary_index_iter_->status().ok()) { + return primary_index_iter_->status(); + } + return secondary_index_iter_->status(); + } + + private: + void Advance() { + if (direction_ == Index::kForwards) { + secondary_index_iter_->Next(); + } else { + secondary_index_iter_->Prev(); + } + UpdateIndexKey(); + } + void AdvanceUntilSatisfies() { + bool found = false; + while (secondary_index_iter_->Valid() && + index_->ShouldContinueLooking( + *filter_.get(), index_key_.GetSecondaryKey(), direction_)) { + if (!UpdateJSONDocument()) { + // corruption happened + return; + } + if (filter_->SatisfiesFilter(*current_json_document_)) { + // we found satisfied! + found = true; + break; + } else { + // doesn't satisfy :( + Advance(); + } + } + if (!found) { + valid_ = false; + } + } + + bool UpdateJSONDocument() { + assert(secondary_index_iter_->Valid()); + primary_index_iter_->Seek(index_key_.GetPrimaryKey()); + if (!primary_index_iter_->Valid()) { + status_ = Status::Corruption( + "Inconsistency between primary and secondary index"); + valid_ = false; + return false; + } + current_json_document_.reset( + JSONDocument::Deserialize(primary_index_iter_->value())); + if (current_json_document_.get() == nullptr) { + status_ = Status::Corruption("JSON deserialization failed"); + valid_ = false; + return false; + } + return true; + } + void UpdateIndexKey() { + if (secondary_index_iter_->Valid()) { + index_key_ = IndexKey(secondary_index_iter_->key()); + if (!index_key_.ok()) { + status_ = Status::Corruption("Invalid index key"); + valid_ = false; + } + } + } + std::unique_ptr primary_index_iter_; + std::unique_ptr secondary_index_iter_; + // we don't own index_ + const Index* index_; + Index::Direction direction_; + std::unique_ptr filter_; + bool valid_; + IndexKey index_key_; + std::unique_ptr current_json_document_; + Status status_; +}; + +class CursorFromIterator : public Cursor { + public: + explicit CursorFromIterator(Iterator* iter) + : iter_(iter), current_json_document_(nullptr) { + iter_->SeekToFirst(); + UpdateCurrentJSON(); + } + + virtual bool Valid() const override { return status_.ok() && iter_->Valid(); } + virtual void Next() override { + iter_->Next(); + UpdateCurrentJSON(); + } + virtual const JSONDocument& document() const override { + assert(Valid()); + return *current_json_document_; + }; + virtual Status status() const override { + if (!status_.ok()) { + return status_; + } + return iter_->status(); + } + + // not part of public Cursor interface + Slice key() const { return iter_->key(); } + + private: + void UpdateCurrentJSON() { + if (Valid()) { + current_json_document_.reset(JSONDocument::Deserialize(iter_->value())); + if (current_json_document_.get() == nullptr) { + status_ = Status::Corruption("JSON deserialization failed"); + } + } + } + + Status status_; + std::unique_ptr iter_; + std::unique_ptr current_json_document_; +}; + +class CursorWithFilter : public Cursor { + public: + CursorWithFilter(Cursor* base_cursor, const Filter* filter) + : base_cursor_(base_cursor), filter_(filter) { + assert(filter_.get() != nullptr); + SeekToNextSatisfies(); + } + virtual bool Valid() const override { return base_cursor_->Valid(); } + virtual void Next() override { + assert(Valid()); + base_cursor_->Next(); + SeekToNextSatisfies(); + } + virtual const JSONDocument& document() const override { + assert(Valid()); + return base_cursor_->document(); + } + virtual Status status() const override { return base_cursor_->status(); } + + private: + void SeekToNextSatisfies() { + for (; base_cursor_->Valid(); base_cursor_->Next()) { + if (filter_->SatisfiesFilter(base_cursor_->document())) { + break; + } + } + } + std::unique_ptr base_cursor_; + std::unique_ptr filter_; +}; + +class CursorError : public Cursor { + public: + explicit CursorError(Status s) : s_(s) { assert(!s.ok()); } + virtual Status status() const override { return s_; } + virtual bool Valid() const override { return false; } + virtual void Next() override {} + virtual const JSONDocument& document() const override { + assert(false); + // compiler complains otherwise + return trash_; + } + + private: + Status s_; + JSONDocument trash_; +}; + +class DocumentDBImpl : public DocumentDB { + public: + DocumentDBImpl( + DB* db, ColumnFamilyHandle* primary_key_column_family, + const std::vector>& indexes, + const Options& rocksdb_options) + : DocumentDB(db), + primary_key_column_family_(primary_key_column_family), + rocksdb_options_(rocksdb_options) { + for (const auto& index : indexes) { + name_to_index_.insert( + {index.first->Name(), IndexColumnFamily(index.first, index.second)}); + } + } + + ~DocumentDBImpl() { + for (auto& iter : name_to_index_) { + delete iter.second.index; + delete iter.second.column_family; + } + delete primary_key_column_family_; + } + + virtual Status CreateIndex(const WriteOptions& write_options, + const IndexDescriptor& index) { + auto index_obj = + Index::CreateIndexFromDescription(*index.description, index.name); + if (index_obj == nullptr) { + return Status::InvalidArgument("Failed parsing index description"); + } + + ColumnFamilyHandle* cf_handle; + Status s = + CreateColumnFamily(ColumnFamilyOptions(rocksdb_options_), + InternalSecondaryIndexName(index.name), &cf_handle); + if (!s.ok()) { + return s; + } + + MutexLock l(&write_mutex_); + + std::unique_ptr cursor(new CursorFromIterator( + DocumentDB::NewIterator(ReadOptions(), primary_key_column_family_))); + + WriteBatch batch; + for (; cursor->Valid(); cursor->Next()) { + std::string secondary_index_key; + index_obj->GetIndexKey(cursor->document(), &secondary_index_key); + IndexKey index_key(Slice(secondary_index_key), cursor->key()); + batch.Put(cf_handle, index_key.GetSliceParts(), SliceParts()); + } + + if (!cursor->status().ok()) { + delete index_obj; + return cursor->status(); + } + + { + MutexLock l_nti(&name_to_index_mutex_); + name_to_index_.insert( + {index.name, IndexColumnFamily(index_obj, cf_handle)}); + } + + return DocumentDB::Write(write_options, &batch); + } + + virtual Status DropIndex(const std::string& name) { + MutexLock l(&write_mutex_); + + auto index_iter = name_to_index_.find(name); + if (index_iter == name_to_index_.end()) { + return Status::InvalidArgument("No such index"); + } + + Status s = DropColumnFamily(index_iter->second.column_family); + if (!s.ok()) { + return s; + } + + delete index_iter->second.index; + delete index_iter->second.column_family; + + // remove from name_to_index_ + { + MutexLock l_nti(&name_to_index_mutex_); + name_to_index_.erase(index_iter); + } + + return Status::OK(); + } + + virtual Status Insert(const WriteOptions& options, + const JSONDocument& document) { + WriteBatch batch; + + if (!document.IsObject()) { + return Status::InvalidArgument("Document not an object"); + } + auto primary_key = document.Get(kPrimaryKey); + if (primary_key == nullptr || primary_key->IsNull() || + (!primary_key->IsString() && !primary_key->IsInt64())) { + return Status::InvalidArgument( + "No primary key or primary key format error"); + } + std::string encoded_document; + document.Serialize(&encoded_document); + std::string primary_key_encoded; + if (!EncodeJSONPrimitive(*primary_key, &primary_key_encoded)) { + // previous call should be guaranteed to pass because of all primary_key + // conditions checked before + assert(false); + } + Slice primary_key_slice(primary_key_encoded); + + // Lock now, since we're starting DB operations + MutexLock l(&write_mutex_); + // check if there is already a document with the same primary key + std::string value; + Status s = DocumentDB::Get(ReadOptions(), primary_key_column_family_, + primary_key_slice, &value); + if (!s.IsNotFound()) { + return s.ok() ? Status::InvalidArgument("Duplicate primary key!") : s; + } + + batch.Put(primary_key_column_family_, primary_key_slice, encoded_document); + + for (const auto& iter : name_to_index_) { + std::string secondary_index_key; + iter.second.index->GetIndexKey(document, &secondary_index_key); + IndexKey index_key(Slice(secondary_index_key), primary_key_slice); + batch.Put(iter.second.column_family, index_key.GetSliceParts(), + SliceParts()); + } + + return DocumentDB::Write(options, &batch); + } + + virtual Status Remove(const ReadOptions& read_options, + const WriteOptions& write_options, + const JSONDocument& query) override { + MutexLock l(&write_mutex_); + std::unique_ptr cursor( + ConstructFilterCursor(read_options, nullptr, query)); + + WriteBatch batch; + for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) { + const auto& document = cursor->document(); + if (!document.IsObject()) { + return Status::Corruption("Document corruption"); + } + auto primary_key = document.Get(kPrimaryKey); + if (primary_key == nullptr || primary_key->IsNull() || + (!primary_key->IsString() && !primary_key->IsInt64())) { + return Status::Corruption("Document corruption"); + } + + // TODO(icanadi) Instead of doing this, just get primary key encoding from + // cursor, as it already has this information + std::string primary_key_encoded; + if (!EncodeJSONPrimitive(*primary_key, &primary_key_encoded)) { + // previous call should be guaranteed to pass because of all primary_key + // conditions checked before + assert(false); + } + Slice primary_key_slice(primary_key_encoded); + batch.Delete(primary_key_column_family_, primary_key_slice); + + for (const auto& iter : name_to_index_) { + std::string secondary_index_key; + iter.second.index->GetIndexKey(document, &secondary_index_key); + IndexKey index_key(Slice(secondary_index_key), primary_key_slice); + batch.Delete(iter.second.column_family, index_key.GetSliceParts()); + } + } + + if (!cursor->status().ok()) { + return cursor->status(); + } + + return DocumentDB::Write(write_options, &batch); + } + + virtual Status Update(const ReadOptions& read_options, + const WriteOptions& write_options, + const JSONDocument& filter, + const JSONDocument& updates) { + MutexLock l(&write_mutex_); + std::unique_ptr cursor( + ConstructFilterCursor(read_options, nullptr, filter)); + + WriteBatch batch; + for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) { + const auto& old_document = cursor->document(); + JSONDocument new_document(old_document); + if (!new_document.IsObject()) { + return Status::Corruption("Document corruption"); + } + // TODO(icanadi) Make this nicer, something like class Filter + for (const auto& update : updates.Items()) { + if (update.first == "$set") { + for (const auto& itr : update.second->Items()) { + if (itr.first == kPrimaryKey) { + return Status::NotSupported("Please don't change primary key"); + } + new_document.Set(itr.first, *itr.second); + } + } else { + // TODO(icanadi) more commands + return Status::InvalidArgument("Can't understand update command"); + } + } + + // TODO(icanadi) reuse some of this code + auto primary_key = new_document.Get(kPrimaryKey); + if (primary_key == nullptr || primary_key->IsNull() || + (!primary_key->IsString() && !primary_key->IsInt64())) { + // This will happen when document on storage doesn't have primary key, + // since we don't support any update operations on primary key. That's + // why this is corruption error + return Status::Corruption("Corrupted document -- primary key missing"); + } + std::string encoded_document; + new_document.Serialize(&encoded_document); + std::string primary_key_encoded; + if (!EncodeJSONPrimitive(*primary_key, &primary_key_encoded)) { + // previous call should be guaranteed to pass because of all primary_key + // conditions checked before + assert(false); + } + Slice primary_key_slice(primary_key_encoded); + batch.Put(primary_key_column_family_, primary_key_slice, + encoded_document); + + for (const auto& iter : name_to_index_) { + std::string old_key, new_key; + iter.second.index->GetIndexKey(old_document, &old_key); + iter.second.index->GetIndexKey(new_document, &new_key); + if (old_key == new_key) { + // don't need to update this secondary index + continue; + } + + IndexKey old_index_key(Slice(old_key), primary_key_slice); + IndexKey new_index_key(Slice(new_key), primary_key_slice); + + batch.Delete(iter.second.column_family, old_index_key.GetSliceParts()); + batch.Put(iter.second.column_family, new_index_key.GetSliceParts(), + SliceParts()); + } + } + + if (!cursor->status().ok()) { + return cursor->status(); + } + + return DocumentDB::Write(write_options, &batch); + } + + virtual Cursor* Query(const ReadOptions& read_options, + const JSONDocument& query) override { + Cursor* cursor = nullptr; + + if (!query.IsArray()) { + return new CursorError( + Status::InvalidArgument("Query has to be an array")); + } + + // TODO(icanadi) support index "_id" + for (size_t i = 0; i < query.Count(); ++i) { + const auto& command_doc = query[i]; + if (command_doc.Count() != 1) { + // there can be only one key-value pair in each of array elements. + // key is the command and value are the params + delete cursor; + return new CursorError(Status::InvalidArgument("Invalid query")); + } + const auto& command = *command_doc.Items().begin(); + + if (command.first == "$filter") { + cursor = ConstructFilterCursor(read_options, cursor, *command.second); + } else { + // only filter is supported for now + delete cursor; + return new CursorError(Status::InvalidArgument("Invalid query")); + } + } + + if (cursor == nullptr) { + cursor = new CursorFromIterator( + DocumentDB::NewIterator(read_options, primary_key_column_family_)); + } + + return cursor; + } + + // RocksDB functions + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) override { + return Status::NotSupported(""); + } + virtual Status Write(const WriteOptions& options, + WriteBatch* updates) override { + return Status::NotSupported(""); + } + virtual Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override { + return nullptr; + } + + private: + Cursor* ConstructFilterCursor(ReadOptions read_options, Cursor* cursor, + const JSONDocument& query) { + std::unique_ptr filter(Filter::ParseFilter(query)); + if (filter.get() == nullptr) { + return new CursorError(Status::InvalidArgument("Invalid query")); + } + + IndexColumnFamily tmp_storage(nullptr, nullptr); + + if (cursor == nullptr) { + auto index_name = query.Get("$index"); + IndexColumnFamily* index_column_family = nullptr; + if (index_name != nullptr && index_name->IsString()) { + { + MutexLock l(&name_to_index_mutex_); + auto index_iter = name_to_index_.find(index_name->GetString()); + if (index_iter != name_to_index_.end()) { + tmp_storage = index_iter->second; + index_column_family = &tmp_storage; + } else { + return new CursorError( + Status::InvalidArgument("Index does not exist")); + } + } + } + + if (index_column_family != nullptr && + index_column_family->index->UsefulIndex(*filter.get())) { + std::vector iterators; + Status s = DocumentDB::NewIterators( + read_options, + {primary_key_column_family_, index_column_family->column_family}, + &iterators); + if (!s.ok()) { + delete cursor; + return new CursorError(s); + } + assert(iterators.size() == 2); + return new CursorWithFilterIndexed(iterators[0], iterators[1], + index_column_family->index, + filter.release()); + } else { + return new CursorWithFilter( + new CursorFromIterator(DocumentDB::NewIterator( + read_options, primary_key_column_family_)), + filter.release()); + } + } else { + return new CursorWithFilter(cursor, filter.release()); + } + assert(false); + return nullptr; + } + + // currently, we lock and serialize all writes to rocksdb. reads are not + // locked and always get consistent view of the database. we should optimize + // locking in the future + port::Mutex write_mutex_; + port::Mutex name_to_index_mutex_; + const char* kPrimaryKey = "_id"; + struct IndexColumnFamily { + IndexColumnFamily(Index* _index, ColumnFamilyHandle* _column_family) + : index(_index), column_family(_column_family) {} + Index* index; + ColumnFamilyHandle* column_family; + }; + + + // name_to_index_ protected: + // 1) when writing -- 1. lock write_mutex_, 2. lock name_to_index_mutex_ + // 2) when reading -- lock name_to_index_mutex_ OR write_mutex_ + std::unordered_map name_to_index_; + ColumnFamilyHandle* primary_key_column_family_; + Options rocksdb_options_; +}; + +namespace { +Options GetRocksDBOptionsFromOptions(const DocumentDBOptions& options) { + Options rocksdb_options; + rocksdb_options.max_background_compactions = options.background_threads - 1; + rocksdb_options.max_background_flushes = 1; + rocksdb_options.write_buffer_size = options.memtable_size; + rocksdb_options.max_write_buffer_number = 6; + rocksdb_options.block_cache = NewLRUCache(options.cache_size); + return rocksdb_options; +} +} // namespace + +Status DocumentDB::Open(const DocumentDBOptions& options, + const std::string& name, + const std::vector& indexes, + DocumentDB** db, bool read_only) { + Options rocksdb_options = GetRocksDBOptionsFromOptions(options); + rocksdb_options.create_if_missing = true; + + std::vector column_families; + column_families.push_back(ColumnFamilyDescriptor( + kDefaultColumnFamilyName, ColumnFamilyOptions(rocksdb_options))); + for (const auto& index : indexes) { + column_families.emplace_back(InternalSecondaryIndexName(index.name), + ColumnFamilyOptions(rocksdb_options)); + } + std::vector handles; + DB* base_db; + Status s; + if (read_only) { + s = DB::OpenForReadOnly(DBOptions(rocksdb_options), name, column_families, + &handles, &base_db); + } else { + s = DB::Open(DBOptions(rocksdb_options), name, column_families, &handles, + &base_db); + } + if (!s.ok()) { + return s; + } + + std::vector> index_cf(indexes.size()); + assert(handles.size() == indexes.size() + 1); + for (size_t i = 0; i < indexes.size(); ++i) { + auto index = Index::CreateIndexFromDescription(*indexes[i].description, + indexes[i].name); + index_cf[i] = {index, handles[i + 1]}; + } + *db = new DocumentDBImpl(base_db, handles[0], index_cf, rocksdb_options); + return Status::OK(); +} + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/document/document_db_test.cc b/utilities/document/document_db_test.cc new file mode 100644 index 000000000..25d5effc2 --- /dev/null +++ b/utilities/document/document_db_test.cc @@ -0,0 +1,262 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include + +#include "utilities/json_document.h" +#include "utilities/document_db.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { + +class DocumentDBTest { + public: + DocumentDBTest() { + dbname_ = test::TmpDir() + "/document_db_test"; + DestroyDB(dbname_, Options()); + } + ~DocumentDBTest() { + delete db_; + DestroyDB(dbname_, Options()); + } + + void AssertCursorIDs(Cursor* cursor, std::vector expected) { + std::vector got; + while (cursor->Valid()) { + ASSERT_TRUE(cursor->Valid()); + ASSERT_TRUE(cursor->document().Contains("_id")); + got.push_back(cursor->document()["_id"].GetInt64()); + cursor->Next(); + } + std::sort(expected.begin(), expected.end()); + std::sort(got.begin(), got.end()); + ASSERT_TRUE(got == expected); + } + + // converts ' to ", so that we don't have to escape " all over the place + std::string ConvertQuotes(const std::string& input) { + std::string output; + for (auto x : input) { + if (x == '\'') { + output.push_back('\"'); + } else { + output.push_back(x); + } + } + return output; + } + + void CreateIndexes(std::vector indexes) { + for (auto i : indexes) { + ASSERT_OK(db_->CreateIndex(WriteOptions(), i)); + } + } + + JSONDocument* Parse(const std::string doc) { + return JSONDocument::ParseJSON(ConvertQuotes(doc).c_str()); + } + + std::string dbname_; + DocumentDB* db_; +}; + +TEST(DocumentDBTest, SimpleQueryTest) { + DocumentDBOptions options; + DocumentDB::IndexDescriptor index; + index.description = Parse("{'name': 1}"); + index.name = "name_index"; + + ASSERT_OK(DocumentDB::Open(options, dbname_, {}, &db_)); + CreateIndexes({index}); + delete db_; + // now there is index present + ASSERT_OK(DocumentDB::Open(options, dbname_, {index}, &db_)); + delete index.description; + + std::vector json_objects = { + "{'_id': 1, 'name': 'One'}", "{'_id': 2, 'name': 'Two'}", + "{'_id': 3, 'name': 'Three'}", "{'_id': 4, 'name': 'Four'}"}; + + for (auto& json : json_objects) { + std::unique_ptr document(Parse(json)); + ASSERT_TRUE(document.get() != nullptr); + ASSERT_OK(db_->Insert(WriteOptions(), *document)); + } + + // inserting a document with existing primary key should return failure + { + std::unique_ptr document(Parse(json_objects[0])); + ASSERT_TRUE(document.get() != nullptr); + Status s = db_->Insert(WriteOptions(), *document); + ASSERT_TRUE(s.IsInvalidArgument()); + } + + // find equal to "Two" + { + std::unique_ptr query( + Parse("[{'$filter': {'name': 'Two', '$index': 'name_index'}}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {2}); + } + + // find less than "Three" + { + std::unique_ptr query(Parse( + "[{'$filter': {'name': {'$lt': 'Three'}, '$index': " + "'name_index'}}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + + AssertCursorIDs(cursor.get(), {1, 4}); + } + + // find less than "Three" without index + { + std::unique_ptr query( + Parse("[{'$filter': {'name': {'$lt': 'Three'} }}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {1, 4}); + } + + // remove less or equal to "Three" + { + std::unique_ptr query( + Parse("{'name': {'$lte': 'Three'}, '$index': 'name_index'}")); + ASSERT_OK(db_->Remove(ReadOptions(), WriteOptions(), *query)); + } + + // find all -- only "Two" left, everything else should be deleted + { + std::unique_ptr query(Parse("[]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {2}); + } +} + +TEST(DocumentDBTest, ComplexQueryTest) { + DocumentDBOptions options; + DocumentDB::IndexDescriptor priority_index; + priority_index.description = Parse("{'priority': 1}"); + priority_index.name = "priority"; + DocumentDB::IndexDescriptor job_name_index; + job_name_index.description = Parse("{'job_name': 1}"); + job_name_index.name = "job_name"; + DocumentDB::IndexDescriptor progress_index; + progress_index.description = Parse("{'progress': 1}"); + progress_index.name = "progress"; + + ASSERT_OK(DocumentDB::Open(options, dbname_, {}, &db_)); + CreateIndexes({priority_index, progress_index}); + delete priority_index.description; + delete progress_index.description; + + std::vector json_objects = { + "{'_id': 1, 'job_name': 'play', 'priority': 10, 'progress': 14.2}", + "{'_id': 2, 'job_name': 'white', 'priority': 2, 'progress': 45.1}", + "{'_id': 3, 'job_name': 'straw', 'priority': 5, 'progress': 83.2}", + "{'_id': 4, 'job_name': 'temporary', 'priority': 3, 'progress': 14.9}", + "{'_id': 5, 'job_name': 'white', 'priority': 4, 'progress': 44.2}", + "{'_id': 6, 'job_name': 'tea', 'priority': 1, 'progress': 12.4}", + "{'_id': 7, 'job_name': 'delete', 'priority': 2, 'progress': 77.54}", + "{'_id': 8, 'job_name': 'rock', 'priority': 3, 'progress': 93.24}", + "{'_id': 9, 'job_name': 'steady', 'priority': 3, 'progress': 9.1}", + "{'_id': 10, 'job_name': 'white', 'priority': 1, 'progress': 61.4}", + "{'_id': 11, 'job_name': 'who', 'priority': 4, 'progress': 39.41}", }; + + // add index on the fly! + CreateIndexes({job_name_index}); + delete job_name_index.description; + + for (auto& json : json_objects) { + std::unique_ptr document(Parse(json)); + ASSERT_TRUE(document != nullptr); + ASSERT_OK(db_->Insert(WriteOptions(), *document)); + } + + // 2 < priority < 4 AND progress > 10.0, index priority + { + std::unique_ptr query(Parse( + "[{'$filter': {'priority': {'$lt': 4, '$gt': 2}, 'progress': {'$gt': " + "10.0}, '$index': 'priority'}}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {4, 8}); + } + + // 2 < priority < 4 AND progress > 10.0, index progress + { + std::unique_ptr query(Parse( + "[{'$filter': {'priority': {'$lt': 4, '$gt': 2}, 'progress': {'$gt': " + "10.0}, '$index': 'progress'}}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {4, 8}); + } + + // job_name == 'white' AND priority >= 2, index job_name + { + std::unique_ptr query(Parse( + "[{'$filter': {'job_name': 'white', 'priority': {'$gte': " + "2}, '$index': 'job_name'}}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {2, 5}); + } + + // 35.0 <= progress < 65.5, index progress + { + std::unique_ptr query(Parse( + "[{'$filter': {'progress': {'$gt': 5.0, '$gte': 35.0, '$lt': 65.5}, " + "'$index': 'progress'}}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {2, 5, 10, 11}); + } + + // 2 < priority <= 4, index priority + { + std::unique_ptr query(Parse( + "[{'$filter': {'priority': {'$gt': 2, '$lt': 8, '$lte': 4}, " + "'$index': 'priority'}}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {4, 5, 8, 9, 11}); + } + + // Delete all whose progress is bigger than 50% + { + std::unique_ptr query( + Parse("{'progress': {'$gt': 50.0}, '$index': 'progress'}")); + ASSERT_OK(db_->Remove(ReadOptions(), WriteOptions(), *query)); + } + + // 2 < priority < 6, index priority + { + std::unique_ptr query(Parse( + "[{'$filter': {'priority': {'$gt': 2, '$lt': 6}, " + "'$index': 'priority'}}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {4, 5, 9, 11}); + } + + // update set priority to 10 where job_name is 'white' + { + std::unique_ptr query(Parse("{'job_name': 'white'}")); + std::unique_ptr update(Parse("{'$set': {'priority': 10}}")); + ASSERT_OK(db_->Update(ReadOptions(), WriteOptions(), *query, *update)); + } + + // 4 < priority + { + std::unique_ptr query( + Parse("[{'$filter': {'priority': {'$gt': 4}, '$index': 'priority'}}]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + ASSERT_OK(cursor->status()); + AssertCursorIDs(cursor.get(), {1, 2, 5}); + } + + Status s = db_->DropIndex("doesnt-exist"); + ASSERT_TRUE(!s.ok()); + ASSERT_OK(db_->DropIndex("priority")); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); } diff --git a/utilities/document/json_document.cc b/utilities/document/json_document.cc index 46da6f8ba..f3ccc3884 100644 --- a/utilities/document/json_document.cc +++ b/utilities/document/json_document.cc @@ -6,6 +6,8 @@ #include "utilities/json_document.h" +#define __STDC_FORMAT_MACROS +#include #include #include #include @@ -267,6 +269,58 @@ bool JSONDocument::operator==(const JSONDocument& rhs) const { return false; } +std::string JSONDocument::DebugString() const { + std::string ret; + switch (type_) { + case kNull: + ret = "null"; + break; + case kArray: + ret = "["; + for (size_t i = 0; i < data_.a.size(); ++i) { + if (i) { + ret += ", "; + } + ret += data_.a[i]->DebugString(); + } + ret += "]"; + break; + case kBool: + ret = data_.b ? "true" : "false"; + break; + case kDouble: { + char buf[100]; + snprintf(buf, sizeof(buf), "%lf", data_.d); + ret = buf; + break; + } + case kInt64: { + char buf[100]; + snprintf(buf, sizeof(buf), "%" PRIi64, data_.i); + ret = buf; + break; + } + case kObject: { + bool first = true; + ret = "{"; + for (const auto& iter : data_.o) { + ret += first ? "" : ", "; + first = false; + ret += iter.first + ": "; + ret += iter.second->DebugString(); + } + ret += "}"; + break; + } + case kString: + ret = "\"" + data_.s + "\""; + break; + default: + assert(false); + } + return ret; +} + JSONDocument::ItemsIteratorGenerator JSONDocument::Items() const { assert(type_ == kObject); return data_.o;