API to fetch from both a WriteBatchWithIndex and the db

Summary:
Added a couple of functions to WriteBatchWithIndex that make it easier to query the value of a key, including pending writes that are still in the batch. (This is needed for transactions.)

I created write_batch_with_index_internal.h to hold an internal-only helper function, since there wasn't a good place for it in the existing class hierarchy (and it didn't seem right to put this function inside WriteBatchInternal::Rep).

Since I needed access to WriteBatchEntryComparator, I moved some helper classes from write_batch_with_index.cc into write_batch_with_index_internal.h/.cc. WriteBatchIndexEntry, ReadableWriteBatch, and WriteBatchEntryComparator are all unchanged (just moved to different files).

Test Plan: Added new unit tests.

Reviewers: rven, yhchiang, sdong, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D38037
main
agiardullo 10 years ago
parent 3996fff8a1
commit 711465ccec
  1. 2
      db/db_impl.cc
  2. 2
      db/db_impl.h
  3. 3
      db/db_test.cc
  4. 1
      db/merge_context.h
  5. 2
      include/rocksdb/db.h
  6. 5
      include/rocksdb/utilities/stackable_db.h
  7. 36
      include/rocksdb/utilities/write_batch_with_index.h
  8. 1
      src.mk
  9. 6
      utilities/spatialdb/spatial_db.cc
  10. 225
      utilities/write_batch_with_index/write_batch_with_index.cc
  11. 242
      utilities/write_batch_with_index/write_batch_with_index_internal.cc
  12. 96
      utilities/write_batch_with_index/write_batch_with_index_internal.h
  13. 275
      utilities/write_batch_with_index/write_batch_with_index_test.cc

@ -3524,6 +3524,8 @@ const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
return *cfh->cfd()->options(); return *cfh->cfd()->options();
} }
const DBOptions& DBImpl::GetDBOptions() const { return db_options_; }
bool DBImpl::GetProperty(ColumnFamilyHandle* column_family, bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
const Slice& property, std::string* value) { const Slice& property, std::string* value) {
bool is_int_property = false; bool is_int_property = false;

@ -150,6 +150,8 @@ class DBImpl : public DB {
using DB::GetOptions; using DB::GetOptions;
virtual const Options& GetOptions( virtual const Options& GetOptions(
ColumnFamilyHandle* column_family) const override; ColumnFamilyHandle* column_family) const override;
using DB::GetDBOptions;
virtual const DBOptions& GetDBOptions() const override;
using DB::Flush; using DB::Flush;
virtual Status Flush(const FlushOptions& options, virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) override; ColumnFamilyHandle* column_family) override;

@ -8753,6 +8753,9 @@ class ModelDB: public DB {
return options_; return options_;
} }
using DB::GetDBOptions;
virtual const DBOptions& GetDBOptions() const override { return options_; }
using DB::Flush; using DB::Flush;
virtual Status Flush(const rocksdb::FlushOptions& options, virtual Status Flush(const rocksdb::FlushOptions& options,
ColumnFamilyHandle* column_family) override { ColumnFamilyHandle* column_family) override {

@ -66,4 +66,3 @@ private:
}; };
} // namespace rocksdb } // namespace rocksdb

@ -495,6 +495,8 @@ class DB {
return GetOptions(DefaultColumnFamily()); return GetOptions(DefaultColumnFamily());
} }
virtual const DBOptions& GetDBOptions() const = 0;
// Flush all mem-table data. // Flush all mem-table data.
virtual Status Flush(const FlushOptions& options, virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) = 0; ColumnFamilyHandle* column_family) = 0;

@ -175,6 +175,11 @@ class StackableDB : public DB {
return db_->GetOptions(column_family); return db_->GetOptions(column_family);
} }
using DB::GetDBOptions;
virtual const DBOptions& GetDBOptions() const override {
return db_->GetDBOptions();
}
using DB::Flush; using DB::Flush;
virtual Status Flush(const FlushOptions& fopts, virtual Status Flush(const FlushOptions& fopts,
ColumnFamilyHandle* column_family) override { ColumnFamilyHandle* column_family) override {

@ -11,6 +11,8 @@
#pragma once #pragma once
#include <string>
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
@ -22,6 +24,9 @@ namespace rocksdb {
class ColumnFamilyHandle; class ColumnFamilyHandle;
class Comparator; class Comparator;
class DB;
struct ReadOptions;
struct DBOptions;
enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord }; enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord };
@ -118,6 +123,37 @@ class WriteBatchWithIndex : public WriteBatchBase {
// default column family // default column family
Iterator* NewIteratorWithBase(Iterator* base_iterator); Iterator* NewIteratorWithBase(Iterator* base_iterator);
// Similar to DB::Get() but will only read the key from this batch.
// If the batch does not have enough data to resolve Merge operations,
// MergeInProgress status may be returned.
Status GetFromBatch(ColumnFamilyHandle* column_family,
const DBOptions& options, const Slice& key,
std::string* value);
// Similar to previous function but does not require a column_family.
// Note: An InvalidArgument status will be returned if there are any Merge
// operators for this key.
Status GetFromBatch(const DBOptions& options, const Slice& key,
std::string* value) {
return GetFromBatch(nullptr, options, key, value);
}
// Similar to DB::Get() but will also read writes from this batch.
//
// This function will query both this batch and the DB and then merge
// the results using the DB's merge operator (if the batch contains any
// merge requests).
//
// Setting read_options.snapshot will affect what is read from the DB
// but will NOT change which keys are read from the batch (the keys in
// this batch do not yet belong to any snapshot and will be fetched
// regardless).
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
const Slice& key, std::string* value);
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value);
private: private:
struct Rep; struct Rep;
Rep* rep; Rep* rep;

