Merge branch 'master' into columnfamilies

Conflicts:
	db/compaction_picker.cc
	db/db_impl.cc
	db/db_impl.h
	db/tailing_iter.cc
	db/version_set.h
	include/rocksdb/options.h
	util/options.cc
Igor Canadi 11 years ago
commit 9634ba42ac
Files changed (38), with changed-line counts:
  1. HISTORY.md (3)
  2. db/c.cc (28)
  3. db/column_family.cc (14)
  4. db/column_family.h (1)
  5. db/compaction_picker.cc (2)
  6. db/db_bench.cc (9)
  7. db/db_impl.cc (125)
  8. db/db_impl.h (13)
  9. db/db_test.cc (39)
  10. db/memtable.cc (5)
  11. db/plain_table_db_test.cc (6)
  12. db/prefix_test.cc (11)
  13. db/skiplist.h (6)
  14. db/tailing_iter.cc (2)
  15. db/version_set.cc (2)
  16. db/version_set.h (4)
  17. include/rocksdb/env.h (24)
  18. include/rocksdb/memtablerep.h (20)
  19. include/rocksdb/options.h (7)
  20. include/rocksdb/statistics.h (2)
  21. include/utilities/backupable_db.h (3)
  22. table/filter_block.cc (4)
  23. table/table_reader_bench.cc (8)
  24. table/table_test.cc (11)
  25. tools/db_stress.cc (14)
  26. tools/ldb_test.py (6)
  27. tools/sst_dump.cc (2)
  28. util/env.cc (73)
  29. util/hash_linklist_rep.cc (18)
  30. util/hash_linklist_rep.h (17)
  31. util/hash_skiplist_rep.cc (21)
  32. util/hash_skiplist_rep.h (14)
  33. util/log_buffer.cc (67)
  34. util/log_buffer.h (46)
  35. util/options.cc (4)
  36. util/skiplistrep.cc (11)
  37. util/vectorrep.cc (3)
  38. utilities/backupable/backupable_db.cc (13)

@@ -14,6 +14,9 @@
 * Added is_manual_compaction to CompactionFilter::Context
 * Added "virtual void WaitForJoin() = 0" in class Env
 * Removed BackupEngine::DeleteBackupsNewerThan() function
+* Added new option -- verify_checksums_in_compaction
+* Changed Options.prefix_extractor from raw pointer to shared_ptr (take ownership)
+  Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly)

 ### New Features
 * If we find one truncated record at the end of the MANIFEST or WAL files,
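
A minimal sketch of the new ownership model from the two entries above, assuming the post-merge headers (Options::prefix_extractor is now a std::shared_ptr, and the hash memtable factories pick the transform up from it implicitly):

    #include "rocksdb/memtablerep.h"
    #include "rocksdb/options.h"
    #include "rocksdb/slice_transform.h"

    rocksdb::Options MakePrefixOptions() {
      rocksdb::Options options;
      // New: Options owns the transform through a shared_ptr; no manual delete.
      options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(8));
      // New: no SliceTransform argument; the factory reads
      // options.prefix_extractor when each memtable is constructed.
      options.memtable_factory.reset(rocksdb::NewHashSkipListRepFactory());
      return options;
    }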

@@ -294,10 +294,10 @@ struct rocksdb_universal_compaction_options_t {
 };

 static bool SaveError(char** errptr, const Status& s) {
-  assert(errptr != NULL);
+  assert(errptr != nullptr);
   if (s.ok()) {
     return false;
-  } else if (*errptr == NULL) {
+  } else if (*errptr == nullptr) {
     *errptr = strdup(s.ToString().c_str());
   } else {
     // TODO(sanjay): Merge with existing error?
@@ -319,7 +319,7 @@ rocksdb_t* rocksdb_open(
     char** errptr) {
   DB* db;
   if (SaveError(errptr, DB::Open(options->rep, std::string(name), &db))) {
-    return NULL;
+    return nullptr;
   }
   rocksdb_t* result = new rocksdb_t;
   result->rep = db;
@@ -373,7 +373,7 @@ char* rocksdb_get(
     const char* key, size_t keylen,
     size_t* vallen,
     char** errptr) {
-  char* result = NULL;
+  char* result = nullptr;
   std::string tmp;
   Status s = db->rep->Get(options->rep, Slice(key, keylen), &tmp);
   if (s.ok()) {
@@ -418,7 +418,7 @@ char* rocksdb_property_value(
     // We use strdup() since we expect human readable output.
     return strdup(tmp.c_str());
   } else {
-    return NULL;
+    return nullptr;
   }
 }
@@ -456,9 +456,9 @@ void rocksdb_compact_range(
     const char* limit_key, size_t limit_key_len) {
   Slice a, b;
   db->rep->CompactRange(
-      // Pass NULL Slice if corresponding "const char*" is NULL
-      (start_key ? (a = Slice(start_key, start_key_len), &a) : NULL),
-      (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : NULL));
+      // Pass nullptr Slice if corresponding "const char*" is nullptr
+      (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
+      (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
 }

 void rocksdb_flush(
@@ -647,7 +647,7 @@ void rocksdb_options_set_paranoid_checks(
 }

 void rocksdb_options_set_env(rocksdb_options_t* opt, rocksdb_env_t* env) {
-  opt->rep.env = (env ? env->rep : NULL);
+  opt->rep.env = (env ? env->rep : nullptr);
 }

 void rocksdb_options_set_info_log(rocksdb_options_t* opt, rocksdb_logger_t* l) {
@@ -765,7 +765,7 @@ void rocksdb_options_set_compression_options(
 void rocksdb_options_set_prefix_extractor(
     rocksdb_options_t* opt, rocksdb_slicetransform_t* prefix_extractor) {
-  opt->rep.prefix_extractor = prefix_extractor;
+  opt->rep.prefix_extractor.reset(prefix_extractor);
 }

 void rocksdb_options_set_whole_key_filtering(
@@ -1087,8 +1087,8 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_bloom(int bits_per_key) {
   };
   Wrapper* wrapper = new Wrapper;
   wrapper->rep_ = NewBloomFilterPolicy(bits_per_key);
-  wrapper->state_ = NULL;
-  wrapper->delete_filter_ = NULL;
+  wrapper->state_ = nullptr;
+  wrapper->delete_filter_ = nullptr;
   wrapper->destructor_ = &Wrapper::DoNothing;
   return wrapper;
 }
@@ -1154,7 +1154,7 @@ void rocksdb_readoptions_set_prefix_seek(
 void rocksdb_readoptions_set_snapshot(
     rocksdb_readoptions_t* opt,
     const rocksdb_snapshot_t* snap) {
-  opt->rep.snapshot = (snap ? snap->rep : NULL);
+  opt->rep.snapshot = (snap ? snap->rep : nullptr);
 }

 void rocksdb_readoptions_set_prefix(
@@ -1280,7 +1280,7 @@ rocksdb_slicetransform_t* rocksdb_slicetransform_create_fixed_prefix(size_t prefixLen) {
   };
   Wrapper* wrapper = new Wrapper;
   wrapper->rep_ = rocksdb::NewFixedPrefixTransform(prefixLen);
-  wrapper->state_ = NULL;
+  wrapper->state_ = nullptr;
   wrapper->destructor_ = &Wrapper::DoNothing;
   return wrapper;
 }

@@ -40,9 +40,11 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
     }
     db_->FindObsoleteFiles(deletion_state, false, true);
     mutex_->Unlock();
+    if (deletion_state.HaveSomethingToDelete()) {
       db_->PurgeObsoleteFiles(deletion_state);
+    }
   }
 }

 uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
@@ -84,13 +86,11 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
   if (result.soft_rate_limit > result.hard_rate_limit) {
     result.soft_rate_limit = result.hard_rate_limit;
   }
-  if (result.prefix_extractor) {
-    // If a prefix extractor has been supplied and a HashSkipListRepFactory is
-    // being used, make sure that the latter uses the former as its transform
-    // function.
-    auto factory =
-        dynamic_cast<HashSkipListRepFactory*>(result.memtable_factory.get());
-    if (factory && factory->GetTransform() != result.prefix_extractor) {
+  if (!result.prefix_extractor) {
+    assert(result.memtable_factory);
+    Slice name = result.memtable_factory->Name();
+    if (name.compare("HashSkipListRepFactory") == 0 ||
+        name.compare("HashLinkListRepFactory") == 0) {
       result.memtable_factory = std::make_shared<SkipListFactory>();
     }
   }
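
The rewritten check changes behavior: previously a mismatched transform on a HashSkipListRepFactory was corrected; now any hash-based rep configured without a prefix extractor is silently downgraded. A minimal sketch of the case SanitizeOptions now guards against (factory names as matched above):

    rocksdb::Options MakeUnsanitizedOptions() {
      rocksdb::Options options;
      options.memtable_factory.reset(rocksdb::NewHashLinkListRepFactory());
      // options.prefix_extractor is left unset (nullptr). Because the factory
      // reports the name "HashLinkListRepFactory", SanitizeOptions() replaces
      // it with a plain SkipListFactory rather than failing the open.
      return options;
    }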

@@ -33,6 +33,7 @@ class InternalKey;
 class InternalStats;
 class ColumnFamilyData;
 class DBImpl;
+class LogBuffer;

 class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
  public:

@@ -10,6 +10,8 @@
 #include "db/compaction_picker.h"

 #include <limits>
+#include "util/log_buffer.h"
+#include "util/statistics.h"

 namespace rocksdb {

@@ -1538,9 +1538,10 @@ class Benchmark {
     options.compaction_style = FLAGS_compaction_style_e;
     options.block_size = FLAGS_block_size;
     options.filter_policy = filter_policy_;
-    options.prefix_extractor =
-        (FLAGS_use_plain_table || FLAGS_use_prefix_blooms) ? prefix_extractor_
-                                                           : nullptr;
+    if (FLAGS_use_plain_table || FLAGS_use_prefix_blooms) {
+      options.prefix_extractor.reset(
+          NewFixedPrefixTransform(FLAGS_prefix_size));
+    }
     options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits;
     options.max_open_files = FLAGS_open_files;
     options.statistics = dbstats;
@@ -1564,7 +1565,6 @@ class Benchmark {
     switch (FLAGS_rep_factory) {
       case kPrefixHash:
         options.memtable_factory.reset(NewHashSkipListRepFactory(
-            prefix_extractor_,
             FLAGS_hash_bucket_count));
         break;
       case kSkipList:
@@ -1572,7 +1572,6 @@ class Benchmark {
         break;
       case kHashLinkedList:
         options.memtable_factory.reset(NewHashLinkListRepFactory(
-            prefix_extractor_,
             FLAGS_hash_bucket_count));
         break;
       case kVectorRep:

@@ -61,7 +61,9 @@
 #include "util/build_version.h"
 #include "util/coding.h"
 #include "util/hash_skiplist_rep.h"
+#include "util/hash_linklist_rep.h"
 #include "util/logging.h"
+#include "util/log_buffer.h"
 #include "util/mutexlock.h"
 #include "util/perf_context_imp.h"
 #include "util/stop_watch.h"
@@ -320,9 +322,11 @@ DBImpl::~DBImpl() {
     FindObsoleteFiles(deletion_state, true);
     // manifest number starting from 2
     deletion_state.manifest_file_number = 1;
+    if (deletion_state.HaveSomethingToDelete()) {
       PurgeObsoleteFiles(deletion_state);
+    }
   }
 }
 mutex_.Unlock();
 if (default_cf_handle_ != nullptr) {
@@ -536,12 +540,8 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
 // files in sst_delete_files and log_delete_files.
 // It is not necessary to hold the mutex when invoking this method.
 void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
-  // check if there is anything to do
-  if (state.candidate_files.empty() &&
-      state.sst_delete_files.empty() &&
-      state.log_delete_files.empty()) {
-    return;
-  }
+  // we'd better have sth to delete
+  assert(state.HaveSomethingToDelete());

   // this checks if FindObsoleteFiles() was run before. If not, don't do
   // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
@@ -549,7 +549,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
   if (state.manifest_file_number == 0) {
     return;
   }
-  std::vector<std::string> old_log_files;

   // Now, convert live list to an unordered set, WITHOUT mutex held;
   // set is slow.
@@ -587,6 +587,8 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
       candidate_files.end()
   );

+  std::vector<std::string> old_info_log_files;
+
   for (const auto& to_delete : candidate_files) {
     uint64_t number;
     FileType type;
@@ -617,7 +619,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
       case kInfoLogFile:
         keep = true;
         if (number != 0) {
-          old_log_files.push_back(to_delete);
+          old_info_log_files.push_back(to_delete);
         }
         break;
       case kCurrentFile:
@@ -636,44 +638,40 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
       // evict from cache
       TableCache::Evict(table_cache_.get(), number);
     }
     std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) +
         "/" + to_delete;
-    Log(options_.info_log,
-        "Delete type=%d #%lu",
-        int(type),
-        (unsigned long)number);
     if (type == kLogFile &&
         (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
-      Status s = env_->RenameFile(fname,
-          ArchivedLogFileName(options_.wal_dir, number));
-      if (!s.ok()) {
+      auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number);
+      Status s = env_->RenameFile(fname, archived_log_name);
       Log(options_.info_log,
-            "RenameFile logfile #%lu FAILED -- %s\n",
-            (unsigned long)number, s.ToString().c_str());
-      }
+          "Move log file %s to %s -- %s\n",
+          fname.c_str(), archived_log_name.c_str(), s.ToString().c_str());
     } else {
       Status s = env_->DeleteFile(fname);
-      if (!s.ok()) {
-        Log(options_.info_log, "Delete type=%d #%lu FAILED -- %s\n",
-            int(type), (unsigned long)number, s.ToString().c_str());
-      }
+      Log(options_.info_log, "Delete %s type=%d #%lu -- %s\n",
+          fname.c_str(), type, (unsigned long)number,
+          s.ToString().c_str());
     }
   }

   // Delete old info log files.
-  size_t old_log_file_count = old_log_files.size();
+  size_t old_info_log_file_count = old_info_log_files.size();
   // NOTE: Currently we only support log purge when options_.db_log_dir is
   // located in `dbname` directory.
-  if (old_log_file_count >= options_.keep_log_file_num &&
+  if (old_info_log_file_count >= options_.keep_log_file_num &&
       options_.db_log_dir.empty()) {
-    std::sort(old_log_files.begin(), old_log_files.end());
-    size_t end = old_log_file_count - options_.keep_log_file_num;
+    std::sort(old_info_log_files.begin(), old_info_log_files.end());
+    size_t end = old_info_log_file_count - options_.keep_log_file_num;
     for (unsigned int i = 0; i <= end; i++) {
-      std::string& to_delete = old_log_files.at(i);
-      // Log(options_.info_log, "Delete type=%d %s\n",
-      //     int(kInfoLogFile), to_delete.c_str());
-      env_->DeleteFile(dbname_ + "/" + to_delete);
+      std::string& to_delete = old_info_log_files.at(i);
+      Log(options_.info_log, "Delete info log file %s\n", to_delete.c_str());
+      Status s = env_->DeleteFile(dbname_ + "/" + to_delete);
+      if (!s.ok()) {
+        Log(options_.info_log, "Delete info log file %s FAILED -- %s\n",
+            to_delete.c_str(), s.ToString().c_str());
+      }
     }
   }
   PurgeObsoleteWALFiles();
@@ -684,8 +682,10 @@ void DBImpl::DeleteObsoleteFiles() {
   mutex_.AssertHeld();
   DeletionState deletion_state;
   FindObsoleteFiles(deletion_state, true);
+  if (deletion_state.HaveSomethingToDelete()) {
     PurgeObsoleteFiles(deletion_state);
+  }
 }

 // 1. Go through all archived files and
 //    a. if ttl is enabled, delete outdated files
@@ -1132,7 +1132,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
 Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
                                 autovector<MemTable*>& mems, VersionEdit* edit,
-                                uint64_t* filenumber) {
+                                uint64_t* filenumber, LogBuffer* log_buffer) {
   mutex_.AssertHeld();
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
@@ -1148,6 +1148,7 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
   Status s;
   {
     mutex_.Unlock();
+    log_buffer->FlushBufferToLog();
     std::vector<Iterator*> memtables;
     for (MemTable* m : mems) {
       Log(options_.info_log,
@@ -1218,7 +1219,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
 Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
                                          bool* madeProgress,
-                                         DeletionState& deletion_state) {
+                                         DeletionState& deletion_state,
+                                         LogBuffer* log_buffer) {
   mutex_.AssertHeld();
   assert(cfd->imm()->size() != 0);
   assert(cfd->imm()->IsFlushPending());
@@ -1228,7 +1230,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
   autovector<MemTable*> mems;
   cfd->imm()->PickMemtablesToFlush(&mems);
   if (mems.empty()) {
-    Log(options_.info_log, "Nothing in memstore to flush");
+    LogToBuffer(log_buffer, "Nothing in memstore to flush");
     return Status::OK();
   }
@@ -1245,7 +1247,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
   edit->SetColumnFamily(cfd->GetID());

   // This will release and re-acquire the mutex.
-  Status s = WriteLevel0Table(cfd, mems, edit, &file_number);
+  Status s = WriteLevel0Table(cfd, mems, edit, &file_number, log_buffer);

   if (s.ok() && shutting_down_.Acquire_Load() && cfd->IsDropped()) {
     s = Status::ShutdownInProgress(
@@ -1838,7 +1840,8 @@ void DBImpl::BGWorkCompaction(void* db) {
 }

 Status DBImpl::BackgroundFlush(bool* madeProgress,
-                               DeletionState& deletion_state) {
+                               DeletionState& deletion_state,
+                               LogBuffer* log_buffer) {
   mutex_.AssertHeld();
   // call_status is failure if at least one flush was a failure. even if
   // flushing one column family reports a failure, we will continue flushing
@@ -1857,8 +1860,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
           "BackgroundCallFlush doing FlushMemTableToOutputFile with column "
           "family %u, flush slots available %d",
           cfd->GetID(), options_.max_background_flushes - bg_flush_scheduled_);
-      flush_status =
-          FlushMemTableToOutputFile(cfd, madeProgress, deletion_state);
+      flush_status = FlushMemTableToOutputFile(cfd, madeProgress,
+                                               deletion_state, log_buffer);
     }
     if (call_status.ok() && !flush_status.ok()) {
       call_status = flush_status;
@@ -1877,11 +1880,14 @@ void DBImpl::BackgroundCallFlush() {
   bool madeProgress = false;
   DeletionState deletion_state(true);
   assert(bg_flush_scheduled_);
+  LogBuffer log_buffer(INFO, options_.info_log.get());
+  {
     MutexLock l(&mutex_);
     Status s;
     if (!shutting_down_.Acquire_Load()) {
-      s = BackgroundFlush(&madeProgress, deletion_state);
+      s = BackgroundFlush(&madeProgress, deletion_state, &log_buffer);
       if (!s.ok()) {
         // Wait a little bit before retrying background compaction in
         // case this is an environmental problem and we do not want to
@@ -1891,6 +1897,7 @@ void DBImpl::BackgroundCallFlush() {
         Log(options_.info_log, "Waiting after background flush error: %s",
             s.ToString().c_str());
         mutex_.Unlock();
+        log_buffer.FlushBufferToLog();
         LogFlush(options_.info_log);
         env_->SleepForMicroseconds(1000000);
         mutex_.Lock();
@@ -1903,6 +1910,7 @@ void DBImpl::BackgroundCallFlush() {
     // delete unnecessary files if any, this is done outside the mutex
     if (deletion_state.HaveSomethingToDelete()) {
       mutex_.Unlock();
+      log_buffer.FlushBufferToLog();
       PurgeObsoleteFiles(deletion_state);
       mutex_.Lock();
     }
@@ -1913,6 +1921,8 @@ void DBImpl::BackgroundCallFlush() {
     }
     bg_cv_.SignalAll();
   }
+  log_buffer.FlushBufferToLog();
+}

 void DBImpl::TEST_PurgeObsoleteteWAL() {
@@ -1929,7 +1939,7 @@ void DBImpl::BackgroundCallCompaction() {
   DeletionState deletion_state(true);

   MaybeDumpStats();
-  LogBuffer log_buffer(INFO, options_.info_log);
+  LogBuffer log_buffer(INFO, options_.info_log.get());
   {
     MutexLock l(&mutex_);
     // Log(options_.info_log, "XXX BG Thread %llx process new work item",
@@ -1945,6 +1955,7 @@ void DBImpl::BackgroundCallCompaction() {
       // the problem.
       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
       mutex_.Unlock();
+      log_buffer.FlushBufferToLog();
       Log(options_.info_log, "Waiting after background compaction error: %s",
           s.ToString().c_str());
       LogFlush(options_.info_log);
@@ -1962,6 +1973,7 @@ void DBImpl::BackgroundCallCompaction() {
     // delete unnecessary files if any, this is done outside the mutex
     if (deletion_state.HaveSomethingToDelete()) {
       mutex_.Unlock();
+      log_buffer.FlushBufferToLog();
      PurgeObsoleteFiles(deletion_state);
       mutex_.Lock();
     }
@@ -2054,7 +2066,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   } else {
     MaybeScheduleFlushOrCompaction();  // do more compaction work in parallel.
     CompactionState* compact = new CompactionState(c.get());
-    status = DoCompactionWork(compact, deletion_state);
+    status = DoCompactionWork(compact, deletion_state, log_buffer);
     CleanupCompaction(compact, status);
     c->ReleaseCompactionFiles(status);
     c->ReleaseInputs();
@@ -2327,7 +2339,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
 }

 Status DBImpl::DoCompactionWork(CompactionState* compact,
-                                DeletionState& deletion_state) {
+                                DeletionState& deletion_state,
+                                LogBuffer* log_buffer) {
   assert(compact);
   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
   ColumnFamilyData* cfd = compact->compaction->column_family_data();
@@ -2370,6 +2383,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,

   // Release mutex while we're actually doing the compaction work
   mutex_.Unlock();
+  // flush log buffer immediately after releasing the mutex
+  log_buffer->FlushBufferToLog();

   const uint64_t start_micros = env_->NowMicros();
   unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
@@ -2745,8 +2760,10 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
     state->mu->Unlock();
     delete state->super_version;
+    if (deletion_state.HaveSomethingToDelete()) {
       state->db->PurgeObsoleteFiles(deletion_state);
+    }
   }
   delete state;
 }
@@ -2907,8 +2924,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   // acquiring mutex for this operation, we use atomic Swap() on the thread
   // local pointer to guarantee exclusive access. If the thread local pointer
   // is being used while a new SuperVersion is installed, the cached
-  // SuperVersion can become stale. It will eventually get refreshed either
-  // on the next GetImpl() call or next SuperVersion installation.
+  // SuperVersion can become stale. In that case, the background thread would
+  // have swapped in kSVObsolete. We re-check the value at the end of
+  // Get, with an atomic compare and swap. The superversion will be released
+  // if detected to be stale.
   thread_local_sv = cfd->GetThreadLocalSuperVersion();
   void* ptr = thread_local_sv->Swap(SuperVersion::kSVInUse);
   // Invariant:
@@ -2923,7 +2942,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
       SuperVersion* sv_to_delete = nullptr;

       if (sv && sv->Unref()) {
+        RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS);
         mutex_.Lock();
+        // TODO underlying resources held by superversion (sst files) might
+        // not be released until the next background job.
         sv->Cleanup();
         sv_to_delete = sv;
       } else {
@@ -3001,15 +3023,12 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   if (unref_sv) {
     // Release SuperVersion
-    bool delete_sv = false;
     if (sv->Unref()) {
       mutex_.Lock();
       sv->Cleanup();
       mutex_.Unlock();
-      delete_sv = true;
-    }
-    if (delete_sv) {
       delete sv;
+      RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS);
     }
     RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_RELEASES);
   }
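
The rewritten comment refers to a compare-and-swap at the end of Get; roughly, the hand-back path looks like the sketch below (names follow this diff; the exact ThreadLocalPtr::CompareAndSwap signature is an assumption here, not shown in these hunks):

    // Sketch: try to hand the SuperVersion back to the thread-local slot.
    void* expected = SuperVersion::kSVInUse;
    if (thread_local_sv->CompareAndSwap(static_cast<void*>(sv), expected)) {
      // Slot still reads kSVInUse: the cached SuperVersion is still current,
      // so skip the Unref()/Cleanup() path guarded by unref_sv above.
      unref_sv = false;
    } else {
      // A writer installed a new SuperVersion and marked the slot obsolete;
      // this copy is stale and takes the cleanup path shown in the hunks.
      assert(expected == SuperVersion::kSVObsolete);
    }
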
@@ -3263,7 +3282,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
     // use extra wrapper to exclude any keys from the results which
     // don't begin with the prefix
     iter = new PrefixFilterIterator(iter, *options.prefix,
-                                    cfd->options()->prefix_extractor);
+                                    cfd->options()->prefix_extractor.get());
   }
   return iter;
 }
@@ -3864,12 +3883,14 @@ Status DBImpl::DeleteFile(std::string name) {
   if (type == kLogFile) {
     // Only allow deleting archived log files
     if (log_type != kArchivedLogFile) {
-      Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
+      Log(options_.info_log, "DeleteFile %s failed - not archived log.\n",
+          name.c_str());
       return Status::NotSupported("Delete only supported for archived logs");
     }
     status = env_->DeleteFile(options_.wal_dir + "/" + name.c_str());
     if (!status.ok()) {
-      Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
+      Log(options_.info_log, "DeleteFile %s failed -- %s.\n",
+          name.c_str(), status.ToString().c_str());
     }
     return status;
   }
@@ -3915,7 +3936,9 @@ Status DBImpl::DeleteFile(std::string name) {
   }  // lock released here
   LogFlush(options_.info_log);
   // remove files outside the db-lock
+  if (deletion_state.HaveSomethingToDelete()) {
     PurgeObsoleteFiles(deletion_state);
+  }
   {
     MutexLock l(&mutex_);
     // schedule flush if file deletion means we freed the space for flushes to
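
The recurring change in this file: PurgeObsoleteFiles() now asserts it has work to do, so the emptiness check moved to every call site. The caller-side contract, distilled from the hunks above:

    DeletionState deletion_state;
    FindObsoleteFiles(deletion_state, true);
    if (deletion_state.HaveSomethingToDelete()) {
      // PurgeObsoleteFiles() now opens with
      // assert(state.HaveSomethingToDelete()), so callers must check first.
      PurgeObsoleteFiles(deletion_state);
    }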

@@ -266,6 +266,7 @@ class DBImpl : public DB {
  private:
   friend class DB;
   friend class TailingIterator;
+  friend struct SuperVersion;
   struct CompactionState;
   struct Writer;
@@ -287,7 +288,8 @@ class DBImpl : public DB {
   // Flush the in-memory write buffer to storage.  Switches to a new
   // log-file/memtable and writes a new descriptor iff successful.
   Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, bool* madeProgress,
-                                   DeletionState& deletion_state);
+                                   DeletionState& deletion_state,
+                                   LogBuffer* log_buffer);

   Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
                         bool read_only);
@@ -300,7 +302,8 @@ class DBImpl : public DB {
   Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
                                      VersionEdit* edit);
   Status WriteLevel0Table(ColumnFamilyData* cfd, autovector<MemTable*>& mems,
-                          VersionEdit* edit, uint64_t* filenumber);
+                          VersionEdit* edit, uint64_t* filenumber,
+                          LogBuffer* log_buffer);

   uint64_t SlowdownAmount(int n, double bottom, double top);
   Status MakeRoomForWrite(ColumnFamilyData* cfd,
@@ -325,10 +328,12 @@ class DBImpl : public DB {
   void BackgroundCallFlush();
   Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state,
                               LogBuffer* log_buffer);
-  Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state);
+  Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state,
+                         LogBuffer* log_buffer);
   void CleanupCompaction(CompactionState* compact, Status status);
   Status DoCompactionWork(CompactionState* compact,
-                          DeletionState& deletion_state);
+                          DeletionState& deletion_state,
+                          LogBuffer* log_buffer);
   Status OpenCompactionOutputFile(CompactionState* compact);
   Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);

@@ -259,8 +259,6 @@ class SpecialEnv : public EnvWrapper {
 class DBTest {
  private:
   const FilterPolicy* filter_policy_;
-  static std::unique_ptr<const SliceTransform> prefix_1_transform;
-  static std::unique_ptr<const SliceTransform> noop_transform;

  protected:
   // Sequence of option configurations to try
@@ -375,18 +373,18 @@ class DBTest {
     Options options;
     switch (option_config_) {
       case kHashSkipList:
-        options.memtable_factory.reset(
-            NewHashSkipListRepFactory(NewFixedPrefixTransform(1)));
+        options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+        options.memtable_factory.reset(NewHashSkipListRepFactory());
         break;
       case kPlainTableFirstBytePrefix:
         options.table_factory.reset(new PlainTableFactory());
-        options.prefix_extractor = prefix_1_transform.get();
+        options.prefix_extractor.reset(NewFixedPrefixTransform(1));
         options.allow_mmap_reads = true;
         options.max_sequential_skip_in_iterations = 999999;
         break;
       case kPlainTableAllBytesPrefix:
         options.table_factory.reset(new PlainTableFactory());
-        options.prefix_extractor = noop_transform.get();
+        options.prefix_extractor.reset(NewNoopTransform());
         options.allow_mmap_reads = true;
         options.max_sequential_skip_in_iterations = 999999;
         break;
@@ -426,8 +424,8 @@ class DBTest {
         options.memtable_factory.reset(new VectorRepFactory(100));
         break;
       case kHashLinkList:
-        options.memtable_factory.reset(
-            NewHashLinkListRepFactory(NewFixedPrefixTransform(1), 4));
+        options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+        options.memtable_factory.reset(NewHashLinkListRepFactory(4));
         break;
       case kUniversalCompaction:
         options.compaction_style = kCompactionStyleUniversal;
@@ -945,10 +943,6 @@ class DBTest {
   }
 };

-std::unique_ptr<const SliceTransform> DBTest::prefix_1_transform(
-    NewFixedPrefixTransform(1));
-std::unique_ptr<const SliceTransform> DBTest::noop_transform(
-    NewNoopTransform());

 static std::string Key(int i) {
   char buf[100];
@@ -1587,12 +1581,7 @@ TEST(DBTest, IterMulti) {
   iter->Seek("ax");
   ASSERT_EQ(IterStatus(iter), "b->vb");

-  SetPerfLevel(kEnableTime);
-  perf_context.Reset();
   iter->Seek("b");
-  ASSERT_TRUE((int) perf_context.seek_internal_seek_time > 0);
-  ASSERT_TRUE((int) perf_context.find_next_user_entry_time > 0);
-  SetPerfLevel(kDisable);
   ASSERT_EQ(IterStatus(iter), "b->vb");
   iter->Seek("z");
   ASSERT_EQ(IterStatus(iter), "(invalid)");
@@ -1607,12 +1596,7 @@ TEST(DBTest, IterMulti) {
   // Switch from forward to reverse
   iter->SeekToFirst();
   iter->Next();
-  SetPerfLevel(kEnableTime);
-  perf_context.Reset();
   iter->Next();
-  ASSERT_EQ(0, (int) perf_context.seek_internal_seek_time);
-  ASSERT_TRUE((int) perf_context.find_next_user_entry_time > 0);
-  SetPerfLevel(kDisable);
   iter->Prev();
   ASSERT_EQ(IterStatus(iter), "b->vb");
@@ -5690,7 +5674,7 @@ TEST(DBTest, PrefixScan) {
     options.env = env_;
     options.no_block_cache = true;
     options.filter_policy = NewBloomFilterPolicy(10);
-    options.prefix_extractor = NewFixedPrefixTransform(8);
+    options.prefix_extractor.reset(NewFixedPrefixTransform(8));
     options.whole_key_filtering = false;
     options.disable_auto_compactions = true;
     options.max_background_compactions = 2;
@@ -5698,8 +5682,7 @@ TEST(DBTest, PrefixScan) {
     options.disable_seek_compaction = true;
     // Tricky: options.prefix_extractor will be released by
     // NewHashSkipListRepFactory after use.
-    options.memtable_factory.reset(
-        NewHashSkipListRepFactory(options.prefix_extractor));
+    options.memtable_factory.reset(NewHashSkipListRepFactory());

     // prefix specified, with blooms: 2 RAND I/Os
     // SeekToFirst
@@ -5899,14 +5882,12 @@ TEST(DBTest, TailingIteratorPrefixSeek) {
   read_options.tailing = true;
   read_options.prefix_seek = true;

-  auto prefix_extractor = NewFixedPrefixTransform(2);
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
   options.disable_auto_compactions = true;
-  options.prefix_extractor = prefix_extractor;
-  options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor));
+  options.prefix_extractor.reset(NewFixedPrefixTransform(2));
+  options.memtable_factory.reset(NewHashSkipListRepFactory());
   DestroyAndReopen(&options);
   CreateAndReopenWithCF({"pikachu"}, &options);
@@ -33,7 +33,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
     : comparator_(cmp),
       refs_(0),
       arena_(options.arena_block_size),
-      table_(options.memtable_factory->CreateMemTableRep(comparator_, &arena_)),
+      table_(options.memtable_factory->CreateMemTableRep(
+          comparator_, &arena_, options.prefix_extractor.get())),
       flush_in_progress_(false),
       flush_completed_(false),
       file_number_(0),
@@ -41,7 +42,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
       mem_next_logfile_number_(0),
       locks_(options.inplace_update_support ? options.inplace_update_num_locks
                                             : 0),
-      prefix_extractor_(options.prefix_extractor) {
+      prefix_extractor_(options.prefix_extractor.get()) {
   if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
     prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
                                          options.memtable_prefix_bloom_probes));

@@ -44,7 +44,6 @@ class PlainTableDBTest {
   DB* db_;

   Options last_options_;
-  static std::unique_ptr<const SliceTransform> prefix_transform;

  public:
   PlainTableDBTest() : env_(Env::Default()) {
@@ -66,7 +65,7 @@ class PlainTableDBTest {
   Options CurrentOptions() {
     Options options;
     options.table_factory.reset(NewPlainTableFactory(16, 2, 0.8, 3));
-    options.prefix_extractor = prefix_transform.get();
+    options.prefix_extractor.reset(NewFixedPrefixTransform(8));
     options.allow_mmap_reads = true;
     return options;
   }
@@ -173,9 +172,6 @@ class PlainTableDBTest {
   }
 };

-std::unique_ptr<const SliceTransform> PlainTableDBTest::prefix_transform(
-    NewFixedPrefixTransform(8));

 TEST(PlainTableDBTest, Empty) {
   ASSERT_TRUE(dbfull() != nullptr);
   ASSERT_EQ("NOT_FOUND", Get("0000000000000foo"));

@@ -161,16 +161,15 @@ class PrefixTest {
     // skip some options
     option_config_++;
     if (option_config_ < kEnd) {
-      auto prefix_extractor = NewFixedPrefixTransform(8);
-      options.prefix_extractor = prefix_extractor;
+      options.prefix_extractor.reset(NewFixedPrefixTransform(8));
       switch(option_config_) {
         case kHashSkipList:
-          options.memtable_factory.reset(NewHashSkipListRepFactory(
-              options.prefix_extractor, bucket_count, FLAGS_skiplist_height));
+          options.memtable_factory.reset(
+              NewHashSkipListRepFactory(bucket_count, FLAGS_skiplist_height));
           return true;
         case kHashLinkList:
-          options.memtable_factory.reset(NewHashLinkListRepFactory(
-              options.prefix_extractor, bucket_count));
+          options.memtable_factory.reset(
+              NewHashLinkListRepFactory(bucket_count));
           return true;
         default:
           return false;

@@ -277,8 +277,8 @@ bool SkipList<Key,Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
 }

 template<typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
-    const {
+typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::
+    FindGreaterOrEqual(const Key& key, Node** prev) const {
   // Use prev as an optimization hint and fallback to slow path
   if (prev && !KeyIsAfterNode(key, prev[0]->Next(0))) {
     Node* x = prev[0];
@@ -356,7 +356,7 @@ typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()
 }

 template<typename Key, class Comparator>
-SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena,
+SkipList<Key, Comparator>::SkipList(const Comparator cmp, Arena* arena,
                                    int32_t max_height,
                                    int32_t branching_factor)
     : kMaxHeight_(max_height),

@@ -159,7 +159,7 @@ bool TailingIterator::IsCurrentVersion() const {
 }

 bool TailingIterator::IsSamePrefix(const Slice& target) const {
-  const SliceTransform* extractor = cfd_->options()->prefix_extractor;
+  const SliceTransform* extractor = cfd_->options()->prefix_extractor.get();
   assert(extractor);
   assert(is_prev_set_);

@@ -2505,6 +2505,8 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {

 Iterator* VersionSet::MakeInputIterator(Compaction* c) {
   ReadOptions options;
+  options.verify_checksums =
+      c->column_family_data()->options()->verify_checksums_in_compaction;
   options.fill_cache = false;

   // Level-0 files have to be merged together. For other levels,

@@ -41,13 +41,15 @@ namespace log { class Writer; }
 class Compaction;
 class CompactionPicker;
 class Iterator;
+class LogBuffer;
+class LookupKey;
 class MemTable;
 class Version;
 class VersionSet;
 class MergeContext;
 class ColumnFamilyData;
 class ColumnFamilySet;
-class LookupKey;
+class TableCache;

 // Return the smallest index i such that files[i]->largest >= key.
 // Return files.size() if there is no such file.

@@ -575,26 +575,6 @@ class Logger {
   InfoLogLevel log_level_;
 };

-// A class to buffer info log entries and flush them in the end.
-class LogBuffer {
- public:
-  // log_level: the log level for all the logs
-  // info_log: logger to write the logs to
-  LogBuffer(const InfoLogLevel log_level, const shared_ptr<Logger>& info_log);
-  ~LogBuffer();
-
-  // Add a log entry to the buffer.
-  void AddLogToBuffer(const char* format, va_list ap);
-
-  // Flush all buffered log to the info log.
-  void FlushBufferToLog() const;
-
- private:
-  struct Rep;
-  Rep* rep_;
-  const InfoLogLevel log_level_;
-  const shared_ptr<Logger>& info_log_;
-};

 // Identifies a locked file.
 class FileLock {
@@ -607,10 +587,6 @@ class FileLock {
   void operator=(const FileLock&);
 };

-// Add log to the LogBuffer for a delayed info logging. It can be used when
-// we want to add some logs inside a mutex.
-extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);

 extern void LogFlush(const shared_ptr<Logger>& info_log);

 extern void Log(const InfoLogLevel log_level,
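
With LogBuffer removed from the public Env header (it moves to the new util/log_buffer.h per the file list), the intended pattern, as exercised throughout db_impl.cc above, is to buffer messages while the DB mutex is held and flush them after it is released. A sketch, assuming the moved declarations:

    #include "util/log_buffer.h"  // new home of LogBuffer and LogToBuffer

    void BackgroundWork(rocksdb::port::Mutex* mu,
                        const std::shared_ptr<rocksdb::Logger>& info_log) {
      rocksdb::LogBuffer log_buffer(rocksdb::INFO, info_log.get());
      mu->Lock();
      // Cheap: appends to an in-memory buffer; no logger I/O under the mutex.
      rocksdb::LogToBuffer(&log_buffer, "Nothing in memstore to flush");
      mu->Unlock();
      // The actual logger I/O happens outside the critical section.
      log_buffer.FlushBufferToLog();
    }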

@@ -160,8 +160,8 @@ class MemTableRep {
 class MemTableRepFactory {
  public:
   virtual ~MemTableRepFactory() {}
-  virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
-                                         Arena*) = 0;
+  virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
+                                         Arena*, const SliceTransform*) = 0;
   virtual const char* Name() const = 0;
 };
@@ -178,8 +178,9 @@ class VectorRepFactory : public MemTableRepFactory {
  public:
   explicit VectorRepFactory(size_t count = 0) : count_(count) { }
-  virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
-                                         Arena*) override;
+  virtual MemTableRep* CreateMemTableRep(
+      const MemTableRep::KeyComparator&, Arena*,
+      const SliceTransform*) override;
   virtual const char* Name() const override {
     return "VectorRepFactory";
   }
@@ -188,8 +189,9 @@ class VectorRepFactory : public MemTableRepFactory {
 // This uses a skip list to store keys. It is the default.
 class SkipListFactory : public MemTableRepFactory {
  public:
-  virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
-                                         Arena*) override;
+  virtual MemTableRep* CreateMemTableRep(
+      const MemTableRep::KeyComparator&, Arena*,
+      const SliceTransform*) override;
   virtual const char* Name() const override {
     return "SkipListFactory";
   }
@@ -202,8 +204,8 @@ class SkipListFactory : public MemTableRepFactory {
 // skiplist_branching_factor: probabilistic size ratio between adjacent
 //     link lists in the skiplist
 extern MemTableRepFactory* NewHashSkipListRepFactory(
-    const SliceTransform* transform, size_t bucket_count = 1000000,
-    int32_t skiplist_height = 4, int32_t skiplist_branching_factor = 4
+    size_t bucket_count = 1000000, int32_t skiplist_height = 4,
+    int32_t skiplist_branching_factor = 4
 );
@@ -211,6 +213,6 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
 // linked list (null if the bucket is empty).
 // bucket_count: number of fixed array buckets
 extern MemTableRepFactory* NewHashLinkListRepFactory(
-    const SliceTransform* transform, size_t bucket_count = 50000);
+    size_t bucket_count = 50000);

 }  // namespace rocksdb
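
For out-of-tree memtable implementations, the factory hook gains the transform parameter. A sketch of a conforming factory under the new signature (the delegation to SkipListFactory is illustrative only, not part of this commit):

    class MyRepFactory : public rocksdb::MemTableRepFactory {
     public:
      virtual rocksdb::MemTableRep* CreateMemTableRep(
          const rocksdb::MemTableRep::KeyComparator& cmp,
          rocksdb::Arena* arena,
          const rocksdb::SliceTransform* transform) override {
        // The transform now arrives per-memtable from Options.prefix_extractor
        // instead of being captured by the factory constructor.
        return fallback_.CreateMemTableRep(cmp, arena, transform);
      }
      virtual const char* Name() const override { return "MyRepFactory"; }

     private:
      rocksdb::SkipListFactory fallback_;
    };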

@@ -237,7 +237,7 @@ struct ColumnFamilyOptions {
   // 4) prefix(prefix(key)) == prefix(key)
   //
   // Default: nullptr
-  const SliceTransform* prefix_extractor;
+  std::shared_ptr<const SliceTransform> prefix_extractor;

   // If true, place whole keys in the filter (not just prefixes).
   // This must generally be true for gets to be efficient.
@@ -385,6 +385,11 @@ struct ColumnFamilyOptions {
   // The compaction style. Default: kCompactionStyleLevel
   CompactionStyle compaction_style;

+  // If true, compaction will verify checksum on every read that happens
+  // as part of compaction
+  // Default: true
+  bool verify_checksums_in_compaction;
+
   // The options needed to support Universal Style compactions
   CompactionOptionsUniversal compaction_options_universal;
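
Usage of the new option is a one-liner; per the comment above it defaults to true, so this sketch only shows opting out:

    rocksdb::Options DisableCompactionChecksums() {
      rocksdb::Options options;
      // Skip per-block checksum verification on compaction reads; the default
      // (true) trades some compaction CPU for early corruption detection.
      options.verify_checksums_in_compaction = false;
      return options;
    }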

@@ -124,6 +124,7 @@ enum Tickers {
   NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
   NUMBER_SUPERVERSION_ACQUIRES,
   NUMBER_SUPERVERSION_RELEASES,
+  NUMBER_SUPERVERSION_CLEANUPS,
   TICKER_ENUM_MAX
 };
@@ -181,6 +182,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
     {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
      "rocksdb.number.direct.load.table.properties"},
     {NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"},
     {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"},
+    {NUMBER_SUPERVERSION_CLEANUPS, "rocksdb.number.superversion_cleanups"},
 };

 /**

@@ -55,6 +55,8 @@ struct BackupableDBOptions {
   // Default: false
   bool destroy_old_data;

+  void Dump(Logger* logger) const;
+
   explicit BackupableDBOptions(const std::string& _backup_dir,
                                Env* _backup_env = nullptr,
                                bool _share_table_files = true,
@@ -62,6 +64,7 @@ struct BackupableDBOptions {
                                bool _destroy_old_data = false)
       : backup_dir(_backup_dir),
         backup_env(_backup_env),
+        share_table_files(_share_table_files),
         info_log(_info_log),
         sync(_sync),
         destroy_old_data(_destroy_old_data) {}

@@ -24,7 +24,7 @@ static const size_t kFilterBase = 1 << kFilterBaseLg;
 FilterBlockBuilder::FilterBlockBuilder(const Options& opt,
                                        const Comparator* internal_comparator)
     : policy_(opt.filter_policy),
-      prefix_extractor_(opt.prefix_extractor),
+      prefix_extractor_(opt.prefix_extractor.get()),
       whole_key_filtering_(opt.whole_key_filtering),
       comparator_(internal_comparator) {}
@@ -133,7 +133,7 @@ void FilterBlockBuilder::GenerateFilter() {
 FilterBlockReader::FilterBlockReader(
     const Options& opt, const Slice& contents, bool delete_contents_after_use)
     : policy_(opt.filter_policy),
-      prefix_extractor_(opt.prefix_extractor),
+      prefix_extractor_(opt.prefix_extractor.get()),
       whole_key_filtering_(opt.whole_key_filtering),
       data_(nullptr),
       offset_(nullptr),

@@ -240,8 +240,8 @@ int main(int argc, char** argv) {
   rocksdb::TableFactory* tf = new rocksdb::BlockBasedTableFactory();
   rocksdb::Options options;
   if (FLAGS_prefix_len < 16) {
-    options.prefix_extractor = rocksdb::NewFixedPrefixTransform(
-        FLAGS_prefix_len);
+    options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(
+        FLAGS_prefix_len));
   }
   rocksdb::ReadOptions ro;
   rocksdb::EnvOptions env_options;
@@ -254,8 +254,8 @@ int main(int argc, char** argv) {
     env_options.use_mmap_reads = true;
     tf = new rocksdb::PlainTableFactory(16, (FLAGS_prefix_len == 16) ? 0 : 8,
                                         0.75);
-    options.prefix_extractor = rocksdb::NewFixedPrefixTransform(
-        FLAGS_prefix_len);
+    options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(
+        FLAGS_prefix_len));
   } else {
     tf = new rocksdb::BlockBasedTableFactory();
   }

@@ -697,7 +697,7 @@ class Harness {
       case PLAIN_TABLE_SEMI_FIXED_PREFIX:
         support_prev_ = false;
         only_support_prefix_seek_ = true;
-        options_.prefix_extractor = prefix_transform.get();
+        options_.prefix_extractor.reset(new FixedOrLessPrefixTransform(2));
         options_.allow_mmap_reads = true;
         options_.table_factory.reset(NewPlainTableFactory());
         constructor_ = new TableConstructor(options_.comparator, true, true);
@@ -707,7 +707,7 @@ class Harness {
       case PLAIN_TABLE_FULL_STR_PREFIX:
         support_prev_ = false;
         only_support_prefix_seek_ = true;
-        options_.prefix_extractor = noop_transform.get();
+        options_.prefix_extractor.reset(NewNoopTransform());
         options_.allow_mmap_reads = true;
         options_.table_factory.reset(NewPlainTableFactory());
         constructor_ = new TableConstructor(options_.comparator, true, true);
@@ -920,15 +920,8 @@ class Harness {
   bool support_prev_;
   bool only_support_prefix_seek_;
   shared_ptr<InternalKeyComparator> internal_comparator_;
-  static std::unique_ptr<const SliceTransform> noop_transform;
-  static std::unique_ptr<const SliceTransform> prefix_transform;
 };

-std::unique_ptr<const SliceTransform> Harness::noop_transform(
-    NewNoopTransform());
-std::unique_ptr<const SliceTransform> Harness::prefix_transform(
-    new FixedOrLessPrefixTransform(2));

 static bool Between(uint64_t val, uint64_t low, uint64_t high) {
   bool result = (val >= low) && (val <= high);
   if (!result) {

@@ -688,9 +688,6 @@ class StressTest {
         filter_policy_(FLAGS_bloom_bits >= 0
                        ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                        : nullptr),
-        prefix_extractor_(NewFixedPrefixTransform(
-            FLAGS_test_batches_snapshots ?
-                sizeof(long) : sizeof(long)-1)),
         db_(nullptr),
         num_times_reopened_(0) {
     if (FLAGS_destroy_db_initially) {
@@ -708,7 +705,6 @@ class StressTest {
   ~StressTest() {
     delete db_;
     delete filter_policy_;
-    delete prefix_extractor_;
   }
   void Run() {
@@ -1373,7 +1369,7 @@ class StressTest {
         static_cast<rocksdb::CompactionStyle>(FLAGS_compaction_style);
     options.block_size = FLAGS_block_size;
     options.filter_policy = filter_policy_;
-    options.prefix_extractor = prefix_extractor_;
+    options.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size));
    options.max_open_files = FLAGS_open_files;
    options.statistics = dbstats;
    options.env = FLAGS_env;
@@ -1405,16 +1401,13 @@ class StressTest {
     }
     switch (FLAGS_rep_factory) {
       case kHashSkipList:
-        options.memtable_factory.reset(NewHashSkipListRepFactory(
-            NewFixedPrefixTransform(FLAGS_prefix_size)));
+        options.memtable_factory.reset(NewHashSkipListRepFactory());
        break;
      case kSkipList:
        // no need to do anything
        break;
      case kVectorRep:
-        options.memtable_factory.reset(
-            new VectorRepFactory()
-        );
+        options.memtable_factory.reset(new VectorRepFactory());
        break;
    }
    static Random purge_percent(1000);  // no benefit from non-determinism here
@@ -1488,7 +1481,6 @@ class StressTest {
   shared_ptr<Cache> cache_;
   shared_ptr<Cache> compressed_cache_;
   const FilterPolicy* filter_policy_;
-  const SliceTransform* prefix_extractor_;
   DB* db_;
   StackableDB* sdb_;
   int num_times_reopened_;
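With the transform riding on Options, the stress test now builds the factory with no arguments and sets the extractor once. A hedged sketch of the combined wiring (helper name is mine):

    #include "rocksdb/memtablerep.h"
    #include "rocksdb/options.h"
    #include "rocksdb/slice_transform.h"

    // Sketch: the hash-based memtable factories no longer take a
    // SliceTransform; the rep reads options.prefix_extractor when it is
    // created, so both lines below end up referring to the same transform.
    rocksdb::Options MakePrefixOptions(size_t prefix_size) {
      rocksdb::Options options;
      options.prefix_extractor.reset(
          rocksdb::NewFixedPrefixTransform(prefix_size));
      options.memtable_factory.reset(rocksdb::NewHashSkipListRepFactory());
      return options;
    }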

@@ -205,10 +205,12 @@ class LDBTestCase(unittest.TestCase):
     def testTtlPutGet(self):
         print "Running testTtlPutGet..."
         self.assertRunOK("put a1 b1 --ttl --create_if_missing", "OK")
-        self.assertRunOK("scan ", "a1 : b1", True)
+        self.assertRunOK("scan --hex", "0x6131 : 0x6231", True)
         self.assertRunOK("dump --ttl ", "a1 ==> b1", True)
+        self.assertRunOK("dump --hex --ttl ",
+                "0x6131 ==> 0x6231\nKeys in range: 1")
         self.assertRunOK("scan --hex --ttl", "0x6131 : 0x6231")
-        self.assertRunOK("get a1", "b1", True)
+        self.assertRunOK("get --value_hex a1", "0x6231", True)
         self.assertRunOK("get --ttl a1", "b1")
         self.assertRunOK("put a3 b3 --create_if_missing", "OK")
         # fails because timstamp's length is greater than value's

@@ -130,7 +130,7 @@ Status SstFileReader::SetTableOptionsByMagicNumber(uint64_t table_magic_number,
     options_.allow_mmap_reads = true;
     options_.table_factory = std::make_shared<PlainTableFactory>(
         table_properties->fixed_key_len, 2, 0.8);
-    options_.prefix_extractor = NewNoopTransform();
+    options_.prefix_extractor.reset(NewNoopTransform());
    fprintf(stdout, "Sst file format: plain table\n");
  } else {
    char error_msg_buffer[80];

@@ -31,82 +31,9 @@ WritableFile::~WritableFile() {
 Logger::~Logger() {
 }
-// One log entry with its timestamp
-struct BufferedLog {
-  struct timeval now_tv;  // Timestamp of the log
-  char message[1];        // Beginning of log message
-};
-struct LogBuffer::Rep {
-  Arena arena_;
-  autovector<BufferedLog*> logs_;
-};
-// Lazily initialize Rep to avoid allocations when new log is added.
-LogBuffer::LogBuffer(const InfoLogLevel log_level,
-                     const shared_ptr<Logger>& info_log)
-    : rep_(nullptr), log_level_(log_level), info_log_(info_log) {}
-LogBuffer::~LogBuffer() { delete rep_; }
-void LogBuffer::AddLogToBuffer(const char* format, va_list ap) {
-  if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
-    // Skip the level because of its level.
-    return;
-  }
-  if (rep_ == nullptr) {
-    rep_ = new Rep();
-  }
-  const size_t kLogSizeLimit = 512;
-  char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit);
-  BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
-  char* p = buffered_log->message;
-  char* limit = alloc_mem + kLogSizeLimit - 1;
-  // store the time
-  gettimeofday(&(buffered_log->now_tv), nullptr);
-  // Print the message
-  if (p < limit) {
-    va_list backup_ap;
-    va_copy(backup_ap, ap);
-    p += vsnprintf(p, limit - p, format, backup_ap);
-    va_end(backup_ap);
-  }
-  // Add '\0' to the end
-  *p = '\0';
-  rep_->logs_.push_back(buffered_log);
-}
-void LogBuffer::FlushBufferToLog() const {
-  if (rep_ != nullptr) {
-    for (BufferedLog* log : rep_->logs_) {
-      const time_t seconds = log->now_tv.tv_sec;
-      struct tm t;
-      localtime_r(&seconds, &t);
-      Log(log_level_, info_log_,
-          "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
-          t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
-          t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
-    }
-  }
-}
 FileLock::~FileLock() {
 }
-void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
-  if (log_buffer != nullptr) {
-    va_list ap;
-    va_start(ap, format);
-    log_buffer->AddLogToBuffer(format, ap);
-    va_end(ap);
-  }
-}
 void LogFlush(Logger *info_log) {
   if (info_log) {
     info_log->Flush();

@@ -55,7 +55,7 @@ private:
 class HashLinkListRep : public MemTableRep {
  public:
-  HashLinkListRep(MemTableRep::KeyComparator& compare, Arena* arena,
+  HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena,
                   const SliceTransform* transform, size_t bucket_size);
   virtual void Insert(const char* key) override;
@@ -81,7 +81,7 @@ class HashLinkListRep : public MemTableRep {
  private:
   friend class DynamicIterator;
-  typedef SkipList<const char*, MemTableRep::KeyComparator&> FullList;
+  typedef SkipList<const char*, const MemTableRep::KeyComparator&> FullList;
   size_t bucket_size_;
@@ -92,7 +92,7 @@ class HashLinkListRep : public MemTableRep {
   // The user-supplied transform whose domain is the user keys.
   const SliceTransform* transform_;
-  MemTableRep::KeyComparator& compare_;
+  const MemTableRep::KeyComparator& compare_;
   // immutable after construction
   Arena* const arena_;
@@ -314,7 +314,7 @@ class HashLinkListRep : public MemTableRep {
   };
 };
-HashLinkListRep::HashLinkListRep(MemTableRep::KeyComparator& compare,
+HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare,
                                  Arena* arena, const SliceTransform* transform,
                                  size_t bucket_size)
     : bucket_size_(bucket_size),
@@ -475,13 +475,13 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head,
 }  // anon namespace
 MemTableRep* HashLinkListRepFactory::CreateMemTableRep(
-    MemTableRep::KeyComparator& compare, Arena* arena) {
-  return new HashLinkListRep(compare, arena, transform_, bucket_count_);
+    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const SliceTransform* transform) {
+  return new HashLinkListRep(compare, arena, transform, bucket_count_);
 }
-MemTableRepFactory* NewHashLinkListRepFactory(
-    const SliceTransform* transform, size_t bucket_count) {
-  return new HashLinkListRepFactory(transform, bucket_count);
+MemTableRepFactory* NewHashLinkListRepFactory(size_t bucket_count) {
+  return new HashLinkListRepFactory(bucket_count);
 }
 }  // namespace rocksdb

@@ -14,25 +14,20 @@ namespace rocksdb {
 class HashLinkListRepFactory : public MemTableRepFactory {
  public:
-  explicit HashLinkListRepFactory(
-      const SliceTransform* transform,
-      size_t bucket_count)
-      : transform_(transform),
-        bucket_count_(bucket_count) { }
+  explicit HashLinkListRepFactory(size_t bucket_count)
+      : bucket_count_(bucket_count) { }
-  virtual ~HashLinkListRepFactory() { delete transform_; }
+  virtual ~HashLinkListRepFactory() {}
-  virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator& compare,
-                                         Arena* arena) override;
+  virtual MemTableRep* CreateMemTableRep(
+      const MemTableRep::KeyComparator& compare, Arena* arena,
+      const SliceTransform* transform) override;
   virtual const char* Name() const override {
     return "HashLinkListRepFactory";
   }
-  const SliceTransform* GetTransform() { return transform_; }
  private:
-  const SliceTransform* transform_;
   const size_t bucket_count_;
 };

@@ -21,7 +21,7 @@ namespace {
 class HashSkipListRep : public MemTableRep {
  public:
-  HashSkipListRep(MemTableRep::KeyComparator& compare, Arena* arena,
+  HashSkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena,
                   const SliceTransform* transform, size_t bucket_size,
                   int32_t skiplist_height, int32_t skiplist_branching_factor);
@@ -48,7 +48,7 @@ class HashSkipListRep : public MemTableRep {
  private:
   friend class DynamicIterator;
-  typedef SkipList<const char*, MemTableRep::KeyComparator&> Bucket;
+  typedef SkipList<const char*, const MemTableRep::KeyComparator&> Bucket;
   size_t bucket_size_;
@@ -62,7 +62,7 @@ class HashSkipListRep : public MemTableRep {
   // The user-supplied transform whose domain is the user keys.
   const SliceTransform* transform_;
-  MemTableRep::KeyComparator& compare_;
+  const MemTableRep::KeyComparator& compare_;
   // immutable after construction
   Arena* const arena_;
@@ -221,7 +221,7 @@ class HashSkipListRep : public MemTableRep {
   };
 };
-HashSkipListRep::HashSkipListRep(MemTableRep::KeyComparator& compare,
+HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare,
                                  Arena* arena, const SliceTransform* transform,
                                  size_t bucket_size, int32_t skiplist_height,
                                  int32_t skiplist_branching_factor)
@@ -321,16 +321,17 @@ MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator() {
 }  // anon namespace
 MemTableRep* HashSkipListRepFactory::CreateMemTableRep(
-    MemTableRep::KeyComparator& compare, Arena* arena) {
-  return new HashSkipListRep(compare, arena, transform_, bucket_count_,
+    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const SliceTransform* transform) {
+  return new HashSkipListRep(compare, arena, transform, bucket_count_,
                              skiplist_height_, skiplist_branching_factor_);
 }
 MemTableRepFactory* NewHashSkipListRepFactory(
-    const SliceTransform* transform, size_t bucket_count,
-    int32_t skiplist_height, int32_t skiplist_branching_factor) {
-  return new HashSkipListRepFactory(transform, bucket_count,
-                                    skiplist_height, skiplist_branching_factor);
+    size_t bucket_count, int32_t skiplist_height,
+    int32_t skiplist_branching_factor) {
+  return new HashSkipListRepFactory(bucket_count, skiplist_height,
+                                    skiplist_branching_factor);
 }
 }  // namespace rocksdb

@@ -15,28 +15,24 @@ namespace rocksdb {
 class HashSkipListRepFactory : public MemTableRepFactory {
  public:
   explicit HashSkipListRepFactory(
-      const SliceTransform* transform,
       size_t bucket_count,
       int32_t skiplist_height,
       int32_t skiplist_branching_factor)
-      : transform_(transform),
-        bucket_count_(bucket_count),
+      : bucket_count_(bucket_count),
        skiplist_height_(skiplist_height),
        skiplist_branching_factor_(skiplist_branching_factor) { }
-  virtual ~HashSkipListRepFactory() { delete transform_; }
+  virtual ~HashSkipListRepFactory() {}
-  virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator& compare,
-                                         Arena* arena) override;
+  virtual MemTableRep* CreateMemTableRep(
+      const MemTableRep::KeyComparator& compare, Arena* arena,
+      const SliceTransform* transform) override;
   virtual const char* Name() const override {
     return "HashSkipListRepFactory";
   }
-  const SliceTransform* GetTransform() { return transform_; }
  private:
-  const SliceTransform* transform_;
   const size_t bucket_count_;
   const int32_t skiplist_height_;
   const int32_t skiplist_branching_factor_;
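Both hash factories now implement the widened CreateMemTableRep() and no longer own a transform. A hedged sketch of a custom factory written against the new interface (ForwardingRepFactory is a made-up name, not part of the patch):

    #include "rocksdb/memtablerep.h"

    namespace rocksdb {

    // Sketch: a wrapper factory for the post-merge interface. The
    // transform arrives as an argument (it is options.prefix_extractor
    // at creation time), so the factory neither stores nor deletes it.
    class ForwardingRepFactory : public MemTableRepFactory {
     public:
      explicit ForwardingRepFactory(MemTableRepFactory* target)
          : target_(target) {}

      virtual MemTableRep* CreateMemTableRep(
          const MemTableRep::KeyComparator& compare, Arena* arena,
          const SliceTransform* transform) override {
        // Forward the borrowed transform; ownership stays with Options.
        return target_->CreateMemTableRep(compare, arena, transform);
      }

      virtual const char* Name() const override {
        return "ForwardingRepFactory";
      }

     private:
      MemTableRepFactory* target_;  // not owned in this sketch
    };

    }  // namespace rocksdb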

@@ -0,0 +1,67 @@
+// Copyright (c) 2014, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#include "util/log_buffer.h"
+#include <sys/time.h>
+namespace rocksdb {
+LogBuffer::LogBuffer(const InfoLogLevel log_level,
+                     Logger* info_log)
+    : log_level_(log_level), info_log_(info_log) {}
+void LogBuffer::AddLogToBuffer(const char* format, va_list ap) {
+  if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
+    // Skip the message: its level is below the logger's threshold.
+    return;
+  }
+  const size_t kLogSizeLimit = 512;
+  char* alloc_mem = arena_.AllocateAligned(kLogSizeLimit);
+  BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
+  char* p = buffered_log->message;
+  char* limit = alloc_mem + kLogSizeLimit - 1;
+  // store the time
+  gettimeofday(&(buffered_log->now_tv), nullptr);
+  // Print the message
+  if (p < limit) {
+    va_list backup_ap;
+    va_copy(backup_ap, ap);
+    p += vsnprintf(p, limit - p, format, backup_ap);
+    va_end(backup_ap);
+  }
+  // Add '\0' to the end
+  *p = '\0';
+  logs_.push_back(buffered_log);
+}
+void LogBuffer::FlushBufferToLog() {
+  for (BufferedLog* log : logs_) {
+    const time_t seconds = log->now_tv.tv_sec;
+    struct tm t;
+    localtime_r(&seconds, &t);
+    Log(log_level_, info_log_,
+        "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
+        t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
+        t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
+  }
+  logs_.clear();
+}
+void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
+  if (log_buffer != nullptr) {
+    va_list ap;
+    va_start(ap, format);
+    log_buffer->AddLogToBuffer(format, ap);
+    va_end(ap);
+  }
+}
+}  // namespace rocksdb

@@ -0,0 +1,46 @@
+// Copyright (c) 2014, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+#include "rocksdb/env.h"
+#include "util/arena.h"
+#include "util/autovector.h"
+namespace rocksdb {
+class Logger;
+// A class to buffer info log entries and flush them in the end.
+class LogBuffer {
+ public:
+  // log_level: the log level for all the logs
+  // info_log: logger to write the logs to
+  LogBuffer(const InfoLogLevel log_level, Logger* info_log);
+  // Add a log entry to the buffer.
+  void AddLogToBuffer(const char* format, va_list ap);
+  // Flush all buffered log to the info log.
+  void FlushBufferToLog();
+ private:
+  // One log entry with its timestamp
+  struct BufferedLog {
+    struct timeval now_tv;  // Timestamp of the log
+    char message[1];        // Beginning of log message
+  };
+  const InfoLogLevel log_level_;
+  Logger* info_log_;
+  Arena arena_;
+  autovector<BufferedLog*> logs_;
+};
+// Add log to the LogBuffer for a delayed info logging. It can be used when
+// we want to add some logs inside a mutex.
+extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);
+}  // namespace rocksdb
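This is the buffered-logging code removed from util/env.cc above, reshaped: the lazily allocated Rep indirection is gone and the Logger is taken as a raw pointer instead of a shared_ptr. A hedged usage sketch (function and argument names are mine):

    #include "rocksdb/env.h"
    #include "util/log_buffer.h"

    // Sketch: the intended pattern is to queue messages on code paths
    // that run under the DB mutex and flush after the lock is released,
    // so slow Logger I/O never happens inside the critical section.
    void BufferedLoggingExample(rocksdb::InfoLogLevel level,
                                rocksdb::Logger* info_log) {
      rocksdb::LogBuffer log_buffer(level, info_log);
      // ...typically while holding a mutex: no I/O here, each entry is
      // arena-allocated and timestamped as it is added
      rocksdb::LogToBuffer(&log_buffer, "compaction picked %d files", 4);
      // ...after releasing the mutex: every buffered entry is written,
      // prefixed with its original timestamp
      log_buffer.FlushBufferToLog();
    }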

@@ -67,6 +67,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
       purge_redundant_kvs_while_flush(true),
       block_size_deviation(10),
       compaction_style(kCompactionStyleLevel),
+      verify_checksums_in_compaction(true),
       filter_deletes(false),
       max_sequential_skip_in_iterations(8),
       memtable_factory(std::shared_ptr<SkipListFactory>(new SkipListFactory)),
@@ -126,6 +127,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
       purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush),
       block_size_deviation(options.block_size_deviation),
       compaction_style(options.compaction_style),
+      verify_checksums_in_compaction(options.verify_checksums_in_compaction),
       compaction_options_universal(options.compaction_options_universal),
       filter_deletes(options.filter_deletes),
       max_sequential_skip_in_iterations(
@@ -379,6 +381,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
       block_size_deviation);
   Log(log,"             Options.filter_deletes: %d",
       filter_deletes);
+  Log(log, "  Options.verify_checksums_in_compaction: %d",
+      verify_checksums_in_compaction);
   Log(log,"           Options.compaction_style: %d",
       compaction_style);
   Log(log,"Options.compaction_options_universal.size_ratio: %u",

@@ -10,9 +10,9 @@
 namespace rocksdb {
 namespace {
 class SkipListRep : public MemTableRep {
-  SkipList<const char*, MemTableRep::KeyComparator&> skip_list_;
+  SkipList<const char*, const MemTableRep::KeyComparator&> skip_list_;
 public:
-  explicit SkipListRep(MemTableRep::KeyComparator& compare, Arena* arena)
+  explicit SkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena)
     : skip_list_(compare, arena) {
   }
@@ -47,12 +47,12 @@ public:
   // Iteration over the contents of a skip list
   class Iterator : public MemTableRep::Iterator {
-    SkipList<const char*, MemTableRep::KeyComparator&>::Iterator iter_;
+    SkipList<const char*, const MemTableRep::KeyComparator&>::Iterator iter_;
   public:
     // Initialize an iterator over the specified list.
     // The returned iterator is not valid.
     explicit Iterator(
-      const SkipList<const char*, MemTableRep::KeyComparator&>* list
+      const SkipList<const char*, const MemTableRep::KeyComparator&>* list
     ) : iter_(list) { }
     virtual ~Iterator() override { }
@@ -115,7 +115,8 @@ public:
 }
 MemTableRep* SkipListFactory::CreateMemTableRep(
-    MemTableRep::KeyComparator& compare, Arena* arena) {
+    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const SliceTransform*) {
   return new SkipListRep(compare, arena);
 }

@@ -271,7 +271,8 @@ MemTableRep::Iterator* VectorRep::GetIterator() {
 }  // anon namespace
 MemTableRep* VectorRepFactory::CreateMemTableRep(
-    MemTableRep::KeyComparator& compare, Arena* arena) {
+    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const SliceTransform*) {
   return new VectorRep(compare, arena, count_);
 }
 }  // namespace rocksdb

@@ -26,6 +26,17 @@
 namespace rocksdb {
+void BackupableDBOptions::Dump(Logger* logger) const {
+  Log(logger, "        Options.backup_dir: %s", backup_dir.c_str());
+  Log(logger, "        Options.backup_env: %p", backup_env);
+  Log(logger, " Options.share_table_files: %d",
+      static_cast<int>(share_table_files));
+  Log(logger, "          Options.info_log: %p", info_log);
+  Log(logger, "              Options.sync: %d", static_cast<int>(sync));
+  Log(logger, "  Options.destroy_old_data: %d",
+      static_cast<int>(destroy_old_data));
+}
 // -------- BackupEngineImpl class ---------
 class BackupEngineImpl : public BackupEngine {
  public:
@@ -205,6 +216,8 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
       backup_env_(options.backup_env != nullptr ? options.backup_env
                                                 : db_env_) {
+  options_.Dump(options_.info_log);
   // create all the dirs we need
   backup_env_->CreateDirIfMissing(GetAbsolutePath());
   backup_env_->NewDirectory(GetAbsolutePath(), &backup_directory_);
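With the Dump() call in the BackupEngineImpl constructor, the backup configuration is now logged once at engine startup. A hedged sketch of feeding it a logger (path and helper name are illustrative):

    #include "utilities/backupable_db.h"

    // Sketch: info_log is where BackupableDB writes its own logs and,
    // after this change, where the options dump lands when the engine
    // is constructed.
    rocksdb::BackupableDBOptions MakeBackupOptions(rocksdb::Logger* logger) {
      rocksdb::BackupableDBOptions backup_options("/tmp/rocksdb_backup");
      backup_options.info_log = logger;
      backup_options.sync = true;  // fsync backup files, as dumped above
      return backup_options;
    }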
