Merge branch 'master' into performance

main
kailiu 11 years ago
commit 551e9428ce
  1. 4
      INSTALL.md
  2. 4
      README
  3. 24
      build_tools/regression_build_test.sh
  4. 44
      db/db_bench.cc
  5. 68
      db/db_impl.cc
  6. 14
      db/db_impl.h
  7. 10
      db/db_iter.cc
  8. 14
      db/db_statistics.cc
  9. 4
      db/db_statistics.h
  10. 372
      db/db_test.cc
  11. 4
      db/memtable.cc
  12. 2
      db/memtable.h
  13. 7
      db/memtablelist.cc
  14. 3
      db/memtablelist.h
  15. 4
      db/perf_context_test.cc
  16. 1
      db/prefix_test.cc
  17. 4
      db/repair.cc
  18. 4
      db/write_batch_test.cc
  19. 10
      include/rocksdb/compaction_filter.h
  20. 7
      include/rocksdb/db.h
  21. 32
      include/rocksdb/memtablerep.h
  22. 30
      include/rocksdb/options.h
  23. 7
      include/rocksdb/statistics.h
  24. 4
      include/utilities/stackable_db.h
  25. 7
      table/block_based_table_builder.cc
  26. 10
      table/block_based_table_reader.cc
  27. 4
      table/block_based_table_reader.h
  28. 44
      table/table_test.cc
  29. 28
      tools/db_stress.cc
  30. 41
      util/hash_skiplist_rep.cc
  31. 45
      util/hash_skiplist_rep.h
  32. 58
      util/ldb_cmd.cc
  33. 9
      util/options.cc
  34. 19
      util/stl_wrappers.h
  35. 426
      util/transformrep.cc
  36. 4
      utilities/ttl/db_ttl.cc
  37. 2
      utilities/ttl/db_ttl.h