@ -110,6 +110,7 @@ LIB_SOURCES = \
utilities/spatialdb/spatial_db.cc \ utilities/spatialdb/spatial_db.cc \
utilities/ttl/db_ttl_impl.cc \ utilities/ttl/db_ttl_impl.cc \
utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \
utilities/write_batch_with_index/write_batch_with_index_internal.cc \
util/event_logger.cc \ util/event_logger.cc \
util/ldb_cmd.cc \ util/ldb_cmd.cc \
util/ldb_tool.cc \ util/ldb_tool.cc \

@ -658,7 +658,7 @@ class SpatialDBImpl : public SpatialDB {
}; };
namespace { namespace {
DBOptions GetDBOptions(const SpatialDBOptions& options) { DBOptions GetDBOptionsFromSpatialDBOptions(const SpatialDBOptions& options) {
DBOptions db_options; DBOptions db_options;
db_options.max_open_files = 50000; db_options.max_open_files = 50000;
db_options.max_background_compactions = 3 * options.num_threads / 4; db_options.max_background_compactions = 3 * options.num_threads / 4;
@ -760,7 +760,7 @@ class MetadataStorage {
Status SpatialDB::Create( Status SpatialDB::Create(
const SpatialDBOptions& options, const std::string& name, const SpatialDBOptions& options, const std::string& name,
const std::vector<SpatialIndexOptions>& spatial_indexes) { const std::vector<SpatialIndexOptions>& spatial_indexes) {
DBOptions db_options = GetDBOptions(options); DBOptions db_options = GetDBOptionsFromSpatialDBOptions(options);
db_options.create_if_missing = true; db_options.create_if_missing = true;
db_options.create_missing_column_families = true; db_options.create_missing_column_families = true;
db_options.error_if_exists = true; db_options.error_if_exists = true;
@ -805,7 +805,7 @@ Status SpatialDB::Create(
Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name, Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name,
SpatialDB** db, bool read_only) { SpatialDB** db, bool read_only) {
DBOptions db_options = GetDBOptions(options); DBOptions db_options = GetDBOptionsFromSpatialDBOptions(options);
auto block_cache = NewLRUCache(options.cache_size); auto block_cache = NewLRUCache(options.cache_size);
ColumnFamilyOptions column_family_options = ColumnFamilyOptions column_family_options =
GetColumnFamilyOptions(options, block_cache); GetColumnFamilyOptions(options, block_cache);

@ -10,8 +10,11 @@
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "db/column_family.h" #include "db/column_family.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/skiplist.h" #include "db/skiplist.h"
#include "util/arena.h" #include "util/arena.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
namespace rocksdb { namespace rocksdb {
@ -287,60 +290,6 @@ class BaseDeltaIterator : public Iterator {
const Comparator* comparator_; // not owned const Comparator* comparator_; // not owned
}; };
// A WriteBatch that can additionally decode a single record given the
// record's offset in the batch's serialized data.
class ReadableWriteBatch : public WriteBatch {
public:
// reserved_bytes is forwarded unchanged to the WriteBatch constructor.
explicit ReadableWriteBatch(size_t reserved_bytes = 0)
: WriteBatch(reserved_bytes) {}
// Retrieve some information from a write entry in the write batch, given
// the start offset of the write entry.
// Every output pointer must be non-null and data_offset must be smaller than
// the batch's data size; the implementation returns InvalidArgument
// otherwise.
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
Slice* value, Slice* blob) const;
};
// Node type of the skip list that serves as the binary-searchable index of a
// WriteBatchWithIndex.
struct WriteBatchIndexEntry {
  // Entry that refers to a record of the batch through the record's offset.
  WriteBatchIndexEntry(size_t entry_offset, uint32_t cf_id)
      : offset(entry_offset), column_family(cf_id), search_key(nullptr) {}

  // Lookup probe: compares by *lookup_key instead of by a batch record.
  WriteBatchIndexEntry(const Slice* lookup_key, uint32_t cf_id)
      : offset(0), column_family(cf_id), search_key(lookup_key) {}

  // When an entry's offset equals this flag, the entry sorts before every
  // other entry of the same column family.
  static const size_t kFlagMin = std::numeric_limits<size_t>::max();

  // Offset of the entry in the write batch's string buffer.
  size_t offset;
  // Column family the entry belongs to.
  uint32_t column_family;
  // When non-null, this key is used for comparisons instead of reading the
  // key from the write batch. Used for lookup keys.
  const Slice* search_key;
};
// Ordering used by the skip-list index of a WriteBatchWithIndex. Keys are
// compared with a per-column-family comparator when one has been registered
// through SetComparatorForCF(), and with the default comparator otherwise.
class WriteBatchEntryComparator {
 public:
  // Neither pointer is owned by the comparator.
  WriteBatchEntryComparator(const Comparator* _default_comparator,
                            const ReadableWriteBatch* write_batch)
      : default_comparator_(_default_comparator), write_batch_(write_batch) {}

  // Compare a and b. Return a negative value if a is less than b, 0 if they
  // are equal, and a positive value if a is greater than b
  int operator()(const WriteBatchIndexEntry* entry1,
                 const WriteBatchIndexEntry* entry2) const;

  // Compares two user keys with the comparator that applies to
  // `column_family`.
  int CompareKey(uint32_t column_family, const Slice& key1,
                 const Slice& key2) const;

  // Registers (or replaces) the comparator used for `column_family_id`.
  void SetComparatorForCF(uint32_t column_family_id,
                          const Comparator* comparator) {
    cf_comparator_map_[column_family_id] = comparator;
  }

  // const-qualified so it can be called through the
  // `const WriteBatchEntryComparator&` that the skip list stores.
  const Comparator* default_comparator() const { return default_comparator_; }

 private:
  const Comparator* default_comparator_;
  std::unordered_map<uint32_t, const Comparator*> cf_comparator_map_;
  const ReadableWriteBatch* write_batch_;
};
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&> typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
WriteBatchEntrySkipList; WriteBatchEntrySkipList;
@ -535,45 +484,6 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
last_entry_offset = 0; last_entry_offset = 0;
} }
// Decodes the record that starts at `data_offset` in this batch's serialized
// data and reports its kind through *type; *Key, *value and *blob receive
// whatever ReadRecordFromWriteBatch() extracted for the record.
//
// Returns InvalidArgument if an output pointer is null or `data_offset` is
// not smaller than the batch's data size, the decoder's status if the record
// cannot be read, and Corruption if the record's tag has no WriteType.
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
                                                  WriteType* type, Slice* Key,
                                                  Slice* value,
                                                  Slice* blob) const {
  if (type == nullptr || Key == nullptr || value == nullptr ||
      blob == nullptr) {
    return Status::InvalidArgument("Output parameters cannot be null");
  }

  if (data_offset >= GetDataSize()) {
    return Status::InvalidArgument("data offset exceed write batch size");
  }

  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
  char tag = 0;
  uint32_t column_family;
  Status s =
      ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob);
  if (!s.ok()) {
    // The outputs are not trustworthy after a failed decode; report the
    // decoder's error instead of interpreting `tag`.
    return s;
  }

  switch (tag) {
    case kTypeColumnFamilyValue:
    case kTypeValue:
      *type = kPutRecord;
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeDeletion:
      *type = kDeleteRecord;
      break;
    case kTypeColumnFamilyMerge:
    case kTypeMerge:
      *type = kMergeRecord;
      break;
    case kTypeLogData:
      *type = kLogDataRecord;
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}
WriteBatchWithIndex::WriteBatchWithIndex( WriteBatchWithIndex::WriteBatchWithIndex(
const Comparator* default_index_comparator, size_t reserved_bytes, const Comparator* default_index_comparator, size_t reserved_bytes,
@ -659,66 +569,97 @@ void WriteBatchWithIndex::Delete(const Slice& key) {
void WriteBatchWithIndex::Clear() { rep->Clear(); } void WriteBatchWithIndex::Clear() { rep->Clear(); }
int WriteBatchEntryComparator::operator()( Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
const WriteBatchIndexEntry* entry1, const DBOptions& options,
const WriteBatchIndexEntry* entry2) const { const Slice& key, std::string* value) {
if (entry1->column_family > entry2->column_family) { Status s;
return 1; MergeContext merge_context;
} else if (entry1->column_family < entry2->column_family) {
return -1; WriteBatchWithIndexInternal::Result result =
WriteBatchWithIndexInternal::GetFromBatch(options, this, column_family,
key, &merge_context,
&rep->comparator, value, &s);
switch (result) {
case WriteBatchWithIndexInternal::Result::kFound:
case WriteBatchWithIndexInternal::Result::kError:
return s;
case WriteBatchWithIndexInternal::Result::kDeleted:
case WriteBatchWithIndexInternal::Result::kNotFound:
return Status::NotFound();
case WriteBatchWithIndexInternal::Result::kMergeInProgress:
return Status::MergeInProgress("");
default:
assert(false);
} }
if (entry1->offset == WriteBatchIndexEntry::kFlagMin) { return s;
return -1; }
} else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
return 1;
}
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
const ReadOptions& read_options,
const Slice& key,
std::string* value) {
return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
value);
}
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key,
std::string* value) {
Status s; Status s;
Slice key1, key2; MergeContext merge_context;
if (entry1->search_key == nullptr) { const DBOptions& options = db->GetDBOptions();
Slice value, blob;
WriteType write_type; std::string batch_value;
s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1, WriteBatchWithIndexInternal::Result result =
&value, &blob); WriteBatchWithIndexInternal::GetFromBatch(
if (!s.ok()) { options, this, column_family, key, &merge_context, &rep->comparator,
return 1; &batch_value, &s);
}
} else { if (result == WriteBatchWithIndexInternal::Result::kFound) {
key1 = *(entry1->search_key); value->assign(batch_value.data(), batch_value.size());
return s;
} }
if (entry2->search_key == nullptr) { if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
Slice value, blob; return Status::NotFound();
WriteType write_type;
s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2,
&value, &blob);
if (!s.ok()) {
return -1;
} }
} else { if (result == WriteBatchWithIndexInternal::Result::kError) {
key2 = *(entry2->search_key); return s;
} }
assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
result == WriteBatchWithIndexInternal::Result::kNotFound);
int cmp = CompareKey(entry1->column_family, key1, key2); // Did not find key in batch OR could not resolve Merges. Try DB.
if (cmp != 0) { s = db->Get(read_options, column_family, key, value);
return cmp;
} else if (entry1->offset > entry2->offset) { if (s.ok() || s.IsNotFound()) { // DB Get Suceeded
return 1; if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) {
} else if (entry1->offset < entry2->offset) { // Merge result from DB with merges in Batch
return -1; auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
const MergeOperator* merge_operator =
cfh->cfd()->ioptions()->merge_operator;
Statistics* statistics = options.statistics.get();
Env* env = options.env;
Logger* logger = options.info_log.get();
Slice db_slice(*value);
Slice* merge_data;
if (s.ok()) {
merge_data = &db_slice;
} else { // Key not present in db (s.IsNotFound())
merge_data = nullptr;
} }
return 0;
}
int WriteBatchEntryComparator::CompareKey(uint32_t column_family, s = MergeHelper::TimedFullMerge(
const Slice& key1, key, merge_data, merge_context.GetOperands(), merge_operator,
const Slice& key2) const { statistics, env, logger, value);
auto comparator_for_cf = cf_comparator_map_.find(column_family);
if (comparator_for_cf != cf_comparator_map_.end()) {
return comparator_for_cf->second->Compare(key1, key2);
} else {
return default_comparator_->Compare(key1, key2);
} }
}
return s;
} }
} // namespace rocksdb } // namespace rocksdb

