// Copyright (c) 2013, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. #ifndef ROCKSDB_LITE #include "rocksdb/utilities/document_db.h" #include "rocksdb/cache.h" #include "rocksdb/table.h" #include "rocksdb/filter_policy.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/slice.h" #include "rocksdb/utilities/json_document.h" #include "util/coding.h" #include "util/mutexlock.h" #include "port/port.h" namespace rocksdb { // IMPORTANT NOTE: Secondary index column families should be very small and // generally fit in memory. Assume that accessing secondary index column // families is much faster than accessing primary index (data heap) column // family. Accessing a key (i.e. checking for existance) from a column family in // RocksDB is not much faster than accessing both key and value since they are // kept together and loaded from storage together. namespace { // < 0 <=> lhs < rhs // == 0 <=> lhs == rhs // > 0 <=> lhs == rhs // TODO(icanadi) move this to JSONDocument? int DocumentCompare(const JSONDocument& lhs, const JSONDocument& rhs) { assert(lhs.IsObject() == false && rhs.IsObject() == false && lhs.type() == rhs.type()); switch (lhs.type()) { case JSONDocument::kNull: return 0; case JSONDocument::kBool: return static_cast(lhs.GetBool()) - static_cast(rhs.GetBool()); case JSONDocument::kDouble: { double res = lhs.GetDouble() - rhs.GetDouble(); return res == 0.0 ? 0 : (res < 0.0 ? -1 : 1); } case JSONDocument::kInt64: { int64_t res = lhs.GetInt64() - rhs.GetInt64(); return res == 0 ? 0 : (res < 0 ? -1 : 1); } case JSONDocument::kString: return Slice(lhs.GetString()).compare(Slice(rhs.GetString())); default: assert(false); } return 0; } } // namespace class Filter { public: // returns nullptr on parse failure static Filter* ParseFilter(const JSONDocument& filter); struct Interval { const JSONDocument* upper_bound; const JSONDocument* lower_bound; bool upper_inclusive; bool lower_inclusive; Interval() : upper_bound(nullptr), lower_bound(nullptr), upper_inclusive(false), lower_inclusive(false) {} Interval(const JSONDocument* ub, const JSONDocument* lb, bool ui, bool li) : upper_bound(ub), lower_bound(lb), upper_inclusive(ui), lower_inclusive(li) {} void UpdateUpperBound(const JSONDocument* ub, bool inclusive); void UpdateLowerBound(const JSONDocument* lb, bool inclusive); }; bool SatisfiesFilter(const JSONDocument& document) const; const Interval* GetInterval(const std::string& field) const; private: explicit Filter(const JSONDocument& filter) : filter_(filter) {} // copied from the parameter const JSONDocument filter_; // upper_bound and lower_bound point to JSONDocuments in filter_, so no need // to free them // constant after construction std::unordered_map intervals_; }; void Filter::Interval::UpdateUpperBound(const JSONDocument* ub, bool inclusive) { bool update = (upper_bound == nullptr); if (!update) { int cmp = DocumentCompare(*upper_bound, *ub); update = (cmp > 0) || (cmp == 0 && !inclusive); } if (update) { upper_bound = ub; upper_inclusive = inclusive; } } void Filter::Interval::UpdateLowerBound(const JSONDocument* lb, bool inclusive) { bool update = (lower_bound == nullptr); if (!update) { int cmp = DocumentCompare(*lower_bound, *lb); update = (cmp < 0) || (cmp == 0 && !inclusive); } if (update) { lower_bound = lb; lower_inclusive = inclusive; } } Filter* Filter::ParseFilter(const JSONDocument& filter) { if (filter.IsObject() == false) { return nullptr; } std::unique_ptr f(new Filter(filter)); for (const auto& items : f->filter_.Items()) { if (items.first.size() && items.first[0] == '$') { // fields starting with '$' are commands continue; } assert(f->intervals_.find(items.first) == f->intervals_.end()); if (items.second->IsObject()) { if (items.second->Count() == 0) { // uhm...? return nullptr; } Interval interval; for (const auto& condition : items.second->Items()) { if (condition.second->IsObject() || condition.second->IsArray()) { // comparison operators not defined on objects. invalid array return nullptr; } // comparison operators: if (condition.first == "$gt") { interval.UpdateLowerBound(condition.second, false); } else if (condition.first == "$gte") { interval.UpdateLowerBound(condition.second, true); } else if (condition.first == "$lt") { interval.UpdateUpperBound(condition.second, false); } else if (condition.first == "$lte") { interval.UpdateUpperBound(condition.second, true); } else { // TODO(icanadi) more logical operators return nullptr; } } f->intervals_.insert({items.first, interval}); } else { // equality f->intervals_.insert( {items.first, Interval(items.second, items.second, true, true)}); } } return f.release(); } const Filter::Interval* Filter::GetInterval(const std::string& field) const { auto itr = intervals_.find(field); if (itr == intervals_.end()) { return nullptr; } // we can do that since intervals_ is constant after construction return &itr->second; } bool Filter::SatisfiesFilter(const JSONDocument& document) const { for (const auto& interval : intervals_) { auto value = document.Get(interval.first); if (value == nullptr) { // doesn't have the value, doesn't satisfy the filter // (we don't support null queries yet) return false; } if (interval.second.upper_bound != nullptr) { if (value->type() != interval.second.upper_bound->type()) { // no cross-type queries yet // TODO(icanadi) do this at least for numbers! return false; } int cmp = DocumentCompare(*interval.second.upper_bound, *value); if (cmp < 0 || (cmp == 0 && interval.second.upper_inclusive == false)) { // bigger (or equal) than upper bound return false; } } if (interval.second.lower_bound != nullptr) { if (value->type() != interval.second.lower_bound->type()) { // no cross-type queries yet return false; } int cmp = DocumentCompare(*interval.second.lower_bound, *value); if (cmp > 0 || (cmp == 0 && interval.second.lower_inclusive == false)) { // smaller (or equal) than the lower bound return false; } } } return true; } class Index { public: Index() = default; virtual ~Index() {} virtual const char* Name() const = 0; // Functions that are executed during write time // --------------------------------------------- // GetIndexKey() generates a key that will be used to index document and // returns the key though the second std::string* parameter virtual void GetIndexKey(const JSONDocument& document, std::string* key) const = 0; // Keys generated with GetIndexKey() will be compared using this comparator. // It should be assumed that there will be a suffix added to the index key // according to IndexKey implementation virtual const Comparator* GetComparator() const = 0; // Functions that are executed during query time // --------------------------------------------- enum Direction { kForwards, kBackwards, }; // Returns true if this index can provide some optimization for satisfying // filter. False otherwise virtual bool UsefulIndex(const Filter& filter) const = 0; // For every filter (assuming UsefulIndex()) there is a continuous interval of // keys in the index that satisfy the index conditions. That interval can be // three things: // * [A, B] // * [A, infinity> // * <-infinity, B] // // Query engine that uses this Index for optimization will access the interval // by first calling Position() and then iterating in the Direction (returned // by Position()) while ShouldContinueLooking() is true. // * For [A, B] interval Position() will Seek() to A and return kForwards. // ShouldContinueLooking() will be true until the iterator value gets beyond B // -- then it will return false // * For [A, infinity> Position() will Seek() to A and return kForwards. // ShouldContinueLooking() will always return true // * For <-infinity, B] Position() will Seek() to B and return kBackwards. // ShouldContinueLooking() will always return true (given that iterator is // advanced by calling Prev()) virtual Direction Position(const Filter& filter, Iterator* iterator) const = 0; virtual bool ShouldContinueLooking(const Filter& filter, const Slice& secondary_key, Direction direction) const = 0; // Static function that is executed when Index is created // --------------------------------------------- // Create Index from user-supplied description. Return nullptr on parse // failure. static Index* CreateIndexFromDescription(const JSONDocument& description, const std::string& name); private: // No copying allowed Index(const Index&); void operator=(const Index&); }; // Encoding helper function namespace { std::string InternalSecondaryIndexName(const std::string& user_name) { return "index_" + user_name; } // Don't change these, they are persisted in secondary indexes enum JSONPrimitivesEncoding : char { kNull = 0x1, kBool = 0x2, kDouble = 0x3, kInt64 = 0x4, kString = 0x5, }; // encodes simple JSON members (meaning string, integer, etc) // the end result of this will be lexicographically compared to each other bool EncodeJSONPrimitive(const JSONDocument& json, std::string* dst) { // TODO(icanadi) revise this at some point, have a custom comparator switch (json.type()) { case JSONDocument::kNull: dst->push_back(kNull); break; case JSONDocument::kBool: dst->push_back(kBool); dst->push_back(static_cast(json.GetBool())); break; case JSONDocument::kDouble: dst->push_back(kDouble); PutFixed64(dst, static_cast(json.GetDouble())); break; case JSONDocument::kInt64: dst->push_back(kInt64); // TODO(icanadi) oops, this will not work correctly for negative numbers PutFixed64(dst, static_cast(json.GetInt64())); break; case JSONDocument::kString: dst->push_back(kString); dst->append(json.GetString()); break; default: return false; } return true; } } // namespace // format of the secondary key is: // class IndexKey { public: IndexKey() : ok_(false) {} explicit IndexKey(const Slice& slice) { if (slice.size() < sizeof(uint32_t)) { ok_ = false; return; } uint32_t primary_key_offset = DecodeFixed32(slice.data() + slice.size() - sizeof(uint32_t)); if (primary_key_offset >= slice.size() - sizeof(uint32_t)) { ok_ = false; return; } parts_[0] = Slice(slice.data(), primary_key_offset); parts_[1] = Slice(slice.data() + primary_key_offset, slice.size() - primary_key_offset - sizeof(uint32_t)); ok_ = true; } IndexKey(const Slice& secondary_key, const Slice& primary_key) : ok_(true) { parts_[0] = secondary_key; parts_[1] = primary_key; } SliceParts GetSliceParts() { uint32_t primary_key_offset = static_cast(parts_[0].size()); EncodeFixed32(primary_key_offset_buf_, primary_key_offset); parts_[2] = Slice(primary_key_offset_buf_, sizeof(uint32_t)); return SliceParts(parts_, 3); } const Slice& GetPrimaryKey() const { return parts_[1]; } const Slice& GetSecondaryKey() const { return parts_[0]; } bool ok() const { return ok_; } private: bool ok_; // 0 -- secondary key // 1 -- primary key // 2 -- primary key offset Slice parts_[3]; char primary_key_offset_buf_[sizeof(uint32_t)]; }; class SimpleSortedIndex : public Index { public: SimpleSortedIndex(const std::string& field, const std::string& name) : field_(field), name_(name) {} virtual const char* Name() const override { return name_.c_str(); } virtual void GetIndexKey(const JSONDocument& document, std::string* key) const override { auto value = document.Get(field_); if (value == nullptr) { // null if (!EncodeJSONPrimitive(JSONDocument(JSONDocument::kNull), key)) { assert(false); } } if (!EncodeJSONPrimitive(*value, key)) { assert(false); } } virtual const Comparator* GetComparator() const override { return BytewiseComparator(); } virtual bool UsefulIndex(const Filter& filter) const { return filter.GetInterval(field_) != nullptr; } // REQUIRES: UsefulIndex(filter) == true virtual Direction Position(const Filter& filter, Iterator* iterator) const { auto interval = filter.GetInterval(field_); assert(interval != nullptr); // because index is useful Direction direction; std::string op; const JSONDocument* limit; if (interval->lower_bound != nullptr) { limit = interval->lower_bound; direction = kForwards; } else { limit = interval->upper_bound; direction = kBackwards; } std::string encoded_limit; if (!EncodeJSONPrimitive(*limit, &encoded_limit)) { assert(false); } iterator->Seek(Slice(encoded_limit)); return direction; } // REQUIRES: UsefulIndex(filter) == true virtual bool ShouldContinueLooking(const Filter& filter, const Slice& secondary_key, Index::Direction direction) const { auto interval = filter.GetInterval(field_); assert(interval != nullptr); // because index is useful if (direction == kForwards) { if (interval->upper_bound == nullptr) { // continue looking, no upper bound return true; } std::string encoded_upper_bound; if (!EncodeJSONPrimitive(*interval->upper_bound, &encoded_upper_bound)) { // uhm...? // TODO(icanadi) store encoded upper and lower bounds in Filter*? assert(false); } // TODO(icanadi) we need to somehow decode this and use DocumentCompare() int compare = secondary_key.compare(Slice(encoded_upper_bound)); // if (current key is bigger than upper bound) OR (current key is equal to // upper bound, but inclusive is false) THEN stop looking. otherwise, // continue return (compare > 0 || (compare == 0 && interval->upper_inclusive == false)) ? false : true; } else { assert(direction == kBackwards); if (interval->lower_bound == nullptr) { // continue looking, no lower bound return true; } std::string encoded_lower_bound; if (!EncodeJSONPrimitive(*interval->lower_bound, &encoded_lower_bound)) { // uhm...? // TODO(icanadi) store encoded upper and lower bounds in Filter*? assert(false); } // TODO(icanadi) we need to somehow decode this and use DocumentCompare() int compare = secondary_key.compare(Slice(encoded_lower_bound)); // if (current key is smaller than lower bound) OR (current key is equal // to lower bound, but inclusive is false) THEN stop looking. otherwise, // continue return (compare < 0 || (compare == 0 && interval->lower_inclusive == false)) ? false : true; } assert(false); // this is here just so compiler doesn't complain return false; } private: std::string field_; std::string name_; }; Index* Index::CreateIndexFromDescription(const JSONDocument& description, const std::string& name) { if (!description.IsObject() || description.Count() != 1) { // not supported yet return nullptr; } const auto& field = *description.Items().begin(); if (field.second->IsInt64() == false || field.second->GetInt64() != 1) { // not supported yet return nullptr; } return new SimpleSortedIndex(field.first, name); } class CursorWithFilterIndexed : public Cursor { public: CursorWithFilterIndexed(Iterator* primary_index_iter, Iterator* secondary_index_iter, const Index* index, const Filter* filter) : primary_index_iter_(primary_index_iter), secondary_index_iter_(secondary_index_iter), index_(index), filter_(filter), valid_(true), current_json_document_(nullptr) { assert(filter_.get() != nullptr); direction_ = index->Position(*filter_.get(), secondary_index_iter_.get()); UpdateIndexKey(); AdvanceUntilSatisfies(); } virtual bool Valid() const override { return valid_ && secondary_index_iter_->Valid(); } virtual void Next() { assert(Valid()); Advance(); AdvanceUntilSatisfies(); } // temporary object. copy it if you want to use it virtual const JSONDocument& document() const { assert(Valid()); return *current_json_document_; } virtual Status status() const { if (!status_.ok()) { return status_; } if (!primary_index_iter_->status().ok()) { return primary_index_iter_->status(); } return secondary_index_iter_->status(); } private: void Advance() { if (direction_ == Index::kForwards) { secondary_index_iter_->Next(); } else { secondary_index_iter_->Prev(); } UpdateIndexKey(); } void AdvanceUntilSatisfies() { bool found = false; while (secondary_index_iter_->Valid() && index_->ShouldContinueLooking( *filter_.get(), index_key_.GetSecondaryKey(), direction_)) { if (!UpdateJSONDocument()) { // corruption happened return; } if (filter_->SatisfiesFilter(*current_json_document_)) { // we found satisfied! found = true; break; } else { // doesn't satisfy :( Advance(); } } if (!found) { valid_ = false; } } bool UpdateJSONDocument() { assert(secondary_index_iter_->Valid()); primary_index_iter_->Seek(index_key_.GetPrimaryKey()); if (!primary_index_iter_->Valid()) { status_ = Status::Corruption( "Inconsistency between primary and secondary index"); valid_ = false; return false; } current_json_document_.reset( JSONDocument::Deserialize(primary_index_iter_->value())); if (current_json_document_.get() == nullptr) { status_ = Status::Corruption("JSON deserialization failed"); valid_ = false; return false; } return true; } void UpdateIndexKey() { if (secondary_index_iter_->Valid()) { index_key_ = IndexKey(secondary_index_iter_->key()); if (!index_key_.ok()) { status_ = Status::Corruption("Invalid index key"); valid_ = false; } } } std::unique_ptr primary_index_iter_; std::unique_ptr secondary_index_iter_; // we don't own index_ const Index* index_; Index::Direction direction_; std::unique_ptr filter_; bool valid_; IndexKey index_key_; std::unique_ptr current_json_document_; Status status_; }; class CursorFromIterator : public Cursor { public: explicit CursorFromIterator(Iterator* iter) : iter_(iter), current_json_document_(nullptr) { iter_->SeekToFirst(); UpdateCurrentJSON(); } virtual bool Valid() const override { return status_.ok() && iter_->Valid(); } virtual void Next() override { iter_->Next(); UpdateCurrentJSON(); } virtual const JSONDocument& document() const override { assert(Valid()); return *current_json_document_; }; virtual Status status() const override { if (!status_.ok()) { return status_; } return iter_->status(); } // not part of public Cursor interface Slice key() const { return iter_->key(); } private: void UpdateCurrentJSON() { if (Valid()) { current_json_document_.reset(JSONDocument::Deserialize(iter_->value())); if (current_json_document_.get() == nullptr) { status_ = Status::Corruption("JSON deserialization failed"); } } } Status status_; std::unique_ptr iter_; std::unique_ptr current_json_document_; }; class CursorWithFilter : public Cursor { public: CursorWithFilter(Cursor* base_cursor, const Filter* filter) : base_cursor_(base_cursor), filter_(filter) { assert(filter_.get() != nullptr); SeekToNextSatisfies(); } virtual bool Valid() const override { return base_cursor_->Valid(); } virtual void Next() override { assert(Valid()); base_cursor_->Next(); SeekToNextSatisfies(); } virtual const JSONDocument& document() const override { assert(Valid()); return base_cursor_->document(); } virtual Status status() const override { return base_cursor_->status(); } private: void SeekToNextSatisfies() { for (; base_cursor_->Valid(); base_cursor_->Next()) { if (filter_->SatisfiesFilter(base_cursor_->document())) { break; } } } std::unique_ptr base_cursor_; std::unique_ptr filter_; }; class CursorError : public Cursor { public: explicit CursorError(Status s) : s_(s) { assert(!s.ok()); } virtual Status status() const override { return s_; } virtual bool Valid() const override { return false; } virtual void Next() override {} virtual const JSONDocument& document() const override { assert(false); // compiler complains otherwise return trash_; } private: Status s_; JSONDocument trash_; }; class DocumentDBImpl : public DocumentDB { public: DocumentDBImpl( DB* db, ColumnFamilyHandle* primary_key_column_family, const std::vector>& indexes, const Options& rocksdb_options) : DocumentDB(db), primary_key_column_family_(primary_key_column_family), rocksdb_options_(rocksdb_options) { for (const auto& index : indexes) { name_to_index_.insert( {index.first->Name(), IndexColumnFamily(index.first, index.second)}); } } ~DocumentDBImpl() { for (auto& iter : name_to_index_) { delete iter.second.index; delete iter.second.column_family; } delete primary_key_column_family_; } virtual Status CreateIndex(const WriteOptions& write_options, const IndexDescriptor& index) { auto index_obj = Index::CreateIndexFromDescription(*index.description, index.name); if (index_obj == nullptr) { return Status::InvalidArgument("Failed parsing index description"); } ColumnFamilyHandle* cf_handle; Status s = CreateColumnFamily(ColumnFamilyOptions(rocksdb_options_), InternalSecondaryIndexName(index.name), &cf_handle); if (!s.ok()) { delete index_obj; return s; } MutexLock l(&write_mutex_); std::unique_ptr cursor(new CursorFromIterator( DocumentDB::NewIterator(ReadOptions(), primary_key_column_family_))); WriteBatch batch; for (; cursor->Valid(); cursor->Next()) { std::string secondary_index_key; index_obj->GetIndexKey(cursor->document(), &secondary_index_key); IndexKey index_key(Slice(secondary_index_key), cursor->key()); batch.Put(cf_handle, index_key.GetSliceParts(), SliceParts()); } if (!cursor->status().ok()) { delete index_obj; return cursor->status(); } { MutexLock l_nti(&name_to_index_mutex_); name_to_index_.insert( {index.name, IndexColumnFamily(index_obj, cf_handle)}); } return DocumentDB::Write(write_options, &batch); } virtual Status DropIndex(const std::string& name) { MutexLock l(&write_mutex_); auto index_iter = name_to_index_.find(name); if (index_iter == name_to_index_.end()) { return Status::InvalidArgument("No such index"); } Status s = DropColumnFamily(index_iter->second.column_family); if (!s.ok()) { return s; } delete index_iter->second.index; delete index_iter->second.column_family; // remove from name_to_index_ { MutexLock l_nti(&name_to_index_mutex_); name_to_index_.erase(index_iter); } return Status::OK(); } virtual Status Insert(const WriteOptions& options, const JSONDocument& document) { WriteBatch batch; if (!document.IsObject()) { return Status::InvalidArgument("Document not an object"); } auto primary_key = document.Get(kPrimaryKey); if (primary_key == nullptr || primary_key->IsNull() || (!primary_key->IsString() && !primary_key->IsInt64())) { return Status::InvalidArgument( "No primary key or primary key format error"); } std::string encoded_document; document.Serialize(&encoded_document); std::string primary_key_encoded; if (!EncodeJSONPrimitive(*primary_key, &primary_key_encoded)) { // previous call should be guaranteed to pass because of all primary_key // conditions checked before assert(false); } Slice primary_key_slice(primary_key_encoded); // Lock now, since we're starting DB operations MutexLock l(&write_mutex_); // check if there is already a document with the same primary key std::string value; Status s = DocumentDB::Get(ReadOptions(), primary_key_column_family_, primary_key_slice, &value); if (!s.IsNotFound()) { return s.ok() ? Status::InvalidArgument("Duplicate primary key!") : s; } batch.Put(primary_key_column_family_, primary_key_slice, encoded_document); for (const auto& iter : name_to_index_) { std::string secondary_index_key; iter.second.index->GetIndexKey(document, &secondary_index_key); IndexKey index_key(Slice(secondary_index_key), primary_key_slice); batch.Put(iter.second.column_family, index_key.GetSliceParts(), SliceParts()); } return DocumentDB::Write(options, &batch); } virtual Status Remove(const ReadOptions& read_options, const WriteOptions& write_options, const JSONDocument& query) override { MutexLock l(&write_mutex_); std::unique_ptr cursor( ConstructFilterCursor(read_options, nullptr, query)); WriteBatch batch; for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) { const auto& document = cursor->document(); if (!document.IsObject()) { return Status::Corruption("Document corruption"); } auto primary_key = document.Get(kPrimaryKey); if (primary_key == nullptr || primary_key->IsNull() || (!primary_key->IsString() && !primary_key->IsInt64())) { return Status::Corruption("Document corruption"); } // TODO(icanadi) Instead of doing this, just get primary key encoding from // cursor, as it already has this information std::string primary_key_encoded; if (!EncodeJSONPrimitive(*primary_key, &primary_key_encoded)) { // previous call should be guaranteed to pass because of all primary_key // conditions checked before assert(false); } Slice primary_key_slice(primary_key_encoded); batch.Delete(primary_key_column_family_, primary_key_slice); for (const auto& iter : name_to_index_) { std::string secondary_index_key; iter.second.index->GetIndexKey(document, &secondary_index_key); IndexKey index_key(Slice(secondary_index_key), primary_key_slice); batch.Delete(iter.second.column_family, index_key.GetSliceParts()); } } if (!cursor->status().ok()) { return cursor->status(); } return DocumentDB::Write(write_options, &batch); } virtual Status Update(const ReadOptions& read_options, const WriteOptions& write_options, const JSONDocument& filter, const JSONDocument& updates) { MutexLock l(&write_mutex_); std::unique_ptr cursor( ConstructFilterCursor(read_options, nullptr, filter)); WriteBatch batch; for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) { const auto& old_document = cursor->document(); JSONDocument new_document(old_document); if (!new_document.IsObject()) { return Status::Corruption("Document corruption"); } // TODO(icanadi) Make this nicer, something like class Filter for (const auto& update : updates.Items()) { if (update.first == "$set") { for (const auto& itr : update.second->Items()) { if (itr.first == kPrimaryKey) { return Status::NotSupported("Please don't change primary key"); } new_document.Set(itr.first, *itr.second); } } else { // TODO(icanadi) more commands return Status::InvalidArgument("Can't understand update command"); } } // TODO(icanadi) reuse some of this code auto primary_key = new_document.Get(kPrimaryKey); if (primary_key == nullptr || primary_key->IsNull() || (!primary_key->IsString() && !primary_key->IsInt64())) { // This will happen when document on storage doesn't have primary key, // since we don't support any update operations on primary key. That's // why this is corruption error return Status::Corruption("Corrupted document -- primary key missing"); } std::string encoded_document; new_document.Serialize(&encoded_document); std::string primary_key_encoded; if (!EncodeJSONPrimitive(*primary_key, &primary_key_encoded)) { // previous call should be guaranteed to pass because of all primary_key // conditions checked before assert(false); } Slice primary_key_slice(primary_key_encoded); batch.Put(primary_key_column_family_, primary_key_slice, encoded_document); for (const auto& iter : name_to_index_) { std::string old_key, new_key; iter.second.index->GetIndexKey(old_document, &old_key); iter.second.index->GetIndexKey(new_document, &new_key); if (old_key == new_key) { // don't need to update this secondary index continue; } IndexKey old_index_key(Slice(old_key), primary_key_slice); IndexKey new_index_key(Slice(new_key), primary_key_slice); batch.Delete(iter.second.column_family, old_index_key.GetSliceParts()); batch.Put(iter.second.column_family, new_index_key.GetSliceParts(), SliceParts()); } } if (!cursor->status().ok()) { return cursor->status(); } return DocumentDB::Write(write_options, &batch); } virtual Cursor* Query(const ReadOptions& read_options, const JSONDocument& query) override { Cursor* cursor = nullptr; if (!query.IsArray()) { return new CursorError( Status::InvalidArgument("Query has to be an array")); } // TODO(icanadi) support index "_id" for (size_t i = 0; i < query.Count(); ++i) { const auto& command_doc = query[i]; if (command_doc.Count() != 1) { // there can be only one key-value pair in each of array elements. // key is the command and value are the params delete cursor; return new CursorError(Status::InvalidArgument("Invalid query")); } const auto& command = *command_doc.Items().begin(); if (command.first == "$filter") { cursor = ConstructFilterCursor(read_options, cursor, *command.second); } else { // only filter is supported for now delete cursor; return new CursorError(Status::InvalidArgument("Invalid query")); } } if (cursor == nullptr) { cursor = new CursorFromIterator( DocumentDB::NewIterator(read_options, primary_key_column_family_)); } return cursor; } // RocksDB functions virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) override { return Status::NotSupported(""); } virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) override { return Status::NotSupported(""); } virtual Status Write(const WriteOptions& options, WriteBatch* updates) override { return Status::NotSupported(""); } virtual Iterator* NewIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) override { return nullptr; } virtual Iterator* NewIterator(const ReadOptions& options) override { return nullptr; } private: Cursor* ConstructFilterCursor(ReadOptions read_options, Cursor* cursor, const JSONDocument& query) { std::unique_ptr filter(Filter::ParseFilter(query)); if (filter.get() == nullptr) { return new CursorError(Status::InvalidArgument("Invalid query")); } IndexColumnFamily tmp_storage(nullptr, nullptr); if (cursor == nullptr) { auto index_name = query.Get("$index"); IndexColumnFamily* index_column_family = nullptr; if (index_name != nullptr && index_name->IsString()) { { MutexLock l(&name_to_index_mutex_); auto index_iter = name_to_index_.find(index_name->GetString()); if (index_iter != name_to_index_.end()) { tmp_storage = index_iter->second; index_column_family = &tmp_storage; } else { return new CursorError( Status::InvalidArgument("Index does not exist")); } } } if (index_column_family != nullptr && index_column_family->index->UsefulIndex(*filter.get())) { std::vector iterators; Status s = DocumentDB::NewIterators( read_options, {primary_key_column_family_, index_column_family->column_family}, &iterators); if (!s.ok()) { delete cursor; return new CursorError(s); } assert(iterators.size() == 2); return new CursorWithFilterIndexed(iterators[0], iterators[1], index_column_family->index, filter.release()); } else { return new CursorWithFilter( new CursorFromIterator(DocumentDB::NewIterator( read_options, primary_key_column_family_)), filter.release()); } } else { return new CursorWithFilter(cursor, filter.release()); } assert(false); return nullptr; } // currently, we lock and serialize all writes to rocksdb. reads are not // locked and always get consistent view of the database. we should optimize // locking in the future port::Mutex write_mutex_; port::Mutex name_to_index_mutex_; const char* kPrimaryKey = "_id"; struct IndexColumnFamily { IndexColumnFamily(Index* _index, ColumnFamilyHandle* _column_family) : index(_index), column_family(_column_family) {} Index* index; ColumnFamilyHandle* column_family; }; // name_to_index_ protected: // 1) when writing -- 1. lock write_mutex_, 2. lock name_to_index_mutex_ // 2) when reading -- lock name_to_index_mutex_ OR write_mutex_ std::unordered_map name_to_index_; ColumnFamilyHandle* primary_key_column_family_; Options rocksdb_options_; }; namespace { Options GetRocksDBOptionsFromOptions(const DocumentDBOptions& options) { Options rocksdb_options; rocksdb_options.max_background_compactions = options.background_threads - 1; rocksdb_options.max_background_flushes = 1; rocksdb_options.write_buffer_size = options.memtable_size; rocksdb_options.max_write_buffer_number = 6; BlockBasedTableOptions table_options; table_options.block_cache = NewLRUCache(options.cache_size); rocksdb_options.table_factory.reset(NewBlockBasedTableFactory(table_options)); return rocksdb_options; } } // namespace Status DocumentDB::Open(const DocumentDBOptions& options, const std::string& name, const std::vector& indexes, DocumentDB** db, bool read_only) { Options rocksdb_options = GetRocksDBOptionsFromOptions(options); rocksdb_options.create_if_missing = true; std::vector column_families; column_families.push_back(ColumnFamilyDescriptor( kDefaultColumnFamilyName, ColumnFamilyOptions(rocksdb_options))); for (const auto& index : indexes) { column_families.emplace_back(InternalSecondaryIndexName(index.name), ColumnFamilyOptions(rocksdb_options)); } std::vector handles; DB* base_db; Status s; if (read_only) { s = DB::OpenForReadOnly(DBOptions(rocksdb_options), name, column_families, &handles, &base_db); } else { s = DB::Open(DBOptions(rocksdb_options), name, column_families, &handles, &base_db); } if (!s.ok()) { return s; } std::vector> index_cf(indexes.size()); assert(handles.size() == indexes.size() + 1); for (size_t i = 0; i < indexes.size(); ++i) { auto index = Index::CreateIndexFromDescription(*indexes[i].description, indexes[i].name); index_cf[i] = {index, handles[i + 1]}; } *db = new DocumentDBImpl(base_db, handles[0], index_cf, rocksdb_options); return Status::OK(); } } // namespace rocksdb #endif // ROCKSDB_LITE