@ -19,13 +19,13 @@ libraries. You are on your own.
* **Linux**
* Upgrade your gcc to version at least 4.7 to get C++11 support.
* Install gflags. If you're on Ubuntu, here's a nice tutorial:
* Install gflags. First, try: `sudo apt-get install libgflags-dev`.
If this doesn't work and you're using Ubuntu, here's a nice tutorial:
(http://askubuntu.com/questions/312173/installing-gflags-12-04)
* Install snappy. This is usually as easy as:
`sudo apt-get install libsnappy-dev`.
* Install zlib. Try: `sudo apt-get install zlib1g-dev`.
* Install bzip2: `sudo apt-get install libbz2-dev`.
* Install gflags: `sudo apt-get install libgflags-dev`.
* **OS X**:
* Install latest C++ compiler that supports C++ 11:
* Update XCode: run `xcode-select --install` (or install it from XCode App's settting).

@ -16,8 +16,8 @@ The core of this code has been derived from open-source leveldb.
The code under this directory implements a system for maintaining a
persistent key/value store.
See doc/index.html for more explanation.
See doc/impl.html for a brief overview of the implementation.
See doc/index.html and github wiki (https://github.com/facebook/rocksdb/wiki)
for more explanation.
The public interface is in include/*. Callers should not include or
rely on the details of any other header files in this package. Those

@ -65,7 +65,7 @@ OPT=-DNDEBUG make db_bench -j$(nproc)
--sync=0 \
--threads=8 > ${STAT_FILE}.overwrite
# fill up the db for readrandom benchmark
# fill up the db for readrandom benchmark (1GB total size)
./db_bench \
--benchmarks=fillseq \
--db=$DATA_DIR \
@ -83,7 +83,7 @@ OPT=-DNDEBUG make db_bench -j$(nproc)
--sync=0 \
--threads=1 > /dev/null
# measure readrandom
# measure readrandom with 6GB block cache
./db_bench \
--benchmarks=readrandom \
--db=$DATA_DIR \
@ -102,6 +102,25 @@ OPT=-DNDEBUG make db_bench -j$(nproc)
--sync=0 \
--threads=32 > ${STAT_FILE}.readrandom
# measure readrandom with 300MB block cache
./db_bench \
--benchmarks=readrandom \
--db=$DATA_DIR \
--use_existing_db=1 \
--bloom_bits=10 \
--num=$NUM \
--reads=$NUM \
--cache_size=314572800 \
--cache_numshardbits=8 \
--open_files=55000 \
--disable_seek_compaction=1 \
--statistics=1 \
--histogram=1 \
--disable_data_sync=1 \
--disable_wal=1 \
--sync=0 \
--threads=32 > ${STAT_FILE}.readrandomsmallblockcache
# measure memtable performance -- none of the data gets flushed to disk
./db_bench \
--benchmarks=fillrandom,readrandom, \
@ -154,5 +173,6 @@ function send_benchmark_to_ods {
send_benchmark_to_ods overwrite overwrite $STAT_FILE.overwrite
send_benchmark_to_ods fillseq fillseq $STAT_FILE.fillseq
send_benchmark_to_ods readrandom readrandom $STAT_FILE.readrandom
send_benchmark_to_ods readrandom readrandom_smallblockcache $STAT_FILE.readrandomsmallblockcache
send_benchmark_to_ods fillrandom memtablefillrandom $STAT_FILE.memtablefillreadrandom
send_benchmark_to_ods readrandom memtablereadrandom $STAT_FILE.memtablefillreadrandom

@ -191,6 +191,10 @@ DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
DEFINE_int32(universal_max_size_amplification_percent, 0,
"The max size amplification for universal style compaction");
DEFINE_int32(universal_compression_size_percent, -1,
"The percentage of the database to compress for universal "
"compaction. -1 means compress everything.");
DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed"
"data. Negative means use default settings.");
@ -325,6 +329,23 @@ DEFINE_string(compression_type, "snappy",
static enum rocksdb::CompressionType FLAGS_compression_type_e =
rocksdb::kSnappyCompression;
DEFINE_int32(compression_level, -1,
"Compression level. For zlib this should be -1 for the "
"default level, or between 0 and 9.");
static bool ValidateCompressionLevel(const char* flagname, int32_t value) {
if (value < -1 || value > 9) {
fprintf(stderr, "Invalid value for --%s: %d, must be between -1 and 9\n",
flagname, value);
return false;
}
return true;
}
static const bool FLAGS_compression_level_dummy =
google::RegisterFlagValidator(&FLAGS_compression_level,
&ValidateCompressionLevel);
DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
" from this level. Levels with number < min_level_to_compress are"
" not compressed. Otherwise, apply compression_type to "
@ -434,12 +455,11 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) {
}
return true;
}
DEFINE_int32(prefix_size, 0, "Control the prefix size for PrefixHashRep");
DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipList");
enum RepFactory {
kSkipList,
kPrefixHash,
kUnsorted,
kVectorRep
};
enum RepFactory StringToRepFactory(const char* ctype) {
@ -449,8 +469,6 @@ enum RepFactory StringToRepFactory(const char* ctype) {
return kSkipList;
else if (!strcasecmp(ctype, "prefix_hash"))
return kPrefixHash;
else if (!strcasecmp(ctype, "unsorted"))
return kUnsorted;
else if (!strcasecmp(ctype, "vector"))
return kVectorRep;
@ -807,9 +825,6 @@ class Benchmark {
case kSkipList:
fprintf(stdout, "Memtablerep: skip_list\n");
break;
case kUnsorted:
fprintf(stdout, "Memtablerep: unsorted\n");
break;
case kVectorRep:
fprintf(stdout, "Memtablerep: vector\n");
break;
@ -1334,14 +1349,8 @@ class Benchmark {
}
switch (FLAGS_rep_factory) {
case kPrefixHash:
options.memtable_factory.reset(
new PrefixHashRepFactory(NewFixedPrefixTransform(FLAGS_prefix_size))
);
break;
case kUnsorted:
options.memtable_factory.reset(
new UnsortedRepFactory
);
options.memtable_factory.reset(NewHashSkipListRepFactory(
NewFixedPrefixTransform(FLAGS_prefix_size)));
break;
case kSkipList:
// no need to do anything
@ -1368,6 +1377,7 @@ class Benchmark {
options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger;
options.compression = FLAGS_compression_type_e;
options.compression_opts.level = FLAGS_compression_level;
options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
if (FLAGS_min_level_to_compress >= 0) {
@ -1429,6 +1439,10 @@ class Benchmark {
options.compaction_options_universal.max_size_amplification_percent =
FLAGS_universal_max_size_amplification_percent;
}
if (FLAGS_universal_compression_size_percent != -1) {
options.compaction_options_universal.compression_size_percent =
FLAGS_universal_compression_size_percent;
}
Status s;
if(FLAGS_readonly) {

@ -51,6 +51,7 @@
#include "util/auto_roll_logger.h"
#include "util/build_version.h"
#include "util/coding.h"
#include "util/hash_skiplist_rep.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
@ -163,10 +164,10 @@ Options SanitizeOptions(const std::string& dbname,
Log(result.info_log, "Compaction filter specified, ignore factory");
}
if (result.prefix_extractor) {
// If a prefix extractor has been supplied and a PrefixHashRepFactory is
// If a prefix extractor has been supplied and a HashSkipListRepFactory is
// being used, make sure that the latter uses the former as its transform
// function.
auto factory = dynamic_cast<PrefixHashRepFactory*>(
auto factory = dynamic_cast<HashSkipListRepFactory*>(
result.memtable_factory.get());
if (factory &&
factory->GetTransform() != result.prefix_extractor) {
@ -236,7 +237,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
mutex_(options.use_adaptive_mutex),
shutting_down_(nullptr),
bg_cv_(&mutex_),
mem_rep_factory_(options_.memtable_factory),
mem_rep_factory_(options_.memtable_factory.get()),
mem_(new MemTable(internal_comparator_, mem_rep_factory_,
NumberLevels(), options_)),
logfile_number_(0),
@ -516,6 +517,19 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
// free pending memtables
for (auto m : state.memtables_to_free) {
delete m;
}
// check if there is anything to do
if (!state.all_files.size() &&
!state.sst_delete_files.size() &&
!state.log_delete_files.size()) {
return;
}
// this checks if FindObsoleteFiles() was run before. If not, don't do
// PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
// run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
@ -1170,7 +1184,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
// Replace immutable memtable with the generated Table
s = imm_.InstallMemtableFlushResults(
mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_);
file_number, pending_outputs_, &deletion_state.memtables_to_free);
if (s.ok()) {
if (madeProgress) {
@ -1656,7 +1670,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
void DBImpl::BackgroundCallFlush() {
bool madeProgress = false;
DeletionState deletion_state;
DeletionState deletion_state(options_.max_write_buffer_number);
assert(bg_flush_scheduled_);
MutexLock l(&mutex_);
@ -1702,7 +1716,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
void DBImpl::BackgroundCallCompaction() {
bool madeProgress = false;
DeletionState deletion_state;
DeletionState deletion_state(options_.max_write_buffer_number);
MaybeDumpStats();
@ -1732,6 +1746,7 @@ void DBImpl::BackgroundCallCompaction() {
// FindObsoleteFiles(). This is because deletion_state does not catch
// all created files if compaction failed.
FindObsoleteFiles(deletion_state, !s.ok());
// delete unnecessary files if any, this is done outside the mutex
if (deletion_state.HaveSomethingToDelete()) {
mutex_.Unlock();
@ -2492,25 +2507,20 @@ struct IterState {
static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
std::vector<MemTable*> to_delete;
to_delete.reserve(state->mem.size());
DBImpl::DeletionState deletion_state(state->db->GetOptions().
max_write_buffer_number);
state->mu->Lock();
for (unsigned int i = 0; i < state->mem.size(); i++) {
MemTable* m = state->mem[i]->Unref();
if (m != nullptr) {
to_delete.push_back(m);
deletion_state.memtables_to_free.push_back(m);
}
}
state->version->Unref();
// delete only the sst obsolete files
DBImpl::DeletionState deletion_state;
// fast path FindObsoleteFiles
state->db->FindObsoleteFiles(deletion_state, false, true);
state->mu->Unlock();
state->db->PurgeObsoleteFiles(deletion_state);
// delete obsolete memtables outside the db-mutex
for (MemTable* m : to_delete) delete m;
delete state;
}
} // namespace
@ -2612,8 +2622,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);
if (mem->Get(lkey, value, &s, merge_context, options_)) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else if (imm.Get(lkey, value, &s, merge_context, options_)) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else {
StopWatchNano from_files_timer(env_, false);
StartPerfTimer(&from_files_timer);
@ -2622,6 +2634,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
options_, value_found);
have_stat_update = true;
BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer);
RecordTick(options_.statistics.get(), MEMTABLE_MISS);
}
StopWatchNano post_process_timer(env_, false);
@ -3514,6 +3527,33 @@ void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata) {
return versions_->GetLiveFilesMetaData(metadata);
}
Status DBImpl::GetDbIdentity(std::string& identity) {
std::string idfilename = IdentityFileName(dbname_);
unique_ptr<SequentialFile> idfile;
const EnvOptions soptions;
Status s = env_->NewSequentialFile(idfilename, &idfile, soptions);
if (!s.ok()) {
return s;
}
uint64_t file_size;
s = env_->GetFileSize(idfilename, &file_size);
if (!s.ok()) {
return s;
}
char buffer[file_size];
Slice id;
s = idfile->Read(file_size, &id, buffer);
if (!s.ok()) {
return s;
}
identity.assign(id.ToString());
// If last character is '\n' remove it from identity
if (identity.size() > 0 && identity.back() == '\n') {
identity.pop_back();
}
return s;
}
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {

@ -85,6 +85,8 @@ class DBImpl : public DB {
virtual void GetLiveFilesMetaData(
std::vector<LiveFileMetaData> *metadata);
virtual Status GetDbIdentity(std::string& identity);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin, *end]
@ -129,10 +131,12 @@ class DBImpl : public DB {
struct DeletionState {
inline bool HaveSomethingToDelete() const {
return all_files.size() ||
return memtables_to_free.size() ||
all_files.size() ||
sst_delete_files.size() ||
log_delete_files.size();
}
// a list of all files that we'll consider deleting
// (every once in a while this is filled up with all files
// in the DB directory)
@ -147,14 +151,18 @@ class DBImpl : public DB {
// a list of log files that we need to delete
std::vector<uint64_t> log_delete_files;
// a list of memtables to be free
std::vector<MemTable *> memtables_to_free;
// the current manifest_file_number, log_number and prev_log_number
// that corresponds to the set of files in 'live'.
uint64_t manifest_file_number, log_number, prev_log_number;
DeletionState() {
explicit DeletionState(const int num_memtables = 0) {
manifest_file_number = 0;
log_number = 0;
prev_log_number = 0;
memtables_to_free.reserve(num_memtables);
}
};
@ -309,7 +317,7 @@ class DBImpl : public DB {
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
port::CondVar bg_cv_; // Signalled when background work finishes
std::shared_ptr<MemTableRepFactory> mem_rep_factory_;
MemTableRepFactory* mem_rep_factory_;
MemTable* mem_;
MemTableList imm_; // Memtable that are not changing
uint64_t logfile_number_;

@ -61,7 +61,7 @@ class DBIter: public Iterator {
const Comparator* cmp, Iterator* iter, SequenceNumber s)
: dbname_(dbname),
env_(env),
logger_(options.info_log),
logger_(options.info_log.get()),
user_comparator_(cmp),
user_merge_operator_(options.merge_operator.get()),
iter_(iter),
@ -123,7 +123,7 @@ class DBIter: public Iterator {
const std::string* const dbname_;
Env* const env_;
shared_ptr<Logger> logger_;
Logger* logger_;
const Comparator* const user_comparator_;
const MergeOperator* const user_merge_operator_;
Iterator* const iter_;
@ -302,7 +302,7 @@ void DBIter::MergeValuesNewToOld() {
// ignore corruption if there is any.
const Slice value = iter_->value();
user_merge_operator_->FullMerge(ikey.user_key, &value, operands,
&saved_value_, logger_.get());
&saved_value_, logger_);
// iter_ is positioned after put
iter_->Next();
return;
@ -319,7 +319,7 @@ void DBIter::MergeValuesNewToOld() {
Slice(operands[0]),
Slice(operands[1]),
&merge_result,
logger_.get())) {
logger_)) {
operands.pop_front();
swap(operands.front(), merge_result);
} else {
@ -336,7 +336,7 @@ void DBIter::MergeValuesNewToOld() {
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
user_merge_operator_->FullMerge(saved_key_, nullptr, operands,
&saved_value_, logger_.get());
&saved_value_, logger_);
}
void DBIter::Prev() {

@ -0,0 +1,14 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/db_statistics.h"
namespace rocksdb {
std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<DBStatistics>();
}
} // namespace rocksdb

@ -58,8 +58,6 @@ class DBStatistics: public Statistics {
std::vector<HistogramImpl> allHistograms_;
};
std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<DBStatistics>();
}
std::shared_ptr<Statistics> CreateDBStatistics();
} // namespace rocksdb

@ -245,7 +245,6 @@ class DBTest {
enum OptionConfig {
kDefault,
kVectorRep,
kUnsortedRep,
kMergePut,
kFilter,
kUncompressed,
@ -256,7 +255,7 @@ class DBTest {
kCompactOnFlush,
kPerfOptions,
kDeletesFilterFirst,
kPrefixHashRep,
kHashSkipList,
kUniversalCompaction,
kCompressedBlockCache,
kEnd
@ -340,9 +339,9 @@ class DBTest {
Options CurrentOptions() {
Options options;
switch (option_config_) {
case kPrefixHashRep:
options.memtable_factory.reset(new
PrefixHashRepFactory(NewFixedPrefixTransform(1)));
case kHashSkipList:
options.memtable_factory.reset(
NewHashSkipListRepFactory(NewFixedPrefixTransform(1)));
break;
case kMergePut:
options.merge_operator = MergeOperators::CreatePutOperator();
@ -376,9 +375,6 @@ class DBTest {
case kDeletesFilterFirst:
options.filter_deletes = true;
break;
case kUnsortedRep:
options.memtable_factory.reset(new UnsortedRepFactory);
break;
case kVectorRep:
options.memtable_factory.reset(new VectorRepFactory(100));
break;
@ -1776,31 +1772,23 @@ TEST(DBTest, ManifestRollOver) {
TEST(DBTest, IdentityAcrossRestarts) {
do {
std::string idfilename = IdentityFileName(dbname_);
unique_ptr<SequentialFile> idfile;
const EnvOptions soptions;
ASSERT_OK(env_->NewSequentialFile(idfilename, &idfile, soptions));
char buffer1[100];
Slice id1;
ASSERT_OK(idfile->Read(100, &id1, buffer1));
std::string id1;
ASSERT_OK(db_->GetDbIdentity(id1));
Options options = CurrentOptions();
Reopen(&options);
char buffer2[100];
Slice id2;
ASSERT_OK(env_->NewSequentialFile(idfilename, &idfile, soptions));
ASSERT_OK(idfile->Read(100, &id2, buffer2));
std::string id2;
ASSERT_OK(db_->GetDbIdentity(id2));
// id1 should match id2 because identity was not regenerated
ASSERT_EQ(id1.ToString(), id2.ToString());
ASSERT_EQ(id1.compare(id2), 0);
std::string idfilename = IdentityFileName(dbname_);
ASSERT_OK(env_->DeleteFile(idfilename));
Reopen(&options);
char buffer3[100];
Slice id3;
ASSERT_OK(env_->NewSequentialFile(idfilename, &idfile, soptions));
ASSERT_OK(idfile->Read(100, &id3, buffer3));
// id1 should NOT match id2 because identity was regenerated
ASSERT_NE(id1.ToString(0), id3.ToString());
std::string id3;
ASSERT_OK(db_->GetDbIdentity(id3));
// id1 should NOT match id3 because identity was regenerated
ASSERT_NE(id1.compare(id3), 0);
} while (ChangeCompactOptions());
}
@ -1856,94 +1844,6 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) {
}
}
// TODO(kailiu) disable the in non-linux platforms to temporarily solve
// the unit test failure.
#ifdef OS_LINUX
TEST(DBTest, CompressedCache) {
int num_iter = 80;
// Run this test three iterations.
// Iteration 1: only a uncompressed block cache
// Iteration 2: only a compressed block cache
// Iteration 3: both block cache and compressed cache
for (int iter = 0; iter < 3; iter++) {
Options options = CurrentOptions();
options.write_buffer_size = 64*1024; // small write buffer
options.statistics = rocksdb::CreateDBStatistics();
switch (iter) {
case 0:
// only uncompressed block cache
options.block_cache = NewLRUCache(8*1024);
options.block_cache_compressed = nullptr;
break;
case 1:
// no block cache, only compressed cache
options.no_block_cache = true;
options.block_cache = nullptr;
options.block_cache_compressed = NewLRUCache(8*1024);
break;
case 2:
// both compressed and uncompressed block cache
options.block_cache = NewLRUCache(1024);
options.block_cache_compressed = NewLRUCache(8*1024);
break;
default:
ASSERT_TRUE(false);
}
Reopen(&options);
Random rnd(301);
// Write 8MB (80 values, each 100K)
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
std::vector<std::string> values;
std::string str;
for (int i = 0; i < num_iter; i++) {
if (i % 4 == 0) { // high compression ratio
str = RandomString(&rnd, 1000);
}
values.push_back(str);
ASSERT_OK(Put(Key(i), values[i]));
}
// flush all data from memtable so that reads are from block cache
dbfull()->Flush(FlushOptions());
for (int i = 0; i < num_iter; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
// check that we triggered the appropriate code paths in the cache
switch (iter) {
case 0:
// only uncompressed block cache
ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
0);
ASSERT_EQ(options.statistics.get()->getTickerCount
(BLOCK_CACHE_COMPRESSED_MISS), 0);
break;
case 1:
// no block cache, only compressed cache
ASSERT_EQ(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
0);
ASSERT_GT(options.statistics.get()->getTickerCount
(BLOCK_CACHE_COMPRESSED_MISS), 0);
break;
case 2:
// both compressed and uncompressed block cache
ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
0);
ASSERT_GT(options.statistics.get()->getTickerCount
(BLOCK_CACHE_COMPRESSED_MISS), 0);
break;
default:
ASSERT_TRUE(false);
}
}
}
#endif
TEST(DBTest, CompactionTrigger) {
Options options = CurrentOptions();
options.write_buffer_size = 100<<10; //100KB
@ -2185,9 +2085,91 @@ TEST(DBTest, UniversalCompactionOptions) {
}
}
// TODO(kailiu) disable the in non-linux platforms to temporarily solve
// the unit test failure.
#ifdef OS_LINUX
#if defined(SNAPPY) && defined(ZLIB) && defined(BZIP2)
TEST(DBTest, CompressedCache) {
int num_iter = 80;
// Run this test three iterations.
// Iteration 1: only a uncompressed block cache
// Iteration 2: only a compressed block cache
// Iteration 3: both block cache and compressed cache
for (int iter = 0; iter < 3; iter++) {
Options options = CurrentOptions();
options.write_buffer_size = 64*1024; // small write buffer
options.statistics = rocksdb::CreateDBStatistics();
switch (iter) {
case 0:
// only uncompressed block cache
options.block_cache = NewLRUCache(8*1024);
options.block_cache_compressed = nullptr;
break;
case 1:
// no block cache, only compressed cache
options.no_block_cache = true;
options.block_cache = nullptr;
options.block_cache_compressed = NewLRUCache(8*1024);
break;
case 2:
// both compressed and uncompressed block cache
options.block_cache = NewLRUCache(1024);
options.block_cache_compressed = NewLRUCache(8*1024);
break;
default:
ASSERT_TRUE(false);
}
Reopen(&options);
Random rnd(301);
// Write 8MB (80 values, each 100K)
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
std::vector<std::string> values;
std::string str;
for (int i = 0; i < num_iter; i++) {
if (i % 4 == 0) { // high compression ratio
str = RandomString(&rnd, 1000);
}
values.push_back(str);
ASSERT_OK(Put(Key(i), values[i]));
}
// flush all data from memtable so that reads are from block cache
dbfull()->Flush(FlushOptions());
for (int i = 0; i < num_iter; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
// check that we triggered the appropriate code paths in the cache
switch (iter) {
case 0:
// only uncompressed block cache
ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
0);
ASSERT_EQ(options.statistics.get()->getTickerCount
(BLOCK_CACHE_COMPRESSED_MISS), 0);
break;
case 1:
// no block cache, only compressed cache
ASSERT_EQ(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
0);
ASSERT_GT(options.statistics.get()->getTickerCount
(BLOCK_CACHE_COMPRESSED_MISS), 0);
break;
case 2:
// both compressed and uncompressed block cache
ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
0);
ASSERT_GT(options.statistics.get()->getTickerCount
(BLOCK_CACHE_COMPRESSED_MISS), 0);
break;
default:
ASSERT_TRUE(false);
}
}
}
static std::string CompressibleString(Random* rnd, int len) {
std::string r;
test::CompressibleString(rnd, 0.8, len, &r);
@ -4535,6 +4517,10 @@ class ModelDB: public DB {
return Status::OK();
}
virtual Status GetDbIdentity(std::string& identity) {
return Status::OK();
}
virtual SequenceNumber GetLatestSequenceNumber() const {
return 0;
}
@ -4647,7 +4633,7 @@ TEST(DBTest, Randomized) {
// TODO(sanjay): Test Get() works
int p = rnd.Uniform(100);
int minimum = 0;
if (option_config_ == kPrefixHashRep) {
if (option_config_ == kHashSkipList) {
minimum = 1;
}
if (p < 45) { // Put
@ -4817,90 +4803,82 @@ void PrefixScanInit(DBTest *dbtest) {
}
TEST(DBTest, PrefixScan) {
for (int it = 0; it < 2; ++it) {
ReadOptions ro = ReadOptions();
int count;
Slice prefix;
Slice key;
char buf[100];
Iterator* iter;
snprintf(buf, sizeof(buf), "03______:");
prefix = Slice(buf, 8);
key = Slice(buf, 9);
auto prefix_extractor = NewFixedPrefixTransform(8);
// db configs
env_->count_random_reads_ = true;
Options options = CurrentOptions();
options.env = env_;
options.no_block_cache = true;
options.filter_policy = NewBloomFilterPolicy(10);
options.prefix_extractor = prefix_extractor;
options.whole_key_filtering = false;
options.disable_auto_compactions = true;
options.max_background_compactions = 2;
options.create_if_missing = true;
options.disable_seek_compaction = true;
if (it == 0) {
options.memtable_factory.reset(NewHashSkipListRepFactory(
prefix_extractor));
} else {
options.memtable_factory = std::make_shared<PrefixHashRepFactory>(
prefix_extractor);
}
ReadOptions ro = ReadOptions();
int count;
Slice prefix;
Slice key;
char buf[100];
Iterator* iter;
snprintf(buf, sizeof(buf), "03______:");
prefix = Slice(buf, 8);
key = Slice(buf, 9);
auto prefix_extractor = NewFixedPrefixTransform(8);
// db configs
env_->count_random_reads_ = true;
Options options = CurrentOptions();
options.env = env_;
options.no_block_cache = true;
options.filter_policy = NewBloomFilterPolicy(10);
options.prefix_extractor = prefix_extractor;
options.whole_key_filtering = false;
options.disable_auto_compactions = true;
options.max_background_compactions = 2;
options.create_if_missing = true;
options.disable_seek_compaction = true;
options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor));
// prefix specified, with blooms: 2 RAND I/Os
// SeekToFirst
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
ro.prefix = &prefix;
iter = db_->NewIterator(ro);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
assert(iter->key().starts_with(prefix));
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 2);
// prefix specified, with blooms: 2 RAND I/Os
// SeekToFirst
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
ro.prefix = &prefix;
iter = db_->NewIterator(ro);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
assert(iter->key().starts_with(prefix));
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 2);
// prefix specified, with blooms: 2 RAND I/Os
// Seek
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
ro.prefix = &prefix;
iter = db_->NewIterator(ro);
for (iter->Seek(key); iter->Valid(); iter->Next()) {
assert(iter->key().starts_with(prefix));
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 2);
// prefix specified, with blooms: 2 RAND I/Os
// Seek
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
ro.prefix = &prefix;
iter = db_->NewIterator(ro);
for (iter->Seek(key); iter->Valid(); iter->Next()) {
assert(iter->key().starts_with(prefix));
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 2);
// no prefix specified: 11 RAND I/Os
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
iter = db_->NewIterator(ReadOptions());
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
if (! iter->key().starts_with(prefix)) {
break;
}
count++;
// no prefix specified: 11 RAND I/Os
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
iter = db_->NewIterator(ReadOptions());
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
if (! iter->key().starts_with(prefix)) {
break;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 11);
Close();
delete options.filter_policy;
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 11);
Close();
delete options.filter_policy;
}
std::string MakeKey(unsigned int num) {

@ -36,7 +36,7 @@ struct hash<rocksdb::Slice> {
namespace rocksdb {
MemTable::MemTable(const InternalKeyComparator& cmp,
std::shared_ptr<MemTableRepFactory> table_factory,
MemTableRepFactory* table_factory,
int numlevel,
const Options& options)
: comparator_(cmp),
@ -281,7 +281,7 @@ bool MemTable::Update(SequenceNumber seq, ValueType type,
Slice memkey = lkey.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter(
table_.get()->GetIterator(lkey.user_key()));
table_->GetIterator(lkey.user_key()));
iter->Seek(key, memkey.data());
if (iter->Valid()) {

@ -36,7 +36,7 @@ class MemTable {
// is zero and the caller must call Ref() at least once.
explicit MemTable(
const InternalKeyComparator& comparator,
std::shared_ptr<MemTableRepFactory> table_factory,
MemTableRepFactory* table_factory,
int numlevel = 7,
const Options& options = Options());

@ -80,7 +80,8 @@ Status MemTableList::InstallMemtableFlushResults(
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs) {
std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete) {
mu->AssertHeld();
// If the flush was not successful, then just reset state.
@ -151,7 +152,9 @@ Status MemTableList::InstallMemtableFlushResults(
// executing compaction threads do not mistakenly assume that this
// file is not live.
pending_outputs.erase(m->file_number_);
m->Unref();
if (m->Unref() != nullptr) {
to_delete->push_back(m);
}
size_--;
} else {
//commit failed. setup state so that we can flush again.

@ -65,7 +65,8 @@ class MemTableList {
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs);
std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete);
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().

@ -38,8 +38,8 @@ std::shared_ptr<DB> OpenDb() {
if (FLAGS_use_set_based_memetable) {
auto prefix_extractor = rocksdb::NewFixedPrefixTransform(0);
options.memtable_factory =
std::make_shared<rocksdb::PrefixHashRepFactory>(prefix_extractor);
options.memtable_factory.reset(
NewHashSkipListRepFactory(prefix_extractor));
}
Status s = DB::Open(options, kDbName, &db);

@ -11,7 +11,6 @@
#include "util/testharness.h"
DEFINE_bool(use_prefix_hash_memtable, true, "");
DEFINE_bool(use_nolock_version, true, "");
DEFINE_bool(trigger_deadlock, false,
"issue delete in range scan to trigger PrefixHashMap deadlock");
DEFINE_uint64(bucket_count, 100000, "number of buckets");

@ -196,7 +196,7 @@ class Repairer {
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_, options_.memtable_factory,
MemTable* mem = new MemTable(icmp_, options_.memtable_factory.get(),
options_.num_levels);
mem->Ref();
int counter = 0;
@ -227,7 +227,7 @@ class Repairer {
table_cache_, iter, &meta,
icmp_.user_comparator(), 0, 0, true);
delete iter;
mem->Unref();
delete mem->Unref();
mem = nullptr;
if (status.ok()) {
if (meta.file_size > 0) {

@ -22,7 +22,7 @@ namespace rocksdb {
static std::string PrintContents(WriteBatch* b) {
InternalKeyComparator cmp(BytewiseComparator());
auto factory = std::make_shared<SkipListFactory>();
MemTable* mem = new MemTable(cmp, factory);
MemTable* mem = new MemTable(cmp, factory.get());
mem->Ref();
std::string state;
Options options;
@ -69,7 +69,7 @@ static std::string PrintContents(WriteBatch* b) {
} else if (count != WriteBatchInternal::Count(b)) {
state.append("CountMismatch()");
}
mem->Unref();
delete mem->Unref();
return state;
}

@ -40,6 +40,16 @@ class CompactionFilter {
// When the value is to be preserved, the application has the option
// to modify the existing_value and pass it back through new_value.
// value_changed needs to be set to true in this case.
//
// If multithreaded compaction is being used *and* a single CompactionFilter
// instance was supplied via Options::compaction_filter, this method may be
// called from different threads concurrently. The application must ensure
// that the call is thread-safe.
//
// If the CompactionFilter was created by a factory, then it will only ever
// be used by a single thread that is doing the compaction run, and this
// call does not need to be thread-safe. However, multiple filters may be
// in existence and operating concurrently.
virtual bool Filter(int level,
const Slice& key,
const Slice& existing_value,

@ -273,7 +273,7 @@ class DB {
// Sets iter to an iterator that is positioned at a write-batch containing
// seq_number. If the sequence number is non existent, it returns an iterator
// at the first available seq_no after the requested seq_no
// Returns Status::Ok if iterator is valid
// Returns Status::OK if iterator is valid
// Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to
// use this api, else the WAL files will get
// cleared aggressively and the iterator might keep getting invalid before
@ -292,6 +292,11 @@ class DB {
std::vector<LiveFileMetaData> *metadata) {
}
// Sets the globally unique ID created at database creation time by invoking
// Env::GenerateUniqueId(), in identity. Returns Status::OK if identity could
// be set properly
virtual Status GetDbIdentity(std::string& identity) = 0;
private:
// No copying allowed
DB(const DB&);

@ -17,21 +17,13 @@
// The factory will be passed an Arena object when a new MemTableRep is
// requested. The API for this object is in rocksdb/arena.h.
//
// Users can implement their own memtable representations. We include four
// Users can implement their own memtable representations. We include three
// types built in:
// - SkipListRep: This is the default; it is backed by a skip list.
// - TransformRep: This is backed by an custom hash map.
// On construction, they are given a SliceTransform object. This
// object is applied to the user key of stored items which indexes into the
// hash map to yield a skiplist containing all records that share the same
// user key under the transform function.
// - UnsortedRep: A subclass of TransformRep where the transform function is
// the identity function. Optimized for point lookups.
// - PrefixHashRep: A subclass of TransformRep where the transform function is
// a fixed-size prefix extractor. If you use PrefixHashRepFactory, the transform
// must be identical to options.prefix_extractor, otherwise it will be discarded
// and the default will be used. It is optimized for ranged scans over a
// prefix.
// - HashSkipListRep: The memtable rep that is best used for keys that are
// structured like "prefix:suffix" where iteration withing a prefix is
// common and iteration across different prefixes is rare. It is backed by
// a hash map where each bucket is a skip list.
// - VectorRep: This is backed by an unordered std::vector. On iteration, the
// vector is sorted. It is intelligent about sorting; once the MarkReadOnly()
// has been called, the vector will only be sorted once. It is optimized for
@ -186,16 +178,14 @@ public:
}
};
// TransformReps are backed by an unordered map of buffers to buckets. When
// looking up a key, the user key is extracted and a user-supplied transform
// function (see rocksdb/slice_transform.h) is applied to get the key into the
// unordered map. This allows the user to bin user keys based on arbitrary
// criteria. Two example implementations are UnsortedRepFactory and
// PrefixHashRepFactory.
// HashSkipListRep is backed by hash map of buckets. Each bucket is a skip
// list. All the keys with the same prefix will be in the same bucket.
// The prefix is determined using user supplied SliceTransform. It has
// to match prefix_extractor in options.prefix_extractor.
//
// Iteration over the entire collection is implemented by dumping all the keys
// into an std::set. Thus, these data structures are best used when iteration
// over the entire collection is rare.
// into a separate skip list. Thus, these data structures are best used when
// iteration over the entire collection is rare.
//
// Parameters:
// transform: The SliceTransform to bucket user keys on. TransformRepFactory

@ -15,11 +15,9 @@
#include <vector>
#include <stdint.h>
#include "rocksdb/memtablerep.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/universal_compaction.h"
@ -95,16 +93,33 @@ struct Options {
// Default: nullptr
shared_ptr<MergeOperator> merge_operator;
// The client must provide compaction_filter_factory if it requires a new
// compaction filter to be used for different compaction processes
// A single CompactionFilter instance to call into during compaction.
// Allows an application to modify/delete a key-value during background
// compaction.
// Ideally, client should specify only one of filter or factory.
//
// If the client requires a new compaction filter to be used for different
// compaction runs, it can specify compaction_filter_factory instead of this
// option. The client should specify only one of the two.
// compaction_filter takes precedence over compaction_filter_factory if
// client specifies both.
//
// If multithreaded compaction is being used, the supplied CompactionFilter
// instance may be used from different threads concurrently and so should be
// thread-safe.
//
// Default: nullptr
const CompactionFilter* compaction_filter;
// This is a factory that provides compaction filter objects which allow
// an application to modify/delete a key-value during background compaction.
//
// A new filter will be created on each compaction run. If multithreaded
// compaction is being used, each created CompactionFilter will only be used
// from a single thread and so does not need to be thread-safe.
//
// Default: a factory that doesn't provide any object
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
// If true, the database will be created if it is missing.
// Default: false
bool create_if_missing;
@ -602,11 +617,6 @@ struct Options {
// Table and TableBuilder.
std::shared_ptr<TableFactory> table_factory;
// This is a factory that provides compaction filter objects which allow
// an application to modify/delete a key-value during background compaction.
// Default: a factory that doesn't provide any object
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
// This option allows user to to collect their own interested statistics of
// the tables.
// Default: emtpy vector -- no user-defined statistics collection will be

@ -51,6 +51,11 @@ enum Tickers {
// # of times bloom filter has avoided file reads.
BLOOM_FILTER_USEFUL,
// # of memtable hits.
MEMTABLE_HIT,
// # of memtable misses.
MEMTABLE_MISS,
/**
* COMPACTION_KEY_DROP_* count the reasons for key drop during compaction
* There are 3 reasons currently.
@ -125,6 +130,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{ BLOCK_CACHE_DATA_MISS, "rocksdb.block.cache.data.miss" },
{ BLOCK_CACHE_DATA_HIT, "rocksdb.block.cache.data.hit" },
{ BLOOM_FILTER_USEFUL, "rocksdb.bloom.filter.useful" },
{ MEMTABLE_HIT, "rocksdb.memtable.hit" },
{ MEMTABLE_MISS, "rocksdb.memtable.miss" },
{ COMPACTION_KEY_DROP_NEWER_ENTRY, "rocksdb.compaction.key.drop.new" },
{ COMPACTION_KEY_DROP_OBSOLETE, "rocksdb.compaction.key.drop.obsolete" },
{ COMPACTION_KEY_DROP_USER, "rocksdb.compaction.key.drop.user" },

@ -140,6 +140,10 @@ class StackableDB : public DB {
return db_->DeleteFile(name);
}
virtual Status GetDbIdentity(std::string& identity) {
return db_->GetDbIdentity(identity);
}
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter)
override {

@ -100,9 +100,10 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
rep_->filter_block->StartBlock(0);
}
if (options.block_cache_compressed.get() != nullptr) {
BlockBasedTable::GenerateCachePrefix(options.block_cache_compressed, file,
&rep_->compressed_cache_key_prefix[0],
&rep_->compressed_cache_key_prefix_size);
BlockBasedTable::GenerateCachePrefix(
options.block_cache_compressed.get(), file,
&rep_->compressed_cache_key_prefix[0],
&rep_->compressed_cache_key_prefix_size);
}
}

@ -97,18 +97,18 @@ void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) {
rep->cache_key_prefix_size = 0;
rep->compressed_cache_key_prefix_size = 0;
if (rep->options.block_cache != nullptr) {
GenerateCachePrefix(rep->options.block_cache, rep->file.get(),
GenerateCachePrefix(rep->options.block_cache.get(), rep->file.get(),
&rep->cache_key_prefix[0],
&rep->cache_key_prefix_size);
}
if (rep->options.block_cache_compressed != nullptr) {
GenerateCachePrefix(rep->options.block_cache_compressed, rep->file.get(),
&rep->compressed_cache_key_prefix[0],
GenerateCachePrefix(rep->options.block_cache_compressed.get(),
rep->file.get(), &rep->compressed_cache_key_prefix[0],
&rep->compressed_cache_key_prefix_size);
}
}
void BlockBasedTable::GenerateCachePrefix(shared_ptr<Cache> cc,
void BlockBasedTable::GenerateCachePrefix(Cache* cc,
RandomAccessFile* file, char* buffer, size_t* size) {
// generate an id from the file
@ -122,7 +122,7 @@ void BlockBasedTable::GenerateCachePrefix(shared_ptr<Cache> cc,
}
}
void BlockBasedTable::GenerateCachePrefix(shared_ptr<Cache> cc,
void BlockBasedTable::GenerateCachePrefix(Cache* cc,
WritableFile* file, char* buffer, size_t* size) {
// generate an id from the file

@ -167,9 +167,9 @@ class BlockBasedTable : public TableReader {
rep_ = rep;
}
// Generate a cache key prefix from the file
static void GenerateCachePrefix(shared_ptr<Cache> cc,
static void GenerateCachePrefix(Cache* cc,
RandomAccessFile* file, char* buffer, size_t* size);
static void GenerateCachePrefix(shared_ptr<Cache> cc,
static void GenerateCachePrefix(Cache* cc,
WritableFile* file, char* buffer, size_t* size);
// The longest prefix of the cache key used to identify blocks.

@ -370,15 +370,15 @@ class MemTableConstructor: public Constructor {
: Constructor(cmp),
internal_comparator_(cmp),
table_factory_(new SkipListFactory) {
memtable_ = new MemTable(internal_comparator_, table_factory_);
memtable_ = new MemTable(internal_comparator_, table_factory_.get());
memtable_->Ref();
}
~MemTableConstructor() {
memtable_->Unref();
delete memtable_->Unref();
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
memtable_->Unref();
memtable_ = new MemTable(internal_comparator_, table_factory_);
delete memtable_->Unref();
memtable_ = new MemTable(internal_comparator_, table_factory_.get());
memtable_->Ref();
int seq = 1;
for (KVMap::const_iterator it = data.begin();
@ -930,19 +930,19 @@ TEST(TableTest, NumBlockStat) {
class BlockCacheProperties {
public:
explicit BlockCacheProperties(std::shared_ptr<Statistics> statistics) {
explicit BlockCacheProperties(Statistics* statistics) {
block_cache_miss =
statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
statistics->getTickerCount(BLOCK_CACHE_MISS);
block_cache_hit =
statistics.get()->getTickerCount(BLOCK_CACHE_HIT);
statistics->getTickerCount(BLOCK_CACHE_HIT);
index_block_cache_miss =
statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_MISS);
statistics->getTickerCount(BLOCK_CACHE_INDEX_MISS);
index_block_cache_hit =
statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_HIT);
statistics->getTickerCount(BLOCK_CACHE_INDEX_HIT);
data_block_cache_miss =
statistics.get()->getTickerCount(BLOCK_CACHE_DATA_MISS);
statistics->getTickerCount(BLOCK_CACHE_DATA_MISS);
data_block_cache_hit =
statistics.get()->getTickerCount(BLOCK_CACHE_DATA_HIT);
statistics->getTickerCount(BLOCK_CACHE_DATA_HIT);
}
// Check if the fetched props matches the expected ones.
@ -993,7 +993,7 @@ TEST(TableTest, BlockCacheTest) {
// At first, no block will be accessed.
{
BlockCacheProperties props(options.statistics);
BlockCacheProperties props(options.statistics.get());
// index will be added to block cache.
props.AssertEqual(
1, // index block miss
@ -1006,7 +1006,7 @@ TEST(TableTest, BlockCacheTest) {
// Only index block will be accessed
{
iter.reset(c.NewIterator());
BlockCacheProperties props(options.statistics);
BlockCacheProperties props(options.statistics.get());
// NOTE: to help better highlight the "detla" of each ticker, I use
// <last_value> + <added_value> to indicate the increment of changed
// value; other numbers remain the same.
@ -1021,7 +1021,7 @@ TEST(TableTest, BlockCacheTest) {
// Only data block will be accessed
{
iter->SeekToFirst();
BlockCacheProperties props(options.statistics);
BlockCacheProperties props(options.statistics.get());
props.AssertEqual(
1,
1,
@ -1034,7 +1034,7 @@ TEST(TableTest, BlockCacheTest) {
{
iter.reset(c.NewIterator());
iter->SeekToFirst();
BlockCacheProperties props(options.statistics);
BlockCacheProperties props(options.statistics.get());
props.AssertEqual(
1,
1 + 1, // index block hit
@ -1047,14 +1047,14 @@ TEST(TableTest, BlockCacheTest) {
// -- PART 2: Open without block cache
options.block_cache.reset();
options.statistics = CreateDBStatistics(); // reset the props
options.statistics = CreateDBStatistics(); // reset the stats
c.Reopen(options);
{
iter.reset(c.NewIterator());
iter->SeekToFirst();
ASSERT_EQ("key", iter->key().ToString());
BlockCacheProperties props(options.statistics);
BlockCacheProperties props(options.statistics.get());
// Nothing is affected at all
props.AssertEqual(0, 0, 0, 0);
}
@ -1065,7 +1065,7 @@ TEST(TableTest, BlockCacheTest) {
options.block_cache = NewLRUCache(1);
c.Reopen(options);
{
BlockCacheProperties props(options.statistics);
BlockCacheProperties props(options.statistics.get());
props.AssertEqual(
1, // index block miss
0,
@ -1080,7 +1080,7 @@ TEST(TableTest, BlockCacheTest) {
// It first cache index block then data block. But since the cache size
// is only 1, index block will be purged after data block is inserted.
iter.reset(c.NewIterator());
BlockCacheProperties props(options.statistics);
BlockCacheProperties props(options.statistics.get());
props.AssertEqual(
1 + 1, // index block miss
0,
@ -1093,7 +1093,7 @@ TEST(TableTest, BlockCacheTest) {
// SeekToFirst() accesses data block. With similar reason, we expect data
// block's cache miss.
iter->SeekToFirst();
BlockCacheProperties props(options.statistics);
BlockCacheProperties props(options.statistics.get());
props.AssertEqual(
2,
0,
@ -1268,7 +1268,7 @@ class MemTableTest { };
TEST(MemTableTest, Simple) {
InternalKeyComparator cmp(BytewiseComparator());
auto table_factory = std::make_shared<SkipListFactory>();
MemTable* memtable = new MemTable(cmp, table_factory);
MemTable* memtable = new MemTable(cmp, table_factory.get());
memtable->Ref();
WriteBatch batch;
Options options;
@ -1289,7 +1289,7 @@ TEST(MemTableTest, Simple) {
}
delete iter;
memtable->Unref();
delete memtable->Unref();
}

@ -305,8 +305,7 @@ DEFINE_bool(filter_deletes, false, "On true, deletes use KeyMayExist to drop"
enum RepFactory {
kSkipList,
kPrefixHash,
kUnsorted,
kHashSkipList,
kVectorRep
};
enum RepFactory StringToRepFactory(const char* ctype) {
@ -315,9 +314,7 @@ enum RepFactory StringToRepFactory(const char* ctype) {
if (!strcasecmp(ctype, "skip_list"))
return kSkipList;
else if (!strcasecmp(ctype, "prefix_hash"))
return kPrefixHash;
else if (!strcasecmp(ctype, "unsorted"))
return kUnsorted;
return kHashSkipList;
else if (!strcasecmp(ctype, "vector"))
return kVectorRep;
@ -335,7 +332,7 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) {
}
return true;
}
DEFINE_int32(prefix_size, 0, "Control the prefix size for PrefixHashRep");
DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipListRep");
static const bool FLAGS_prefix_size_dummy =
google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
@ -1338,12 +1335,9 @@ class StressTest {
case kSkipList:
memtablerep = "skip_list";
break;
case kPrefixHash:
case kHashSkipList:
memtablerep = "prefix_hash";
break;
case kUnsorted:
memtablerep = "unsorted";
break;
case kVectorRep:
memtablerep = "vector";
break;
@ -1393,21 +1387,15 @@ class StressTest {
FLAGS_delete_obsolete_files_period_micros;
options.max_manifest_file_size = 1024;
options.filter_deletes = FLAGS_filter_deletes;
if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) {
if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) {
fprintf(stderr,
"prefix_size should be non-zero iff memtablerep == prefix_hash\n");
exit(1);
}
switch (FLAGS_rep_factory) {
case kPrefixHash:
options.memtable_factory.reset(
new PrefixHashRepFactory(NewFixedPrefixTransform(FLAGS_prefix_size))
);
break;
case kUnsorted:
options.memtable_factory.reset(
new UnsortedRepFactory()
);
case kHashSkipList:
options.memtable_factory.reset(NewHashSkipListRepFactory(
NewFixedPrefixTransform(FLAGS_prefix_size)));
break;
case kSkipList:
// no need to do anything

@ -4,6 +4,8 @@
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "util/hash_skiplist_rep.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/arena.h"
#include "rocksdb/slice.h"
@ -309,39 +311,12 @@ std::shared_ptr<MemTableRep::Iterator>
} // anon namespace
class HashSkipListRepFactory : public MemTableRepFactory {
public:
explicit HashSkipListRepFactory(
const SliceTransform* transform,
size_t bucket_count,
int32_t skiplist_height,
int32_t skiplist_branching_factor)
: transform_(transform),
bucket_count_(bucket_count),
skiplist_height_(skiplist_height),
skiplist_branching_factor_(skiplist_branching_factor) { }
virtual ~HashSkipListRepFactory() { delete transform_; }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) override {
return std::make_shared<HashSkipListRep>(compare, arena, transform_,
bucket_count_, skiplist_height_,
skiplist_branching_factor_);
}
virtual const char* Name() const override {
return "HashSkipListRepFactory";
}
const SliceTransform* GetTransform() { return transform_; }
private:
const SliceTransform* transform_;
const size_t bucket_count_;
const int32_t skiplist_height_;
const int32_t skiplist_branching_factor_;
};
std::shared_ptr<MemTableRep> HashSkipListRepFactory::CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) {
return std::make_shared<HashSkipListRep>(compare, arena, transform_,
bucket_count_, skiplist_height_,
skiplist_branching_factor_);
}
MemTableRepFactory* NewHashSkipListRepFactory(
const SliceTransform* transform, size_t bucket_count,

@ -0,0 +1,45 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/slice_transform.h"
#include "rocksdb/memtablerep.h"
namespace rocksdb {
class HashSkipListRepFactory : public MemTableRepFactory {
public:
explicit HashSkipListRepFactory(
const SliceTransform* transform,
size_t bucket_count,
int32_t skiplist_height,
int32_t skiplist_branching_factor)
: transform_(transform),
bucket_count_(bucket_count),
skiplist_height_(skiplist_height),
skiplist_branching_factor_(skiplist_branching_factor) { }
virtual ~HashSkipListRepFactory() { delete transform_; }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) override;
virtual const char* Name() const override {
return "HashSkipListRepFactory";
}
const SliceTransform* GetTransform() { return transform_; }
private:
const SliceTransform* transform_;
const size_t bucket_count_;
const int32_t skiplist_height_;
const int32_t skiplist_branching_factor_;
};
}

@ -1226,25 +1226,41 @@ void ChangeCompactionStyleCommand::DoCommand() {
class InMemoryHandler : public WriteBatch::Handler {
public:
InMemoryHandler(stringstream& row, bool print_values) : Handler(),row_(row) {
print_values_ = print_values;
}
virtual void Put(const Slice& key, const Slice& value) {
putMap_[key.ToString()] = value.ToString();
void commonPutMerge(const Slice& key, const Slice& value) {
string k = LDBCommand::StringToHex(key.ToString());
if (print_values_) {
string v = LDBCommand::StringToHex(value.ToString());
row_ << k << " : ";
row_ << v << " ";
} else {
row_ << k << " ";
}
}
virtual void Delete(const Slice& key) {
deleteList_.push_back(key.ToString(true));
virtual void Put(const Slice& key, const Slice& value) {
row_ << "PUT : ";
commonPutMerge(key, value);
}
virtual ~InMemoryHandler() { };
map<string, string> PutMap() {
return putMap_;
virtual void Merge(const Slice& key, const Slice& value) {
row_ << "MERGE : ";
commonPutMerge(key, value);
}
vector<string> DeleteList() {
return deleteList_;
virtual void Delete(const Slice& key) {
row_ <<",DELETE : ";
row_ << LDBCommand::StringToHex(key.ToString()) << " ";
}
virtual ~InMemoryHandler() { };
private:
map<string, string> putMap_;
vector<string> deleteList_;
stringstream & row_;
bool print_values_;
};
const string WALDumperCommand::ARG_WAL_FILE = "walfile";
@ -1322,26 +1338,8 @@ void WALDumperCommand::DoCommand() {
row<<WriteBatchInternal::Count(&batch)<<",";
row<<WriteBatchInternal::ByteSize(&batch)<<",";
row<<reader.LastRecordOffset()<<",";
InMemoryHandler handler;
InMemoryHandler handler(row, print_values_);
batch.Iterate(&handler);
row << "PUT : ";
if (print_values_) {
for (auto& kv : handler.PutMap()) {
string k = StringToHex(kv.first);
string v = StringToHex(kv.second);
row << k << " : ";
row << v << " ";
}
}
else {
for(auto& kv : handler.PutMap()) {
row << StringToHex(kv.first) << " ";
}
}
row<<",DELETE : ";
for(string& s : handler.DeleteList()) {
row << StringToHex(s) << " ";
}
row<<"\n";
}
cout<<row.str();

@ -25,6 +25,9 @@ Options::Options()
: comparator(BytewiseComparator()),
merge_operator(nullptr),
compaction_filter(nullptr),
compaction_filter_factory(
std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())),
create_if_missing(false),
error_if_exists(false),
paranoid_checks(false),
@ -97,9 +100,6 @@ Options::Options()
memtable_factory(std::shared_ptr<SkipListFactory>(new SkipListFactory)),
table_factory(
std::shared_ptr<TableFactory>(new BlockBasedTableFactory())),
compaction_filter_factory(
std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())),
inplace_update_support(false),
inplace_update_num_locks(10000) {
assert(memtable_factory.get() != nullptr);
@ -278,6 +278,9 @@ Options::Dump(Logger* log) const
Log(log,"Options.compaction_options_universal."
"max_size_amplification_percent: %u",
compaction_options_universal.max_size_amplification_percent);
Log(log,
"Options.compaction_options_universal.compression_size_percent: %u",
compaction_options_universal.compression_size_percent);
std::string collector_names;
for (auto collector : table_properties_collectors) {
collector_names.append(collector->Name());

@ -28,24 +28,5 @@ namespace stl_wrappers {
}
};
struct Hash {
inline size_t operator()(const char* buf) const {
Slice internal_key = GetLengthPrefixedSlice(buf);
Slice value =
GetLengthPrefixedSlice(internal_key.data() + internal_key.size());
unsigned int hval = MurmurHash(internal_key.data(), internal_key.size(),
0);
hval = MurmurHash(value.data(), value.size(), hval);
return hval;
}
};
struct KeyEqual : private Base {
explicit KeyEqual(const MemTableRep::KeyComparator& compare)
: Base(compare) { }
inline bool operator()(const char* a, const char* b) const {
return this->compare_(a, b) == 0;
}
};
}
}

@ -1,426 +0,0 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include <unordered_map>
#include <set>
#include <vector>
#include <algorithm>
#include <iostream>
#include "rocksdb/memtablerep.h"
#include "rocksdb/arena.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "db/memtable.h"
#include "port/port.h"
#include "util/mutexlock.h"
#include "util/murmurhash.h"
#include "util/stl_wrappers.h"
namespace std {
template <>
struct hash<rocksdb::Slice> {
size_t operator()(const rocksdb::Slice& slice) const {
return MurmurHash(slice.data(), slice.size(), 0);
}
};
}
namespace rocksdb {
namespace {
using namespace stl_wrappers;
class TransformRep : public MemTableRep {
public:
TransformRep(const KeyComparator& compare, Arena* arena,
const SliceTransform* transform, size_t bucket_size,
size_t num_locks);
virtual void Insert(const char* key) override;
virtual bool Contains(const char* key) const override;
virtual size_t ApproximateMemoryUsage() override;
virtual ~TransformRep() { }
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator() override;
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator(
const Slice& slice) override;
virtual std::shared_ptr<MemTableRep::Iterator> GetDynamicPrefixIterator()
override {
return std::make_shared<DynamicPrefixIterator>(*this);
}
std::shared_ptr<MemTableRep::Iterator> GetTransformIterator(
const Slice& transformed);
private:
friend class DynamicPrefixIterator;
typedef std::set<const char*, Compare> Bucket;
typedef std::unordered_map<Slice, std::shared_ptr<Bucket>> BucketMap;
// Maps slices (which are transformed user keys) to buckets of keys sharing
// the same transform.
BucketMap buckets_;
// rwlock_ protects access to the buckets_ data structure itself. Each bucket
// has its own read-write lock as well.
mutable port::RWMutex rwlock_;
// Keep track of approximately how much memory is being used.
size_t memory_usage_ = 0;
// The user-supplied transform whose domain is the user keys.
const SliceTransform* transform_;
// Get a bucket from buckets_. If the bucket hasn't been initialized yet,
// initialize it before returning. Must be externally synchronized.
std::shared_ptr<Bucket>& GetBucket(const Slice& transformed);
port::RWMutex* GetLock(const Slice& transformed) const;
mutable std::vector<port::RWMutex> locks_;
const KeyComparator& compare_;
class Iterator : public MemTableRep::Iterator {
public:
explicit Iterator(std::shared_ptr<Bucket> items);
virtual ~Iterator() { };
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const;
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const;
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next();
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev();
// Advance to the first entry with a key >= target
virtual void Seek(const Slice& user_key, const char* memtable_key);
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst();
// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToLast();
private:
std::shared_ptr<Bucket> items_;
Bucket::const_iterator cit_;
std::string tmp_; // For passing to EncodeKey
};
class EmptyIterator : public MemTableRep::Iterator {
// This is used when there wasn't a bucket. It is cheaper than
// instantiating an empty bucket over which to iterate.
public:
virtual bool Valid() const {
return false;
}
virtual const char* key() const {
assert(false);
return nullptr;
}
virtual void Next() { }
virtual void Prev() { }
virtual void Seek(const Slice& user_key, const char* memtable_key) { }
virtual void SeekToFirst() { }
virtual void SeekToLast() { }
static std::shared_ptr<EmptyIterator> GetInstance();
private:
static std::shared_ptr<EmptyIterator> instance;
EmptyIterator() { }
};
class TransformIterator : public Iterator {
public:
explicit TransformIterator(std::shared_ptr<Bucket> items,
port::RWMutex* rwlock);
virtual ~TransformIterator() { }
private:
const ReadLock l_;
};
class DynamicPrefixIterator : public MemTableRep::Iterator {
private:
// the underlying memtable rep
const TransformRep& memtable_rep_;
// the result of a prefix seek
std::unique_ptr<MemTableRep::Iterator> bucket_iterator_;
public:
explicit DynamicPrefixIterator(const TransformRep& memtable_rep)
: memtable_rep_(memtable_rep) {}
virtual ~DynamicPrefixIterator() { };
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const {
return bucket_iterator_ && bucket_iterator_->Valid();
}
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const {
assert(Valid());
return bucket_iterator_->key();
}
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next() {
assert(Valid());
bucket_iterator_->Next();
}
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev() {
assert(Valid());
bucket_iterator_->Prev();
}
// Advance to the first entry with a key >= target within the
// same bucket as target
virtual void Seek(const Slice& user_key, const char* memtable_key) {
Slice prefix = memtable_rep_.transform_->Transform(user_key);
ReadLock l(&memtable_rep_.rwlock_);
auto bucket = memtable_rep_.buckets_.find(prefix);
if (bucket == memtable_rep_.buckets_.end()) {
bucket_iterator_.reset(nullptr);
} else {
bucket_iterator_.reset(
new TransformIterator(bucket->second, memtable_rep_.GetLock(prefix)));
bucket_iterator_->Seek(user_key, memtable_key);
}
}
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst() {
// Prefix iterator does not support total order.
// We simply set the iterator to invalid state
bucket_iterator_.reset(nullptr);
}
// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToLast() {
// Prefix iterator does not support total order.
// We simply set the iterator to invalid state
bucket_iterator_.reset(nullptr);
}
};
};
class PrefixHashRep : public TransformRep {
public:
PrefixHashRep(const KeyComparator& compare, Arena* arena,
const SliceTransform* transform, size_t bucket_size,
size_t num_locks)
: TransformRep(compare, arena, transform,
bucket_size, num_locks) { }
virtual std::shared_ptr<MemTableRep::Iterator> GetPrefixIterator(
const Slice& prefix) override;
};
std::shared_ptr<TransformRep::Bucket>& TransformRep::GetBucket(
const Slice& transformed) {
WriteLock l(&rwlock_);
auto& bucket = buckets_[transformed];
if (!bucket) {
bucket.reset(
new decltype(buckets_)::mapped_type::element_type(Compare(compare_)));
// To memory_usage_ we add the size of the std::set and the size of the
// std::pair (decltype(buckets_)::value_type) which includes the
// Slice and the std::shared_ptr
memory_usage_ += sizeof(*bucket) +
sizeof(decltype(buckets_)::value_type);
}
return bucket;
}
port::RWMutex* TransformRep::GetLock(const Slice& transformed) const {
return &locks_[std::hash<Slice>()(transformed) % locks_.size()];
}
TransformRep::TransformRep(const KeyComparator& compare, Arena* arena,
const SliceTransform* transform, size_t bucket_size,
size_t num_locks)
: buckets_(bucket_size),
transform_(transform),
locks_(num_locks),
compare_(compare) { }
void TransformRep::Insert(const char* key) {
assert(!Contains(key));
auto transformed = transform_->Transform(UserKey(key));
auto& bucket = GetBucket(transformed);
WriteLock bl(GetLock(transformed));
bucket->insert(key);
memory_usage_ += sizeof(key);
}
bool TransformRep::Contains(const char* key) const {
ReadLock l(&rwlock_);
auto transformed = transform_->Transform(UserKey(key));
auto bucket = buckets_.find(transformed);
if (bucket == buckets_.end()) {
return false;
}
ReadLock bl(GetLock(transformed));
return bucket->second->count(key) != 0;
}
size_t TransformRep::ApproximateMemoryUsage() {
return memory_usage_;
}
std::shared_ptr<TransformRep::EmptyIterator>
TransformRep::EmptyIterator::GetInstance() {
if (!instance) {
instance.reset(new TransformRep::EmptyIterator);
}
return instance;
}
TransformRep::Iterator::Iterator(std::shared_ptr<Bucket> items)
: items_(items),
cit_(items_->begin()) { }
// Returns true iff the iterator is positioned at a valid node.
bool TransformRep::Iterator::Valid() const {
return cit_ != items_->end();
}
// Returns the key at the current position.
// REQUIRES: Valid()
const char* TransformRep::Iterator::key() const {
assert(Valid());
return *cit_;
}
// Advances to the next position.
// REQUIRES: Valid()
void TransformRep::Iterator::Next() {
assert(Valid());
if (cit_ == items_->end()) {
return;
}
++cit_;
}
// Advances to the previous position.
// REQUIRES: Valid()
void TransformRep::Iterator::Prev() {
assert(Valid());
if (cit_ == items_->begin()) {
// If you try to go back from the first element, the iterator should be
// invalidated. So we set it to past-the-end. This means that you can
// treat the container circularly.
cit_ = items_->end();
} else {
--cit_;
}
}
// Advance to the first entry with a key >= target
void TransformRep::Iterator::Seek(const Slice& user_key,
const char* memtable_key) {
const char* encoded_key =
(memtable_key != nullptr) ? memtable_key : EncodeKey(&tmp_, user_key);
cit_ = items_->lower_bound(encoded_key);
}
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
void TransformRep::Iterator::SeekToFirst() {
cit_ = items_->begin();
}
void TransformRep::Iterator::SeekToLast() {
cit_ = items_->end();
if (items_->size() != 0) {
--cit_;
}
}
TransformRep::TransformIterator::TransformIterator(
std::shared_ptr<Bucket> items, port::RWMutex* rwlock)
: Iterator(items), l_(rwlock) { }
std::shared_ptr<MemTableRep::Iterator> TransformRep::GetIterator() {
auto items = std::make_shared<Bucket>(Compare(compare_));
// Hold read locks on all locks
ReadLock l(&rwlock_);
std::for_each(locks_.begin(), locks_.end(), [] (port::RWMutex& lock) {
lock.ReadLock();
});
for (auto& bucket : buckets_) {
items->insert(bucket.second->begin(), bucket.second->end());
}
std::for_each(locks_.begin(), locks_.end(), [] (port::RWMutex& lock) {
lock.Unlock();
});
return std::make_shared<Iterator>(std::move(items));
}
std::shared_ptr<MemTableRep::Iterator> TransformRep::GetTransformIterator(
const Slice& transformed) {
ReadLock l(&rwlock_);
auto bucket = buckets_.find(transformed);
if (bucket == buckets_.end()) {
return EmptyIterator::GetInstance();
}
return std::make_shared<TransformIterator>(bucket->second,
GetLock(transformed));
}
std::shared_ptr<MemTableRep::Iterator> TransformRep::GetIterator(
const Slice& slice) {
auto transformed = transform_->Transform(slice);
return GetTransformIterator(transformed);
}
std::shared_ptr<TransformRep::EmptyIterator>
TransformRep::EmptyIterator::instance;
} // anon namespace
std::shared_ptr<MemTableRep> TransformRepFactory::CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) {
return std::make_shared<TransformRep>(compare, arena, transform_,
bucket_count_, num_locks_);
}
std::shared_ptr<MemTableRep> PrefixHashRepFactory::CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) {
return std::make_shared<PrefixHashRep>(compare, arena, transform_,
bucket_count_, num_locks_);
}
std::shared_ptr<MemTableRep::Iterator> PrefixHashRep::GetPrefixIterator(
const Slice& prefix) {
return TransformRep::GetTransformIterator(prefix);
}
} // namespace rocksdb

@ -291,6 +291,10 @@ Status DBWithTTL::DeleteFile(std::string name) {
return db_->DeleteFile(name);
}
Status DBWithTTL::GetDbIdentity(std::string& identity) {
return db_->GetDbIdentity(identity);
}
Status DBWithTTL::GetUpdatesSince(
SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter) {

@ -84,6 +84,8 @@ class DBWithTTL : public StackableDB {
virtual Status DeleteFile(std::string name);
virtual Status GetDbIdentity(std::string& identity);
virtual SequenceNumber GetLatestSequenceNumber() const;
virtual Status GetUpdatesSince(SequenceNumber seq_number,

Loading…
Cancel
Save