@ -0,0 +1,242 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/column_family.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/coding.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
namespace rocksdb {
class Env;
class Logger;
class Statistics;
// Decodes the record that starts at `data_offset` in this batch's serialized
// data and reports its kind through *type; *Key, *value and *blob receive
// whatever ReadRecordFromWriteBatch() extracted for the record.
//
// Returns InvalidArgument if an output pointer is null or `data_offset` is
// not smaller than the batch's data size, the decoder's status if the record
// cannot be read, and Corruption if the record's tag has no WriteType.
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
                                                  WriteType* type, Slice* Key,
                                                  Slice* value,
                                                  Slice* blob) const {
  if (type == nullptr || Key == nullptr || value == nullptr ||
      blob == nullptr) {
    return Status::InvalidArgument("Output parameters cannot be null");
  }

  if (data_offset >= GetDataSize()) {
    return Status::InvalidArgument("data offset exceed write batch size");
  }

  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
  char tag = 0;
  uint32_t column_family;
  Status s =
      ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob);
  if (!s.ok()) {
    // The outputs are not trustworthy after a failed decode; report the
    // decoder's error instead of interpreting `tag`.
    return s;
  }

  switch (tag) {
    case kTypeColumnFamilyValue:
    case kTypeValue:
      *type = kPutRecord;
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeDeletion:
      *type = kDeleteRecord;
      break;
    case kTypeColumnFamilyMerge:
    case kTypeMerge:
      *type = kMergeRecord;
      break;
    case kTypeLogData:
      *type = kLogDataRecord;
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}
// Three-way comparison of two index entries. Entries are ordered by column
// family id, then by user key (via CompareKey), then by offset. An entry
// whose offset is kFlagMin sorts before every other entry of its column
// family.
//
// If the key of entry1 cannot be read from the batch the result is 1; if the
// key of entry2 cannot be read the result is -1.
int WriteBatchEntryComparator::operator()(
    const WriteBatchIndexEntry* entry1,
    const WriteBatchIndexEntry* entry2) const {
  // Column family id is the most significant component.
  if (entry1->column_family != entry2->column_family) {
    return entry1->column_family > entry2->column_family ? 1 : -1;
  }

  // kFlagMin sentinels; entry1 is checked first, as in the key comparison.
  if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
    return -1;
  }
  if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
    return 1;
  }

  // Fetches the user key of `entry`: the explicit search key when present,
  // otherwise the key of the batch record at entry->offset. Returns false
  // if the record could not be read.
  const auto resolve_key = [this](const WriteBatchIndexEntry* entry,
                                  Slice* user_key) -> bool {
    if (entry->search_key != nullptr) {
      *user_key = *(entry->search_key);
      return true;
    }
    Slice unused_value, unused_blob;
    WriteType unused_type;
    const Status read_status = write_batch_->GetEntryFromDataOffset(
        entry->offset, &unused_type, user_key, &unused_value, &unused_blob);
    return read_status.ok();
  };

  Slice lhs_key, rhs_key;
  if (!resolve_key(entry1, &lhs_key)) {
    return 1;
  }
  if (!resolve_key(entry2, &rhs_key)) {
    return -1;
  }

  const int key_order = CompareKey(entry1->column_family, lhs_key, rhs_key);
  if (key_order != 0) {
    return key_order;
  }

  // Same key: the entry with the smaller offset sorts first.
  if (entry1->offset != entry2->offset) {
    return entry1->offset > entry2->offset ? 1 : -1;
  }
  return 0;
}
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
const Slice& key1,
const Slice& key2) const {
auto comparator_for_cf = cf_comparator_map_.find(column_family);
if (comparator_for_cf != cf_comparator_map_.end()) {
return comparator_for_cf->second->Compare(key1, key2);
} else {
return default_comparator_->Compare(key1, key2);
}
}
// Looks up `key` among the writes buffered in `batch` for `column_family`.
//
// The batch entries for the key are visited newest-first. Merge operands are
// pushed into *merge_context as they are met, and the walk stops at the first
// Put or Delete. If operands were collected before that Put/Delete, they are
// combined with it through the column family's merge operator and the result
// is written to *value.
//
// Returns:
//   kFound           - *value holds the Put value or the merged result.
//   kDeleted         - the newest relevant entry is a Delete and there was
//                      nothing to merge on top of it.
//   kMergeInProgress - only Merge entries were found; their operands are in
//                      *merge_context.
//   kNotFound        - the batch has no Put/Delete/Merge entry for the key.
//   kError           - *s holds the error (unexpected entry type, merge
//                      failure, or a merge that needs resolving while
//                      `column_family` is null).
// *s is OK for every result other than kError.
WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
    const DBOptions& options, WriteBatchWithIndex* batch,
    ColumnFamilyHandle* column_family, const Slice& key,
    MergeContext* merge_context, WriteBatchEntryComparator* cmp,
    std::string* value, Status* s) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  *s = Status::OK();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::Result::kNotFound;

  std::unique_ptr<WBWIIterator> iter(batch->NewIterator(column_family));

  // We want to iterate in the reverse order that the writes were added to the
  // batch.  Since we don't have a reverse iterator, we must seek past the end.
  // TODO(agiardullo): consider adding support for reverse iteration
  iter->Seek(key);
  while (iter->Valid()) {
    const WriteEntry& entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      break;
    }

    iter->Next();
  }

  if (!iter->Valid()) {
    // Read past end of results.  Reposition on last result.
    iter->SeekToLast();
  } else {
    iter->Prev();
  }

  // The Put value is copied into `put_value` rather than referenced through
  // `entry`: `entry` is scoped to a single loop iteration (and may be bound
  // to a temporary returned by Entry()), so a pointer into it must not be
  // used once the loop has exited. `entry_value` stays null unless a Put was
  // found, which is what the merge below expects for "no base value".
  Slice put_value;
  const Slice* entry_value = nullptr;
  while (iter->Valid()) {
    const WriteEntry& entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      // Unexpected error or we've reached a different next key
      break;
    }

    switch (entry.type) {
      case kPutRecord: {
        result = WriteBatchWithIndexInternal::Result::kFound;
        put_value = entry.value;
        entry_value = &put_value;
        break;
      }
      case kMergeRecord: {
        result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
        merge_context->PushOperand(entry.value);
        break;
      }
      case kDeleteRecord: {
        result = WriteBatchWithIndexInternal::Result::kDeleted;
        break;
      }
      case kLogDataRecord: {
        // ignore
        break;
      }
      default: {
        result = WriteBatchWithIndexInternal::Result::kError;
        (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
                                  std::to_string(entry.type));
        break;
      }
    }
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted ||
        result == WriteBatchWithIndexInternal::Result::kError) {
      // We can stop iterating once we find a PUT or DELETE
      break;
    }

    iter->Prev();
  }

  if ((*s).ok()) {
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted) {
      // Found a Put or Delete.  Merge if necessary.
      if (merge_context->GetNumOperands() > 0) {
        const MergeOperator* merge_operator;

        if (column_family != nullptr) {
          auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
          merge_operator = cfh->cfd()->ioptions()->merge_operator;
        } else {
          // Without a column family there is no merge operator to apply.
          *s = Status::InvalidArgument("Must provide a column_family");
          result = WriteBatchWithIndexInternal::Result::kError;
          return result;
        }
        Statistics* statistics = options.statistics.get();
        Env* env = options.env;
        Logger* logger = options.info_log.get();

        *s = MergeHelper::TimedFullMerge(
            key, entry_value, merge_context->GetOperands(), merge_operator,
            statistics, env, logger, value);
        if ((*s).ok()) {
          result = WriteBatchWithIndexInternal::Result::kFound;
        } else {
          result = WriteBatchWithIndexInternal::Result::kError;
        }
      } else {  // nothing to merge
        if (result == WriteBatchWithIndexInternal::Result::kFound) {  // PUT
          value->assign(entry_value->data(), entry_value->size());
        }
      }
    }
  }

  return result;
}
} // namespace rocksdb

@ -0,0 +1,96 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <limits>
#include <string>
#include <unordered_map>
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/write_batch_with_index.h"
namespace rocksdb {
class MergeContext;
struct Options;
// Node type of the skip list that serves as the binary-searchable index of a
// WriteBatchWithIndex.
struct WriteBatchIndexEntry {
  // Entry that refers to a record of the batch through the record's offset.
  WriteBatchIndexEntry(size_t entry_offset, uint32_t cf_id)
      : offset(entry_offset), column_family(cf_id), search_key(nullptr) {}

  // Lookup probe: compares by *lookup_key instead of by a batch record.
  WriteBatchIndexEntry(const Slice* lookup_key, uint32_t cf_id)
      : offset(0), column_family(cf_id), search_key(lookup_key) {}

  // When an entry's offset equals this flag, the entry sorts before every
  // other entry of the same column family.
  static const size_t kFlagMin = std::numeric_limits<size_t>::max();

  // Offset of the entry in the write batch's string buffer.
  size_t offset;
  // Column family the entry belongs to.
  uint32_t column_family;
  // When non-null, this key is used for comparisons instead of reading the
  // key from the write batch. Used for lookup keys.
  const Slice* search_key;
};
// A WriteBatch that can additionally decode a single record given the
// record's offset in the batch's serialized data.
class ReadableWriteBatch : public WriteBatch {
public:
// reserved_bytes is forwarded unchanged to the WriteBatch constructor.
explicit ReadableWriteBatch(size_t reserved_bytes = 0)
: WriteBatch(reserved_bytes) {}
// Retrieve some information from a write entry in the write batch, given
// the start offset of the write entry.
// Every output pointer must be non-null and data_offset must be smaller than
// the batch's data size; the implementation returns InvalidArgument
// otherwise.
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
Slice* value, Slice* blob) const;
};
// Ordering used by the skip-list index of a WriteBatchWithIndex. Keys are
// compared with a per-column-family comparator when one has been registered
// through SetComparatorForCF(), and with the default comparator otherwise.
class WriteBatchEntryComparator {
 public:
  // Neither pointer is owned by the comparator.
  WriteBatchEntryComparator(const Comparator* _default_comparator,
                            const ReadableWriteBatch* write_batch)
      : default_comparator_(_default_comparator), write_batch_(write_batch) {}

  // Compare a and b. Return a negative value if a is less than b, 0 if they
  // are equal, and a positive value if a is greater than b
  int operator()(const WriteBatchIndexEntry* entry1,
                 const WriteBatchIndexEntry* entry2) const;

  // Compares two user keys with the comparator that applies to
  // `column_family`.
  int CompareKey(uint32_t column_family, const Slice& key1,
                 const Slice& key2) const;

  // Registers (or replaces) the comparator used for `column_family_id`.
  void SetComparatorForCF(uint32_t column_family_id,
                          const Comparator* comparator) {
    cf_comparator_map_[column_family_id] = comparator;
  }

  // const-qualified so it can be called through a
  // `const WriteBatchEntryComparator&`, which is how a skip list keyed on
  // this comparator holds it.
  const Comparator* default_comparator() const { return default_comparator_; }

 private:
  const Comparator* default_comparator_;
  std::unordered_map<uint32_t, const Comparator*> cf_comparator_map_;
  const ReadableWriteBatch* write_batch_;
};
// Internal-only helpers for WriteBatchWithIndex.
class WriteBatchWithIndexInternal {
public:
// Outcome of a lookup in the batch; see GetFromBatch().
enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError };
// If batch contains a value for key, store it in *value and return kFound.
// If batch contains a deletion for key, return kDeleted.
// If a Put or Delete for key is preceded (in lookup order) by Merge
// operations, the merge is resolved with the column family's merge
// operator, the result is stored in *value and kFound is returned; this
// requires a non-null column_family, otherwise kError is returned.
// If batch contains Merge operations as the most recent entry for a key,
// and the merge process does not stop (not reaching a value or delete),
// push the merge operands into *merge_context,
// and return kMergeInProgress
// If batch does not contain this key, return kNotFound
// Else, return kError on error with error Status stored in *s.
static WriteBatchWithIndexInternal::Result GetFromBatch(
const DBOptions& options, WriteBatchWithIndex* batch,
ColumnFamilyHandle* column_family, const Slice& key,
MergeContext* merge_context, WriteBatchEntryComparator* cmp,
std::string* value, Status* s);
};
} // namespace rocksdb

@ -13,6 +13,8 @@
#include "db/column_family.h" #include "db/column_family.h"
#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/write_batch_with_index.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"
namespace rocksdb { namespace rocksdb {
@ -907,6 +909,279 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) {
} }
} }
// GetFromBatch() without a column family: the most recent Put or Delete of a
// key decides the outcome, keys never written are NotFound, and a pending
// Merge yields a non-OK status.
TEST_F(WriteBatchWithIndexTest, TestGetFromBatch) {
  Options options;
  WriteBatchWithIndex batch;
  std::string value;

  // Empty batch: nothing can be found.
  ASSERT_TRUE(batch.GetFromBatch(options, "b", &value).IsNotFound());

  batch.Put("a", "a");
  batch.Put("b", "b");
  batch.Put("c", "c");
  batch.Put("a", "z");
  batch.Delete("c");
  batch.Delete("d");
  batch.Delete("e");
  batch.Put("e", "e");

  // Single Put.
  ASSERT_OK(batch.GetFromBatch(options, "b", &value));
  ASSERT_EQ("b", value);

  // The later Put wins.
  ASSERT_OK(batch.GetFromBatch(options, "a", &value));
  ASSERT_EQ("z", value);

  // Put followed by Delete.
  ASSERT_TRUE(batch.GetFromBatch(options, "c", &value).IsNotFound());

  // Delete only.
  ASSERT_TRUE(batch.GetFromBatch(options, "d", &value).IsNotFound());

  // Never written.
  ASSERT_TRUE(batch.GetFromBatch(options, "x", &value).IsNotFound());

  // Delete followed by Put.
  ASSERT_OK(batch.GetFromBatch(options, "e", &value));
  ASSERT_EQ("e", value);

  batch.Merge("z", "z");

  ASSERT_NOK(batch.GetFromBatch(options, "z", &value));  // No merge operator specified.

  // Other keys are unaffected by the pending Merge.
  ASSERT_OK(batch.GetFromBatch(options, "b", &value));
  ASSERT_EQ("b", value);
}
// GetFromBatch() with a column family whose options carry the "stringappend"
// merge operator: Merges stacked on a Put in the batch are resolved, while a
// key that only has Merges reports MergeInProgress.
TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) {
  DB* db = nullptr;
  Options options;
  options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
  options.create_if_missing = true;

  std::string dbname = test::TmpDir() + "/write_batch_with_index_test";

  DestroyDB(dbname, options);
  Status s = DB::Open(options, dbname, &db);
  // ASSERT_OK instead of assert(): assert() is compiled out when NDEBUG is
  // defined, which would let the test continue with an unopened DB.
  ASSERT_OK(s);

  ColumnFamilyHandle* column_family = db->DefaultColumnFamily();
  WriteBatchWithIndex batch;
  std::string value;

  s = batch.GetFromBatch(options, "x", &value);
  ASSERT_TRUE(s.IsNotFound());

  batch.Put("x", "X");
  std::string expected = "X";

  for (int i = 0; i < 5; i++) {
    batch.Merge("x", std::to_string(i));
    expected = expected + "," + std::to_string(i);

    // "y" is overwritten on every other iteration; the latest Put must win.
    if (i % 2 == 0) {
      batch.Put("y", std::to_string(i / 2));
    }

    batch.Merge("z", "z");

    s = batch.GetFromBatch(column_family, options, "x", &value);
    ASSERT_OK(s);
    ASSERT_EQ(expected, value);

    s = batch.GetFromBatch(column_family, options, "y", &value);
    ASSERT_OK(s);
    ASSERT_EQ(std::to_string(i / 2), value);

    // "z" has only Merge entries in the batch, so it cannot be resolved.
    s = batch.GetFromBatch(column_family, options, "z", &value);
    ASSERT_TRUE(s.IsMergeInProgress());
  }

  delete db;
  DestroyDB(dbname, options);
}
// GetFromBatchAndDB() without merges: a Put or Delete in the batch overrides
// the DB, and keys that are absent from the batch fall through to the DB.
TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) {
  DB* db = nullptr;
  Options options;
  options.create_if_missing = true;
  std::string dbname = test::TmpDir() + "/write_batch_with_index_test";

  DestroyDB(dbname, options);
  Status s = DB::Open(options, dbname, &db);
  // ASSERT_OK instead of assert(): assert() is compiled out when NDEBUG is
  // defined, which would let the test continue with an unopened DB.
  ASSERT_OK(s);

  WriteBatchWithIndex batch;
  ReadOptions read_options;
  WriteOptions write_options;
  std::string value;

  s = db->Put(write_options, "a", "a");
  ASSERT_OK(s);

  s = db->Put(write_options, "b", "b");
  ASSERT_OK(s);

  s = db->Put(write_options, "c", "c");
  ASSERT_OK(s);

  batch.Put("a", "batch.a");
  batch.Delete("b");

  // Batch Put shadows the DB value.
  s = batch.GetFromBatchAndDB(db, read_options, "a", &value);
  ASSERT_OK(s);
  ASSERT_EQ("batch.a", value);

  // Batch Delete hides the DB value.
  s = batch.GetFromBatchAndDB(db, read_options, "b", &value);
  ASSERT_TRUE(s.IsNotFound());

  // Not in the batch: read from the DB.
  s = batch.GetFromBatchAndDB(db, read_options, "c", &value);
  ASSERT_OK(s);
  ASSERT_EQ("c", value);

  // In neither the batch nor the DB.
  s = batch.GetFromBatchAndDB(db, read_options, "x", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Delete(write_options, "x");
  ASSERT_OK(s);

  s = batch.GetFromBatchAndDB(db, read_options, "x", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete db;
  DestroyDB(dbname, options);
}
// GetFromBatchAndDB() with the "stringappend" merge operator: Merges in the
// batch are applied on top of whatever the DB (optionally at a snapshot)
// holds for the key, and a Delete in the batch hides the key regardless of
// the DB contents.
TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) {
  DB* db = nullptr;
  Options options;

  options.create_if_missing = true;
  std::string dbname = test::TmpDir() + "/write_batch_with_index_test";

  options.merge_operator = MergeOperators::CreateFromStringId("stringappend");

  DestroyDB(dbname, options);
  Status s = DB::Open(options, dbname, &db);
  // ASSERT_OK instead of assert(): assert() is compiled out when NDEBUG is
  // defined, which would let the test continue with an unopened DB.
  ASSERT_OK(s);

  WriteBatchWithIndex batch;
  ReadOptions read_options;
  WriteOptions write_options;
  std::string value;

  s = db->Put(write_options, "a", "a0");
  ASSERT_OK(s);

  s = db->Put(write_options, "b", "b0");
  ASSERT_OK(s);

  s = db->Merge(write_options, "b", "b1");
  ASSERT_OK(s);

  s = db->Merge(write_options, "c", "c0");
  ASSERT_OK(s);

  s = db->Merge(write_options, "d", "d0");
  ASSERT_OK(s);

  batch.Merge("a", "a1");
  batch.Merge("a", "a2");
  batch.Merge("b", "b2");
  batch.Merge("d", "d1");
  batch.Merge("e", "e0");

  // DB Put + batch Merges.
  s = batch.GetFromBatchAndDB(db, read_options, "a", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a0,a1,a2", value);

  // DB Put + DB Merge + batch Merge.
  s = batch.GetFromBatchAndDB(db, read_options, "b", &value);
  ASSERT_OK(s);
  ASSERT_EQ("b0,b1,b2", value);

  // DB only.
  s = batch.GetFromBatchAndDB(db, read_options, "c", &value);
  ASSERT_OK(s);
  ASSERT_EQ("c0", value);

  // DB Merge + batch Merge.
  s = batch.GetFromBatchAndDB(db, read_options, "d", &value);
  ASSERT_OK(s);
  ASSERT_EQ("d0,d1", value);

  // Batch Merge only; key absent from the DB.
  s = batch.GetFromBatchAndDB(db, read_options, "e", &value);
  ASSERT_OK(s);
  ASSERT_EQ("e0", value);

  s = db->Delete(write_options, "x");
  ASSERT_OK(s);

  s = batch.GetFromBatchAndDB(db, read_options, "x", &value);
  ASSERT_TRUE(s.IsNotFound());

  // From here on, reads through snapshot_read_options see the DB as it is
  // now, while reads through read_options see later DB writes as well.
  const Snapshot* snapshot = db->GetSnapshot();
  ReadOptions snapshot_read_options;
  snapshot_read_options.snapshot = snapshot;

  s = db->Delete(write_options, "a");
  ASSERT_OK(s);

  s = batch.GetFromBatchAndDB(db, read_options, "a", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a1,a2", value);

  s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a0,a1,a2", value);

  // A Delete in the batch hides the key for both views of the DB.
  batch.Delete("a");

  s = batch.GetFromBatchAndDB(db, read_options, "a", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value);
  ASSERT_TRUE(s.IsNotFound());

  s = db->Merge(write_options, "c", "c1");
  ASSERT_OK(s);

  s = batch.GetFromBatchAndDB(db, read_options, "c", &value);
  ASSERT_OK(s);
  ASSERT_EQ("c0,c1", value);

  s = batch.GetFromBatchAndDB(db, snapshot_read_options, "c", &value);
  ASSERT_OK(s);
  ASSERT_EQ("c0", value);

  s = db->Put(write_options, "e", "e1");
  ASSERT_OK(s);

  s = batch.GetFromBatchAndDB(db, read_options, "e", &value);
  ASSERT_OK(s);
  ASSERT_EQ("e1,e0", value);

  s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value);
  ASSERT_OK(s);
  ASSERT_EQ("e0", value);

  s = db->Delete(write_options, "e");
  ASSERT_OK(s);

  s = batch.GetFromBatchAndDB(db, read_options, "e", &value);
  ASSERT_OK(s);
  ASSERT_EQ("e0", value);

  s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value);
  ASSERT_OK(s);
  ASSERT_EQ("e0", value);

  db->ReleaseSnapshot(snapshot);
  delete db;
  DestroyDB(dbname, options);
}
} // namespace } // namespace
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save