diff --git a/HISTORY.md b/HISTORY.md
index 599c3f94a..3d6384d6e 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -14,6 +14,9 @@
 * Added is_manual_compaction to CompactionFilter::Context
 * Added "virtual void WaitForJoin() = 0" in class Env
 * Removed BackupEngine::DeleteBackupsNewerThan() function
+* Added new option -- verify_checksums_in_compaction
+* Changed Options.prefix_extractor from raw pointer to shared_ptr (take ownership)
+  Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly)
 
 ### New Features
 * If we find one truncated record at the end of the MANIFEST or WAL files,
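Note for upgraders: the two new changelog entries above change how the prefix extractor is owned and wired into the memtable factories. A minimal before/after sketch (the prefix length and bucket count are illustrative values, not from this diff), using the signatures this diff gives include/rocksdb/options.h and include/rocksdb/memtablerep.h:

    #include "rocksdb/memtablerep.h"
    #include "rocksdb/options.h"
    #include "rocksdb/slice_transform.h"

    void ConfigurePrefixOptions(rocksdb::Options* options) {
      // Before: options->prefix_extractor = rocksdb::NewFixedPrefixTransform(8);
      // and the caller (or the memtable factory) managed its lifetime.
      // After: Options owns the transform through a shared_ptr.
      options->prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(8));

      // The hash-based memtable factories no longer take a SliceTransform;
      // they pick up Options.prefix_extractor implicitly.
      options->memtable_factory.reset(
          rocksdb::NewHashSkipListRepFactory(1000000 /* bucket_count */));
    }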
diff --git a/db/c.cc b/db/c.cc
index 0a8e0700e..e4946f351 100644
--- a/db/c.cc
+++ b/db/c.cc
@@ -294,10 +294,10 @@ struct rocksdb_universal_compaction_options_t {
 };
 
 static bool SaveError(char** errptr, const Status& s) {
-  assert(errptr != NULL);
+  assert(errptr != nullptr);
   if (s.ok()) {
     return false;
-  } else if (*errptr == NULL) {
+  } else if (*errptr == nullptr) {
     *errptr = strdup(s.ToString().c_str());
   } else {
     // TODO(sanjay): Merge with existing error?
@@ -319,7 +319,7 @@ rocksdb_t* rocksdb_open(
     char** errptr) {
   DB* db;
   if (SaveError(errptr, DB::Open(options->rep, std::string(name), &db))) {
-    return NULL;
+    return nullptr;
   }
   rocksdb_t* result = new rocksdb_t;
   result->rep = db;
@@ -373,7 +373,7 @@ char* rocksdb_get(
     const char* key, size_t keylen,
     size_t* vallen,
     char** errptr) {
-  char* result = NULL;
+  char* result = nullptr;
  std::string tmp;
   Status s = db->rep->Get(options->rep, Slice(key, keylen), &tmp);
   if (s.ok()) {
@@ -418,7 +418,7 @@ char* rocksdb_property_value(
     // We use strdup() since we expect human readable output.
     return strdup(tmp.c_str());
   } else {
-    return NULL;
+    return nullptr;
   }
 }
 
@@ -456,9 +456,9 @@ void rocksdb_compact_range(
     const char* limit_key, size_t limit_key_len) {
   Slice a, b;
   db->rep->CompactRange(
-      // Pass NULL Slice if corresponding "const char*" is NULL
-      (start_key ? (a = Slice(start_key, start_key_len), &a) : NULL),
-      (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : NULL));
+      // Pass nullptr Slice if corresponding "const char*" is nullptr
+      (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
+      (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
 }
 
 void rocksdb_flush(
@@ -647,7 +647,7 @@ void rocksdb_options_set_paranoid_checks(
 }
 
 void rocksdb_options_set_env(rocksdb_options_t* opt, rocksdb_env_t* env) {
-  opt->rep.env = (env ? env->rep : NULL);
+  opt->rep.env = (env ? env->rep : nullptr);
 }
 
 void rocksdb_options_set_info_log(rocksdb_options_t* opt, rocksdb_logger_t* l) {
@@ -765,7 +765,7 @@ void rocksdb_options_set_compression_options(
 
 void rocksdb_options_set_prefix_extractor(
     rocksdb_options_t* opt, rocksdb_slicetransform_t* prefix_extractor) {
-  opt->rep.prefix_extractor = prefix_extractor;
+  opt->rep.prefix_extractor.reset(prefix_extractor);
 }
 
 void rocksdb_options_set_whole_key_filtering(
@@ -1087,8 +1087,8 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_bloom(int bits_per_key) {
   };
   Wrapper* wrapper = new Wrapper;
   wrapper->rep_ = NewBloomFilterPolicy(bits_per_key);
-  wrapper->state_ = NULL;
-  wrapper->delete_filter_ = NULL;
+  wrapper->state_ = nullptr;
+  wrapper->delete_filter_ = nullptr;
   wrapper->destructor_ = &Wrapper::DoNothing;
   return wrapper;
 }
@@ -1154,7 +1154,7 @@ void rocksdb_readoptions_set_prefix_seek(
 
 void rocksdb_readoptions_set_snapshot(
     rocksdb_readoptions_t* opt,
     const rocksdb_snapshot_t* snap) {
-  opt->rep.snapshot = (snap ? snap->rep : NULL);
+  opt->rep.snapshot = (snap ? snap->rep : nullptr);
 }
 
 void rocksdb_readoptions_set_prefix(
@@ -1280,7 +1280,7 @@ rocksdb_slicetransform_t* rocksdb_slicetransform_create_fixed_prefix(size_t prefixLen) {
   };
   Wrapper* wrapper = new Wrapper;
   wrapper->rep_ = rocksdb::NewFixedPrefixTransform(prefixLen);
-  wrapper->state_ = NULL;
+  wrapper->state_ = nullptr;
   wrapper->destructor_ = &Wrapper::DoNothing;
   return wrapper;
 }
diff --git a/db/column_family.cc b/db/column_family.cc
index 2aeda11f3..3a5fa067b 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -40,7 +40,9 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
     }
     db_->FindObsoleteFiles(deletion_state, false, true);
     mutex_->Unlock();
-    db_->PurgeObsoleteFiles(deletion_state);
+    if (deletion_state.HaveSomethingToDelete()) {
+      db_->PurgeObsoleteFiles(deletion_state);
+    }
   }
 }
 
@@ -84,13 +86,11 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
   if (result.soft_rate_limit > result.hard_rate_limit) {
     result.soft_rate_limit = result.hard_rate_limit;
   }
-  if (result.prefix_extractor) {
-    // If a prefix extractor has been supplied and a HashSkipListRepFactory is
-    // being used, make sure that the latter uses the former as its transform
-    // function.
-    auto factory =
-        dynamic_cast<HashSkipListRepFactory*>(result.memtable_factory.get());
-    if (factory && factory->GetTransform() != result.prefix_extractor) {
+  if (!result.prefix_extractor) {
+    assert(result.memtable_factory);
+    Slice name = result.memtable_factory->Name();
+    if (name.compare("HashSkipListRepFactory") == 0 ||
+        name.compare("HashLinkListRepFactory") == 0) {
       result.memtable_factory = std::make_shared<SkipListFactory>();
     }
   }
diff --git a/db/column_family.h b/db/column_family.h
index d09174521..2ff2e02ab 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -33,6 +33,7 @@ class InternalKey;
 class InternalStats;
 class ColumnFamilyData;
 class DBImpl;
+class LogBuffer;
 
 class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
  public:
diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index 6308b9731..531426a7a 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -10,6 +10,8 @@
 #include "db/compaction_picker.h"
 
 #include <limits>
+#include "util/log_buffer.h"
+#include "util/statistics.h"
 
 namespace rocksdb {
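The SanitizeOptions() change above inverts the old check: instead of pushing the extractor into a hash factory, it now silently falls back to the default skip list when a hash-based factory is configured without any prefix extractor. A hedged sketch of what a caller sees (the helper and its values are hypothetical):

    #include "rocksdb/memtablerep.h"
    #include "rocksdb/options.h"
    #include "rocksdb/slice_transform.h"

    rocksdb::Options MakeHashMemtableOptions(bool with_prefix_extractor) {
      rocksdb::Options options;
      if (with_prefix_extractor) {
        options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(4));
      }
      // Without a prefix extractor, SanitizeOptions() replaces this factory
      // with SkipListFactory, since the hash buckets would have no transform
      // to hash by.
      options.memtable_factory.reset(rocksdb::NewHashLinkListRepFactory(50000));
      return options;
    }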
diff --git a/db/db_bench.cc b/db/db_bench.cc
index efb6f210f..6d7c0898a 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -1538,9 +1538,10 @@ class Benchmark {
     options.compaction_style = FLAGS_compaction_style_e;
     options.block_size = FLAGS_block_size;
     options.filter_policy = filter_policy_;
-    options.prefix_extractor =
-        (FLAGS_use_plain_table || FLAGS_use_prefix_blooms) ? prefix_extractor_
-                                                           : nullptr;
+    if (FLAGS_use_plain_table || FLAGS_use_prefix_blooms) {
+      options.prefix_extractor.reset(
+          NewFixedPrefixTransform(FLAGS_prefix_size));
+    }
     options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits;
     options.max_open_files = FLAGS_open_files;
     options.statistics = dbstats;
@@ -1564,7 +1565,6 @@ class Benchmark {
     switch (FLAGS_rep_factory) {
       case kPrefixHash:
         options.memtable_factory.reset(NewHashSkipListRepFactory(
-            prefix_extractor_,
             FLAGS_hash_bucket_count));
         break;
       case kSkipList:
@@ -1572,7 +1572,6 @@ class Benchmark {
         break;
       case kHashLinkedList:
         options.memtable_factory.reset(NewHashLinkListRepFactory(
-            prefix_extractor_,
             FLAGS_hash_bucket_count));
         break;
       case kVectorRep:
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 91bc39785..b71e2ae01 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -61,7 +61,9 @@
 #include "util/build_version.h"
 #include "util/coding.h"
 #include "util/hash_skiplist_rep.h"
+#include "util/hash_linklist_rep.h"
 #include "util/logging.h"
+#include "util/log_buffer.h"
 #include "util/mutexlock.h"
 #include "util/perf_context_imp.h"
 #include "util/stop_watch.h"
@@ -320,7 +322,9 @@ DBImpl::~DBImpl() {
     FindObsoleteFiles(deletion_state, true);
     // manifest number starting from 2
     deletion_state.manifest_file_number = 1;
-    PurgeObsoleteFiles(deletion_state);
+    if (deletion_state.HaveSomethingToDelete()) {
+      PurgeObsoleteFiles(deletion_state);
+    }
   }
 }
 
@@ -536,12 +540,8 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
 // files in sst_delete_files and log_delete_files.
 // It is not necessary to hold the mutex when invoking this method.
 void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
-  // check if there is anything to do
-  if (state.candidate_files.empty() &&
-      state.sst_delete_files.empty() &&
-      state.log_delete_files.empty()) {
-    return;
-  }
+  // we'd better have something to delete
+  assert(state.HaveSomethingToDelete());
 
   // this checks if FindObsoleteFiles() was run before. If not, don't do
   // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
@@ -549,7 +549,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
   if (state.manifest_file_number == 0) {
     return;
   }
-  std::vector<std::string> old_log_files;
+
   // Now, convert live list to an unordered set, WITHOUT mutex held;
   // set is slow.
@@ -587,6 +587,8 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
       candidate_files.end()
   );
 
+  std::vector<std::string> old_info_log_files;
+
   for (const auto& to_delete : candidate_files) {
     uint64_t number;
     FileType type;
@@ -617,7 +619,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
       case kInfoLogFile:
         keep = true;
         if (number != 0) {
-          old_log_files.push_back(to_delete);
+          old_info_log_files.push_back(to_delete);
         }
         break;
       case kCurrentFile:
@@ -636,44 +638,40 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
       // evict from cache
       TableCache::Evict(table_cache_.get(), number);
     }
+    std::string fname = ((type == kLogFile) ?
+        options_.wal_dir : dbname_) + "/" + to_delete;
-    Log(options_.info_log,
-        "Delete type=%d #%lu",
-        int(type),
-        (unsigned long)number);
-    if (type == kLogFile && (options_.WAL_ttl_seconds > 0 ||
-        options_.WAL_size_limit_MB > 0)) {
-      Status s = env_->RenameFile(fname,
-          ArchivedLogFileName(options_.wal_dir, number));
-      if (!s.ok()) {
-        Log(options_.info_log,
-            "RenameFile logfile #%lu FAILED -- %s\n",
-            (unsigned long)number, s.ToString().c_str());
-      }
+    if (type == kLogFile &&
+        (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
+      auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number);
+      Status s = env_->RenameFile(fname, archived_log_name);
+      Log(options_.info_log,
+          "Move log file %s to %s -- %s\n",
+          fname.c_str(), archived_log_name.c_str(), s.ToString().c_str());
     } else {
       Status s = env_->DeleteFile(fname);
-      if (!s.ok()) {
-        Log(options_.info_log, "Delete type=%d #%lu FAILED -- %s\n",
-            int(type), (unsigned long)number, s.ToString().c_str());
-      }
+      Log(options_.info_log, "Delete %s type=%d #%lu -- %s\n",
+          fname.c_str(), type, (unsigned long)number,
+          s.ToString().c_str());
     }
   }
 
   // Delete old info log files.
-  size_t old_log_file_count = old_log_files.size();
+  size_t old_info_log_file_count = old_info_log_files.size();
   // NOTE: Currently we only support log purge when options_.db_log_dir is
   // located in `dbname` directory.
-  if (old_log_file_count >= options_.keep_log_file_num &&
+  if (old_info_log_file_count >= options_.keep_log_file_num &&
       options_.db_log_dir.empty()) {
-    std::sort(old_log_files.begin(), old_log_files.end());
-    size_t end = old_log_file_count - options_.keep_log_file_num;
+    std::sort(old_info_log_files.begin(), old_info_log_files.end());
+    size_t end = old_info_log_file_count - options_.keep_log_file_num;
     for (unsigned int i = 0; i <= end; i++) {
-      std::string& to_delete = old_log_files.at(i);
-      // Log(options_.info_log, "Delete type=%d %s\n",
-      //     int(kInfoLogFile), to_delete.c_str());
-      env_->DeleteFile(dbname_ + "/" + to_delete);
+      std::string& to_delete = old_info_log_files.at(i);
+      Log(options_.info_log, "Delete info log file %s\n", to_delete.c_str());
+      Status s = env_->DeleteFile(dbname_ + "/" + to_delete);
+      if (!s.ok()) {
+        Log(options_.info_log, "Delete info log file %s FAILED -- %s\n",
+            to_delete.c_str(), s.ToString().c_str());
+      }
     }
   }
   PurgeObsoleteWALFiles();
@@ -684,7 +682,9 @@ void DBImpl::DeleteObsoleteFiles() {
   mutex_.AssertHeld();
   DeletionState deletion_state;
   FindObsoleteFiles(deletion_state, true);
-  PurgeObsoleteFiles(deletion_state);
+  if (deletion_state.HaveSomethingToDelete()) {
+    PurgeObsoleteFiles(deletion_state);
+  }
 }
 
 // 1. Go through all archived files and
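The recurring pattern in these hunks moves the "anything to delete?" check out of PurgeObsoleteFiles() and into every caller; the purge function now asserts the precondition instead of silently returning. A self-contained sketch of the contract, with a hypothetical stand-in struct since DBImpl::DeletionState is private:

    #include <cassert>
    #include <string>
    #include <vector>

    // Minimal stand-in for DBImpl::DeletionState, only to illustrate the
    // convention this diff establishes.
    struct DeletionStateSketch {
      std::vector<std::string> candidate_files;
      std::vector<std::string> sst_delete_files;
      std::vector<std::string> log_delete_files;

      bool HaveSomethingToDelete() const {
        return !candidate_files.empty() || !sst_delete_files.empty() ||
               !log_delete_files.empty();
      }
    };

    void PurgeObsoleteFilesSketch(const DeletionStateSketch& state) {
      // New contract: the caller guarantees there is work to do.
      assert(state.HaveSomethingToDelete());
      // ... delete the files ...
    }

    void CallerSketch(const DeletionStateSketch& state) {
      if (state.HaveSomethingToDelete()) {  // guard at every call site
        PurgeObsoleteFilesSketch(state);
      }
    }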
@@ -1132,7 +1132,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
 Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
                                 autovector<MemTable*>& mems,
                                 VersionEdit* edit,
-                                uint64_t* filenumber) {
+                                uint64_t* filenumber, LogBuffer* log_buffer) {
   mutex_.AssertHeld();
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
@@ -1148,6 +1148,7 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
   Status s;
   {
     mutex_.Unlock();
+    log_buffer->FlushBufferToLog();
     std::vector<Iterator*> memtables;
     for (MemTable* m : mems) {
       Log(options_.info_log,
@@ -1218,7 +1219,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
 
 Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
                                          bool* madeProgress,
-                                         DeletionState& deletion_state) {
+                                         DeletionState& deletion_state,
+                                         LogBuffer* log_buffer) {
   mutex_.AssertHeld();
   assert(cfd->imm()->size() != 0);
   assert(cfd->imm()->IsFlushPending());
@@ -1228,7 +1230,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
   autovector<MemTable*> mems;
   cfd->imm()->PickMemtablesToFlush(&mems);
   if (mems.empty()) {
-    Log(options_.info_log, "Nothing in memstore to flush");
+    LogToBuffer(log_buffer, "Nothing in memstore to flush");
     return Status::OK();
   }
 
@@ -1245,7 +1247,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
   edit->SetColumnFamily(cfd->GetID());
 
   // This will release and re-acquire the mutex.
-  Status s = WriteLevel0Table(cfd, mems, edit, &file_number);
+  Status s = WriteLevel0Table(cfd, mems, edit, &file_number, log_buffer);
 
   if (s.ok() && shutting_down_.Acquire_Load() && cfd->IsDropped()) {
     s = Status::ShutdownInProgress(
@@ -1838,7 +1840,8 @@ void DBImpl::BGWorkCompaction(void* db) {
 }
 
 Status DBImpl::BackgroundFlush(bool* madeProgress,
-                               DeletionState& deletion_state) {
+                               DeletionState& deletion_state,
+                               LogBuffer* log_buffer) {
   mutex_.AssertHeld();
   // call_status is failure if at least one flush was a failure. even if
   // flushing one column family reports a failure, we will continue flushing
@@ -1857,8 +1860,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
         "BackgroundCallFlush doing FlushMemTableToOutputFile with column "
         "family %u, flush slots available %d",
         cfd->GetID(), options_.max_background_flushes - bg_flush_scheduled_);
-    flush_status =
-        FlushMemTableToOutputFile(cfd, madeProgress, deletion_state);
+    flush_status = FlushMemTableToOutputFile(cfd, madeProgress,
                                              deletion_state, log_buffer);
   }
   if (call_status.ok() && !flush_status.ok()) {
     call_status = flush_status;
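These hunks thread a LogBuffer through the flush path so messages composed while holding the DB mutex do no I/O until the mutex is released. A hedged usage sketch of the pattern (the message text is made up; the API follows util/log_buffer.h, which is introduced near the end of this diff):

    #include <mutex>
    #include "rocksdb/env.h"
    #include "util/log_buffer.h"  // header added later in this diff

    void BufferedLoggingSketch(rocksdb::Logger* info_log, std::mutex* mu) {
      rocksdb::LogBuffer log_buffer(rocksdb::INFO, info_log);
      {
        std::lock_guard<std::mutex> guard(*mu);
        // Cheap: formats into an arena-backed buffer, no I/O under the lock.
        rocksdb::LogToBuffer(&log_buffer, "picked %d files for compaction", 4);
      }
      // The potentially slow write happens outside the critical section.
      log_buffer.FlushBufferToLog();
    }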
@@ -1877,41 +1880,48 @@ void DBImpl::BackgroundCallFlush() {
   bool madeProgress = false;
   DeletionState deletion_state(true);
   assert(bg_flush_scheduled_);
-  MutexLock l(&mutex_);
-
-  Status s;
-  if (!shutting_down_.Acquire_Load()) {
-    s = BackgroundFlush(&madeProgress, deletion_state);
-    if (!s.ok()) {
-      // Wait a little bit before retrying background compaction in
-      // case this is an environmental problem and we do not want to
-      // chew up resources for failed compactions for the duration of
-      // the problem.
-      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
-      Log(options_.info_log, "Waiting after background flush error: %s",
-          s.ToString().c_str());
+  LogBuffer log_buffer(INFO, options_.info_log.get());
+  {
+    MutexLock l(&mutex_);
+
+    Status s;
+    if (!shutting_down_.Acquire_Load()) {
+      s = BackgroundFlush(&madeProgress, deletion_state, &log_buffer);
+      if (!s.ok()) {
+        // Wait a little bit before retrying background compaction in
+        // case this is an environmental problem and we do not want to
+        // chew up resources for failed compactions for the duration of
+        // the problem.
+        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
+        Log(options_.info_log, "Waiting after background flush error: %s",
+            s.ToString().c_str());
+        mutex_.Unlock();
+        log_buffer.FlushBufferToLog();
+        LogFlush(options_.info_log);
+        env_->SleepForMicroseconds(1000000);
+        mutex_.Lock();
+      }
+    }
+
+    // If !s.ok(), this means that Flush failed. In that case, we want
+    // to delete all obsolete files and we force FindObsoleteFiles()
+    FindObsoleteFiles(deletion_state, !s.ok());
+    // delete unnecessary files if any, this is done outside the mutex
+    if (deletion_state.HaveSomethingToDelete()) {
       mutex_.Unlock();
-      LogFlush(options_.info_log);
-      env_->SleepForMicroseconds(1000000);
+      log_buffer.FlushBufferToLog();
+      PurgeObsoleteFiles(deletion_state);
       mutex_.Lock();
     }
-  }
-
-  // If !s.ok(), this means that Flush failed. In that case, we want
-  // to delete all obsolete files and we force FindObsoleteFiles()
-  FindObsoleteFiles(deletion_state, !s.ok());
-  // delete unnecessary files if any, this is done outside the mutex
-  if (deletion_state.HaveSomethingToDelete()) {
-    mutex_.Unlock();
-    PurgeObsoleteFiles(deletion_state);
-    mutex_.Lock();
-  }
-  bg_flush_scheduled_--;
-  if (madeProgress) {
-    MaybeScheduleFlushOrCompaction();
+    bg_flush_scheduled_--;
+    if (madeProgress) {
+      MaybeScheduleFlushOrCompaction();
+    }
+    bg_cv_.SignalAll();
   }
-  bg_cv_.SignalAll();
+  log_buffer.FlushBufferToLog();
 }
 
@@ -1929,7 +1939,7 @@ void DBImpl::BackgroundCallCompaction() {
   DeletionState deletion_state(true);
 
   MaybeDumpStats();
-  LogBuffer log_buffer(INFO, options_.info_log);
+  LogBuffer log_buffer(INFO, options_.info_log.get());
   {
     MutexLock l(&mutex_);
     // Log(options_.info_log, "XXX BG Thread %llx process new work item",
@@ -1945,6 +1955,7 @@ void DBImpl::BackgroundCallCompaction() {
       // the problem.
       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
       mutex_.Unlock();
+      log_buffer.FlushBufferToLog();
       Log(options_.info_log, "Waiting after background compaction error: %s",
           s.ToString().c_str());
       LogFlush(options_.info_log);
@@ -1962,6 +1973,7 @@ void DBImpl::BackgroundCallCompaction() {
     // delete unnecessary files if any, this is done outside the mutex
     if (deletion_state.HaveSomethingToDelete()) {
       mutex_.Unlock();
+      log_buffer.FlushBufferToLog();
       PurgeObsoleteFiles(deletion_state);
       mutex_.Lock();
     }
@@ -2054,7 +2066,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   } else {
     MaybeScheduleFlushOrCompaction();  // do more compaction work in parallel.
     CompactionState* compact = new CompactionState(c.get());
-    status = DoCompactionWork(compact, deletion_state);
+    status = DoCompactionWork(compact, deletion_state, log_buffer);
     CleanupCompaction(compact, status);
     c->ReleaseCompactionFiles(status);
     c->ReleaseInputs();
@@ -2327,7 +2339,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
 }
 
 Status DBImpl::DoCompactionWork(CompactionState* compact,
-                                DeletionState& deletion_state) {
+                                DeletionState& deletion_state,
+                                LogBuffer* log_buffer) {
   assert(compact);
   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
   ColumnFamilyData* cfd = compact->compaction->column_family_data();
@@ -2370,6 +2383,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
 
   // Release mutex while we're actually doing the compaction work
   mutex_.Unlock();
+  // flush log buffer immediately after releasing the mutex
+  log_buffer->FlushBufferToLog();
 
   const uint64_t start_micros = env_->NowMicros();
   unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
@@ -2745,7 +2760,9 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
     state->mu->Unlock();
 
     delete state->super_version;
-    state->db->PurgeObsoleteFiles(deletion_state);
+    if (deletion_state.HaveSomethingToDelete()) {
+      state->db->PurgeObsoleteFiles(deletion_state);
+    }
   }
 
   delete state;
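The GetImpl() comment rewritten in the next hunk describes the thread-local SuperVersion handoff in more detail. An illustrative-only model of that protocol (kSVInUse/kSVObsolete here are hypothetical stand-ins for the sentinel pointers in db/db_impl.cc, and the real code uses its own ThreadLocalPtr, not std::atomic):

    #include <atomic>

    struct SuperVersionSketch { /* refcounted view of memtables + files */ };

    static SuperVersionSketch* const kSVInUse =
        reinterpret_cast<SuperVersionSketch*>(0x1);
    static SuperVersionSketch* const kSVObsolete = nullptr;

    SuperVersionSketch* AcquireCached(std::atomic<SuperVersionSketch*>* slot) {
      // The reader takes exclusive ownership of the cached pointer for the
      // duration of the read by swapping in kSVInUse.
      SuperVersionSketch* sv = slot->exchange(kSVInUse);
      if (sv == kSVObsolete) {
        // A background thread invalidated the cache; fall back to the
        // mutex-protected slow path (not shown).
        return nullptr;
      }
      return sv;
    }

    void ReleaseCached(std::atomic<SuperVersionSketch*>* slot,
                       SuperVersionSketch* sv) {
      // Re-check at the end of the read: if the slot still holds kSVInUse,
      // put the SuperVersion back; otherwise it went stale meanwhile and
      // must be unreffed/cleaned up instead of cached (not shown).
      SuperVersionSketch* expected = kSVInUse;
      slot->compare_exchange_strong(expected, sv);
    }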
@@ -2907,8 +2924,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   // acquiring mutex for this operation, we use atomic Swap() on the thread
   // local pointer to guarantee exclusive access. If the thread local pointer
   // is being used while a new SuperVersion is installed, the cached
-  // SuperVersion can become stale. It will eventually get refreshed either
-  // on the next GetImpl() call or next SuperVersion installation.
+  // SuperVersion can become stale. In that case, the background thread would
+  // have swapped in kSVObsolete. We re-check the value at the end of
+  // Get, with an atomic compare and swap. The superversion will be released
+  // if detected to be stale.
   thread_local_sv = cfd->GetThreadLocalSuperVersion();
   void* ptr = thread_local_sv->Swap(SuperVersion::kSVInUse);
   // Invariant:
@@ -2923,7 +2942,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
     SuperVersion* sv_to_delete = nullptr;
 
     if (sv && sv->Unref()) {
+      RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS);
       mutex_.Lock();
+      // TODO underlying resources held by superversion (sst files) might
+      // not be released until the next background job.
       sv->Cleanup();
       sv_to_delete = sv;
     } else {
@@ -3001,15 +3023,12 @@ Status DBImpl::GetImpl(const ReadOptions& options,
 
   if (unref_sv) {
     // Release SuperVersion
-    bool delete_sv = false;
     if (sv->Unref()) {
       mutex_.Lock();
       sv->Cleanup();
       mutex_.Unlock();
-      delete_sv = true;
-    }
-    if (delete_sv) {
       delete sv;
+      RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS);
     }
     RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_RELEASES);
   }
@@ -3263,7 +3282,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
     // use extra wrapper to exclude any keys from the results which
     // don't begin with the prefix
     iter = new PrefixFilterIterator(iter, *options.prefix,
-                                    cfd->options()->prefix_extractor);
+                                    cfd->options()->prefix_extractor.get());
   }
   return iter;
 }
@@ -3864,12 +3883,14 @@ Status DBImpl::DeleteFile(std::string name) {
   if (type == kLogFile) {
     // Only allow deleting archived log files
     if (log_type != kArchivedLogFile) {
-      Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
+      Log(options_.info_log, "DeleteFile %s failed - not archived log.\n",
+          name.c_str());
       return Status::NotSupported("Delete only supported for archived logs");
     }
     status = env_->DeleteFile(options_.wal_dir + "/" + name.c_str());
     if (!status.ok()) {
-      Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
+      Log(options_.info_log, "DeleteFile %s failed -- %s.\n",
+          name.c_str(), status.ToString().c_str());
     }
     return status;
   }
@@ -3915,7 +3936,9 @@ Status DBImpl::DeleteFile(std::string name) {
   } // lock released here
   LogFlush(options_.info_log);
   // remove files outside the db-lock
-  PurgeObsoleteFiles(deletion_state);
+  if (deletion_state.HaveSomethingToDelete()) {
+    PurgeObsoleteFiles(deletion_state);
+  }
   {
     MutexLock l(&mutex_);
     // schedule flush if file deletion means we freed the space for flushes to
diff --git a/db/db_impl.h b/db/db_impl.h
index 13cb38900..1ca799f3b 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -266,6 +266,7 @@ class DBImpl : public DB {
  private:
   friend class DB;
   friend class TailingIterator;
+  friend struct SuperVersion;
   struct CompactionState;
   struct Writer;
 
@@ -287,7 +288,8 @@ class DBImpl : public DB {
   // Flush the in-memory write buffer to storage.  Switches to a new
   // log-file/memtable and writes a new descriptor iff successful.
   Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, bool* madeProgress,
-                                   DeletionState& deletion_state);
+                                   DeletionState& deletion_state,
+                                   LogBuffer* log_buffer);
   Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
                         bool read_only);
 
@@ -300,7 +302,8 @@ class DBImpl : public DB {
   Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
                                      VersionEdit* edit);
   Status WriteLevel0Table(ColumnFamilyData* cfd, autovector<MemTable*>& mems,
-                          VersionEdit* edit, uint64_t* filenumber);
+                          VersionEdit* edit, uint64_t* filenumber,
+                          LogBuffer* log_buffer);
 
   uint64_t SlowdownAmount(int n, double bottom, double top);
   Status MakeRoomForWrite(ColumnFamilyData* cfd,
@@ -325,10 +328,12 @@ class DBImpl : public DB {
   void BackgroundCallFlush();
   Status BackgroundCompaction(bool* madeProgress,
                               DeletionState& deletion_state,
                               LogBuffer* log_buffer);
-  Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state);
+  Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state,
+                         LogBuffer* log_buffer);
   void CleanupCompaction(CompactionState* compact, Status status);
   Status DoCompactionWork(CompactionState* compact,
-                          DeletionState& deletion_state);
+                          DeletionState& deletion_state,
+                          LogBuffer* log_buffer);
 
   Status OpenCompactionOutputFile(CompactionState* compact);
   Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
diff --git a/db/db_test.cc b/db/db_test.cc
index 80622c94d..5b97fa383 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -259,8 +259,6 @@ class SpecialEnv : public EnvWrapper {
 class DBTest {
  private:
   const FilterPolicy* filter_policy_;
-  static std::unique_ptr<const SliceTransform> prefix_1_transform;
-  static std::unique_ptr<const SliceTransform> noop_transform;
 
  protected:
   // Sequence of option configurations to try
@@ -375,18 +373,18 @@ class DBTest {
     Options options;
     switch (option_config_) {
       case kHashSkipList:
-        options.memtable_factory.reset(
-            NewHashSkipListRepFactory(NewFixedPrefixTransform(1)));
+        options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+        options.memtable_factory.reset(NewHashSkipListRepFactory());
         break;
       case kPlainTableFirstBytePrefix:
         options.table_factory.reset(new PlainTableFactory());
-        options.prefix_extractor = prefix_1_transform.get();
+        options.prefix_extractor.reset(NewFixedPrefixTransform(1));
         options.allow_mmap_reads = true;
         options.max_sequential_skip_in_iterations = 999999;
         break;
       case kPlainTableAllBytesPrefix:
         options.table_factory.reset(new PlainTableFactory());
-        options.prefix_extractor = noop_transform.get();
+        options.prefix_extractor.reset(NewNoopTransform());
         options.allow_mmap_reads = true;
         options.max_sequential_skip_in_iterations = 999999;
         break;
@@ -426,8 +424,8 @@ class DBTest {
         options.memtable_factory.reset(new VectorRepFactory(100));
         break;
       case kHashLinkList:
-        options.memtable_factory.reset(
-            NewHashLinkListRepFactory(NewFixedPrefixTransform(1), 4));
+        options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+        options.memtable_factory.reset(NewHashLinkListRepFactory(4));
         break;
       case kUniversalCompaction:
         options.compaction_style = kCompactionStyleUniversal;
@@ -945,10 +943,6 @@ class DBTest {
   }
 };
 
-std::unique_ptr<const SliceTransform> DBTest::prefix_1_transform(
-    NewFixedPrefixTransform(1));
-std::unique_ptr<const SliceTransform> DBTest::noop_transform(
-    NewNoopTransform());
 
 static std::string Key(int i) {
   char buf[100];
@@ -1587,12 +1581,7 @@ TEST(DBTest, IterMulti) {
   iter->Seek("ax");
   ASSERT_EQ(IterStatus(iter), "b->vb");
 
-  SetPerfLevel(kEnableTime);
-  perf_context.Reset();
   iter->Seek("b");
-  ASSERT_TRUE((int) perf_context.seek_internal_seek_time > 0);
-  ASSERT_TRUE((int) perf_context.find_next_user_entry_time > 0);
-  SetPerfLevel(kDisable);
   ASSERT_EQ(IterStatus(iter), "b->vb");
   iter->Seek("z");
   ASSERT_EQ(IterStatus(iter), "(invalid)");
@@ -1607,12 +1596,7 @@ TEST(DBTest, IterMulti) {
   // Switch from forward to reverse
   iter->SeekToFirst();
   iter->Next();
-  SetPerfLevel(kEnableTime);
-  perf_context.Reset();
   iter->Next();
-  ASSERT_EQ(0, (int) perf_context.seek_internal_seek_time);
-  ASSERT_TRUE((int) perf_context.find_next_user_entry_time > 0);
-  SetPerfLevel(kDisable);
   iter->Prev();
   ASSERT_EQ(IterStatus(iter), "b->vb");
 
@@ -5690,7 +5674,7 @@ TEST(DBTest, PrefixScan) {
   options.env = env_;
   options.no_block_cache = true;
   options.filter_policy = NewBloomFilterPolicy(10);
-  options.prefix_extractor = NewFixedPrefixTransform(8);
+  options.prefix_extractor.reset(NewFixedPrefixTransform(8));
   options.whole_key_filtering = false;
   options.disable_auto_compactions = true;
   options.max_background_compactions = 2;
@@ -5698,8 +5682,7 @@ TEST(DBTest, PrefixScan) {
   options.disable_seek_compaction = true;
   // Tricky: options.prefix_extractor will be released by
   // NewHashSkipListRepFactory after use.
-  options.memtable_factory.reset(
-      NewHashSkipListRepFactory(options.prefix_extractor));
+  options.memtable_factory.reset(NewHashSkipListRepFactory());
 
   // prefix specified, with blooms: 2 RAND I/Os
   // SeekToFirst
@@ -5899,14 +5882,12 @@ TEST(DBTest, TailingIteratorPrefixSeek) {
   read_options.tailing = true;
   read_options.prefix_seek = true;
 
-  auto prefix_extractor = NewFixedPrefixTransform(2);
-
   Options options = CurrentOptions();
   options.env = env_;
   options.create_if_missing = true;
   options.disable_auto_compactions = true;
-  options.prefix_extractor = prefix_extractor;
-  options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor));
+  options.prefix_extractor.reset(NewFixedPrefixTransform(2));
+  options.memtable_factory.reset(NewHashSkipListRepFactory());
   DestroyAndReopen(&options);
   CreateAndReopenWithCF({"pikachu"}, &options);
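The IterMulti hunks drop the perf_context timing assertions, so the test no longer documents how those counters are read. For reference, a standalone sketch of the same measurement against an already-open DB, using the public rocksdb/perf_context.h API as it stands at this point in the tree:

    #include <cstdio>
    #include "rocksdb/db.h"
    #include "rocksdb/perf_context.h"

    void MeasureSeekCost(rocksdb::DB* db) {
      rocksdb::SetPerfLevel(rocksdb::kEnableTime);
      rocksdb::perf_context.Reset();

      rocksdb::Iterator* iter = db->NewIterator(rocksdb::ReadOptions());
      iter->Seek("b");

      // Time spent seeking inside memtables/SSTs vs. skipping entries.
      std::printf("internal seek: %llu, find next entry: %llu\n",
                  (unsigned long long)
                      rocksdb::perf_context.seek_internal_seek_time,
                  (unsigned long long)
                      rocksdb::perf_context.find_next_user_entry_time);

      rocksdb::SetPerfLevel(rocksdb::kDisable);
      delete iter;
    }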
diff --git a/db/memtable.cc b/db/memtable.cc
index 3c2ee01fb..b29f9339c 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -33,7 +33,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
     : comparator_(cmp),
       refs_(0),
       arena_(options.arena_block_size),
-      table_(options.memtable_factory->CreateMemTableRep(comparator_, &arena_)),
+      table_(options.memtable_factory->CreateMemTableRep(
+          comparator_, &arena_, options.prefix_extractor.get())),
       flush_in_progress_(false),
       flush_completed_(false),
       file_number_(0),
@@ -41,7 +42,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
       mem_next_logfile_number_(0),
       locks_(options.inplace_update_support ?
              options.inplace_update_num_locks : 0),
-      prefix_extractor_(options.prefix_extractor) {
+      prefix_extractor_(options.prefix_extractor.get()) {
   if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
     prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
                                          options.memtable_prefix_bloom_probes));
diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc
index 85d047809..6a3d81aa5 100644
--- a/db/plain_table_db_test.cc
+++ b/db/plain_table_db_test.cc
@@ -44,7 +44,6 @@ class PlainTableDBTest {
   DB* db_;
 
   Options last_options_;
-  static std::unique_ptr<const SliceTransform> prefix_transform;
 
  public:
   PlainTableDBTest() : env_(Env::Default()) {
@@ -66,7 +65,7 @@ class PlainTableDBTest {
   Options CurrentOptions() {
     Options options;
     options.table_factory.reset(NewPlainTableFactory(16, 2, 0.8, 3));
-    options.prefix_extractor = prefix_transform.get();
+    options.prefix_extractor.reset(NewFixedPrefixTransform(8));
     options.allow_mmap_reads = true;
     return options;
   }
@@ -173,9 +172,6 @@ class PlainTableDBTest {
   }
 };
 
-std::unique_ptr<const SliceTransform> PlainTableDBTest::prefix_transform(
-    NewFixedPrefixTransform(8));
-
 TEST(PlainTableDBTest, Empty) {
   ASSERT_TRUE(dbfull() != nullptr);
   ASSERT_EQ("NOT_FOUND", Get("0000000000000foo"));
diff --git a/db/prefix_test.cc b/db/prefix_test.cc
index c43ba5c4d..0f2c54a59 100644
--- a/db/prefix_test.cc
+++ b/db/prefix_test.cc
@@ -161,16 +161,15 @@ class PrefixTest {
     // skip some options
     option_config_++;
     if (option_config_ < kEnd) {
-      auto prefix_extractor = NewFixedPrefixTransform(8);
-      options.prefix_extractor = prefix_extractor;
+      options.prefix_extractor.reset(NewFixedPrefixTransform(8));
       switch(option_config_) {
         case kHashSkipList:
-          options.memtable_factory.reset(NewHashSkipListRepFactory(
-              options.prefix_extractor, bucket_count, FLAGS_skiplist_height));
+          options.memtable_factory.reset(
+              NewHashSkipListRepFactory(bucket_count, FLAGS_skiplist_height));
           return true;
         case kHashLinkList:
-          options.memtable_factory.reset(NewHashLinkListRepFactory(
-              options.prefix_extractor, bucket_count));
+          options.memtable_factory.reset(
+              NewHashLinkListRepFactory(bucket_count));
          return true;
         default:
           return false;
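As the memtable.cc hunk shows, a single Options.prefix_extractor now feeds both the memtable rep (via the new CreateMemTableRep parameter) and the optional in-memtable prefix bloom. A hedged configuration sketch (the bit and probe counts are arbitrary, not recommendations from this diff):

    #include "rocksdb/memtablerep.h"
    #include "rocksdb/options.h"
    #include "rocksdb/slice_transform.h"

    rocksdb::Options PrefixBloomOptions() {
      rocksdb::Options options;
      // One transform drives the hash memtable buckets and the prefix bloom.
      options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(4));
      options.memtable_factory.reset(rocksdb::NewHashSkipListRepFactory());
      options.memtable_prefix_bloom_bits = 100000000;
      options.memtable_prefix_bloom_probes = 6;
      return options;
    }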
diff --git a/db/skiplist.h b/db/skiplist.h
index e4a253bcc..751f7c3ec 100644
--- a/db/skiplist.h
+++ b/db/skiplist.h
@@ -158,7 +158,7 @@ class SkipList {
 // Implementation details follow
 template <typename Key, class Comparator>
-struct SkipList<Key,Comparator>::Node {
+struct SkipList<Key, Comparator>::Node {
   explicit Node(const Key& k) : key(k) { }
 
   Key const key;
@@ -194,43 +194,43 @@ struct SkipList<Key, Comparator>::Node {
 };
 
 template <typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node*
-SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
+typename SkipList<Key, Comparator>::Node*
+SkipList<Key, Comparator>::NewNode(const Key& key, int height) {
   char* mem = arena_->AllocateAligned(
       sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
   return new (mem) Node(key);
 }
 
 template <typename Key, class Comparator>
-inline SkipList<Key,Comparator>::Iterator::Iterator(const SkipList* list) {
+inline SkipList<Key, Comparator>::Iterator::Iterator(const SkipList* list) {
   SetList(list);
 }
 
 template <typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::SetList(const SkipList* list) {
+inline void SkipList<Key, Comparator>::Iterator::SetList(const SkipList* list) {
   list_ = list;
   node_ = nullptr;
 }
 
 template <typename Key, class Comparator>
-inline bool SkipList<Key,Comparator>::Iterator::Valid() const {
+inline bool SkipList<Key, Comparator>::Iterator::Valid() const {
   return node_ != nullptr;
 }
 
 template <typename Key, class Comparator>
-inline const Key& SkipList<Key,Comparator>::Iterator::key() const {
+inline const Key& SkipList<Key, Comparator>::Iterator::key() const {
   assert(Valid());
   return node_->key;
 }
 
 template <typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::Next() {
+inline void SkipList<Key, Comparator>::Iterator::Next() {
   assert(Valid());
   node_ = node_->Next(0);
 }
 
 template <typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::Prev() {
+inline void SkipList<Key, Comparator>::Iterator::Prev() {
   // Instead of using explicit "prev" links, we just search for the
   // last node that falls before key.
   assert(Valid());
@@ -241,17 +241,17 @@ inline void SkipList<Key, Comparator>::Iterator::Prev() {
 }
 
 template <typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::Seek(const Key& target) {
+inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
   node_ = list_->FindGreaterOrEqual(target, nullptr);
 }
 
 template <typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::SeekToFirst() {
+inline void SkipList<Key, Comparator>::Iterator::SeekToFirst() {
   node_ = list_->head_->Next(0);
 }
 
 template <typename Key, class Comparator>
-inline void SkipList<Key,Comparator>::Iterator::SeekToLast() {
+inline void SkipList<Key, Comparator>::Iterator::SeekToLast() {
   node_ = list_->FindLast();
   if (node_ == list_->head_) {
     node_ = nullptr;
@@ -259,7 +259,7 @@ inline void SkipList<Key, Comparator>::Iterator::SeekToLast() {
 }
 
 template <typename Key, class Comparator>
-int SkipList<Key,Comparator>::RandomHeight() {
+int SkipList<Key, Comparator>::RandomHeight() {
   // Increase height with probability 1 in kBranching
   int height = 1;
   while (height < kMaxHeight_ && ((rnd_.Next() % kBranching_) == 0)) {
@@ -271,14 +271,14 @@ int SkipList<Key, Comparator>::RandomHeight() {
 }
 
 template <typename Key, class Comparator>
-bool SkipList<Key,Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
+bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
   // nullptr n is considered infinite
   return (n != nullptr) && (compare_(n->key, key) < 0);
 }
 
 template <typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
-    const {
+typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::
+    FindGreaterOrEqual(const Key& key, Node** prev) const {
   // Use prev as an optimization hint and fallback to slow path
   if (prev && !KeyIsAfterNode(key, prev[0]->Next(0))) {
     Node* x = prev[0];
@@ -315,8 +315,8 @@ typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindGreaterOr
 }
 
 template <typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node*
-SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
+typename SkipList<Key, Comparator>::Node*
+SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
   Node* x = head_;
   int level = GetMaxHeight() - 1;
   while (true) {
@@ -336,7 +336,7 @@ SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
 }
 
 template <typename Key, class Comparator>
-typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()
+typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindLast()
     const {
   Node* x = head_;
   int level = GetMaxHeight() - 1;
@@ -356,7 +356,7 @@ typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindLast()
 }
 
 template <typename Key, class Comparator>
-SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena,
+SkipList<Key, Comparator>::SkipList(const Comparator cmp, Arena* arena,
                                    int32_t max_height,
                                    int32_t branching_factor)
     : kMaxHeight_(max_height),
@@ -380,7 +380,7 @@ SkipList<Key, Comparator>::SkipList(const Comparator cmp, Arena* arena,
 }
 
 template <typename Key, class Comparator>
-void SkipList<Key,Comparator>::Insert(const Key& key) {
+void SkipList<Key, Comparator>::Insert(const Key& key) {
   // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
   // here since Insert() is externally synchronized.
   Node* x = FindGreaterOrEqual(key, prev_);
@@ -417,7 +417,7 @@ void SkipList<Key, Comparator>::Insert(const Key& key) {
 }
 
 template <typename Key, class Comparator>
-bool SkipList<Key,Comparator>::Contains(const Key& key) const {
+bool SkipList<Key, Comparator>::Contains(const Key& key) const {
   Node* x = FindGreaterOrEqual(key, nullptr);
   if (x != nullptr && Equal(key, x->key)) {
     return true;
diff --git a/db/tailing_iter.cc b/db/tailing_iter.cc
index dda8c3a99..cd0335fef 100644
--- a/db/tailing_iter.cc
+++ b/db/tailing_iter.cc
@@ -159,7 +159,7 @@ bool TailingIterator::IsCurrentVersion() const {
 }
 
 bool TailingIterator::IsSamePrefix(const Slice& target) const {
-  const SliceTransform* extractor = cfd_->options()->prefix_extractor;
+  const SliceTransform* extractor = cfd_->options()->prefix_extractor.get();
   assert(extractor);
   assert(is_prev_set_);
 
diff --git a/db/version_set.cc b/db/version_set.cc
index 5d959070a..49a260007 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2505,6 +2505,8 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
 
 Iterator* VersionSet::MakeInputIterator(Compaction* c) {
   ReadOptions options;
+  options.verify_checksums =
+      c->column_family_data()->options()->verify_checksums_in_compaction;
   options.fill_cache = false;
 
   // Level-0 files have to be merged together. For other levels,
diff --git a/db/version_set.h b/db/version_set.h
index 4546c91c6..c2b0a8f82 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -41,13 +41,15 @@ namespace log { class Writer; }
 class Compaction;
 class CompactionPicker;
 class Iterator;
+class LogBuffer;
+class LookupKey;
 class MemTable;
 class Version;
 class VersionSet;
 class MergeContext;
 class ColumnFamilyData;
 class ColumnFamilySet;
-class LookupKey;
+class TableCache;
 
 // Return the smallest index i such that files[i]->largest >= key.
 // Return files.size() if there is no such file.
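The MakeInputIterator() hunk is where the new verify_checksums_in_compaction option (declared in include/rocksdb/options.h below) takes effect: every block read performed on behalf of a compaction is checksum-verified unless the user opts out. A hedged usage sketch (hypothetical helper, made-up path):

    #include <string>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    rocksdb::Status OpenWithoutCompactionChecksums(const std::string& path,
                                                   rocksdb::DB** db) {
      rocksdb::Options options;
      options.create_if_missing = true;
      // Default is true; disabling trades compaction CPU for weaker
      // corruption detection during compaction reads.
      options.verify_checksums_in_compaction = false;
      return rocksdb::DB::Open(options, path, db);
    }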
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 9cd5bf47b..4d983619e 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -575,26 +575,6 @@ class Logger {
   InfoLogLevel log_level_;
 };
 
-// A class to buffer info log entries and flush them in the end.
-class LogBuffer {
- public:
-  // log_level: the log level for all the logs
-  // info_log: logger to write the logs to
-  LogBuffer(const InfoLogLevel log_level, const shared_ptr<Logger>& info_log);
-  ~LogBuffer();
-
-  // Add a log entry to the buffer.
-  void AddLogToBuffer(const char* format, va_list ap);
-
-  // Flush all buffered log to the info log.
-  void FlushBufferToLog() const;
-
- private:
-  struct Rep;
-  Rep* rep_;
-  const InfoLogLevel log_level_;
-  const shared_ptr<Logger>& info_log_;
-};
 
 // Identifies a locked file.
 class FileLock {
@@ -607,10 +587,6 @@ class FileLock {
   void operator=(const FileLock&);
 };
 
-// Add log to the LogBuffer for a delayed info logging. It can be used when
-// we want to add some logs inside a mutex.
-extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);
-
 extern void LogFlush(const shared_ptr<Logger>& info_log);
 
 extern void Log(const InfoLogLevel log_level,
diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h
index 428f27d4e..6c65bdc3f 100644
--- a/include/rocksdb/memtablerep.h
+++ b/include/rocksdb/memtablerep.h
@@ -160,8 +160,8 @@ class MemTableRep {
 class MemTableRepFactory {
  public:
   virtual ~MemTableRepFactory() {}
-  virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
-                                         Arena*) = 0;
+  virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
+                                         Arena*, const SliceTransform*) = 0;
   virtual const char* Name() const = 0;
 };
 
@@ -178,8 +178,9 @@ class VectorRepFactory : public MemTableRepFactory {
 
  public:
   explicit VectorRepFactory(size_t count = 0) : count_(count) { }
-  virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
-                                         Arena*) override;
+  virtual MemTableRep* CreateMemTableRep(
+      const MemTableRep::KeyComparator&, Arena*,
+      const SliceTransform*) override;
   virtual const char* Name() const override {
     return "VectorRepFactory";
   }
@@ -188,8 +189,9 @@ class VectorRepFactory : public MemTableRepFactory {
 // This uses a skip list to store keys. It is the default.
 class SkipListFactory : public MemTableRepFactory {
  public:
-  virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
-                                         Arena*) override;
+  virtual MemTableRep* CreateMemTableRep(
+      const MemTableRep::KeyComparator&, Arena*,
+      const SliceTransform*) override;
   virtual const char* Name() const override {
     return "SkipListFactory";
   }
@@ -202,8 +204,8 @@ class SkipListFactory : public MemTableRepFactory {
 // skiplist_branching_factor: probabilistic size ratio between adjacent
 //                            link lists in the skiplist
 extern MemTableRepFactory* NewHashSkipListRepFactory(
-    const SliceTransform* transform, size_t bucket_count = 1000000,
-    int32_t skiplist_height = 4, int32_t skiplist_branching_factor = 4
+    size_t bucket_count = 1000000, int32_t skiplist_height = 4,
+    int32_t skiplist_branching_factor = 4
 );
 
 // The factory is to create memtables with a hashed linked list:
@@ -211,6 +213,6 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
 //   linked list (null if the bucket is empty).
 // bucket_count: number of fixed array buckets
 extern MemTableRepFactory* NewHashLinkListRepFactory(
-    const SliceTransform* transform, size_t bucket_count = 50000);
+    size_t bucket_count = 50000);
 
 }  // namespace rocksdb
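Third-party MemTableRepFactory implementations must adopt the new CreateMemTableRep() signature, since the transform now arrives per memtable from Options.prefix_extractor rather than living in the factory. A hedged sketch of a conforming (hypothetical, delegating) factory:

    #include "rocksdb/memtablerep.h"
    #include "rocksdb/slice_transform.h"

    namespace example {

    class DelegatingRepFactory : public rocksdb::MemTableRepFactory {
     public:
      explicit DelegatingRepFactory(rocksdb::MemTableRepFactory* target)
          : target_(target) {}

      virtual rocksdb::MemTableRep* CreateMemTableRep(
          const rocksdb::MemTableRep::KeyComparator& cmp,
          rocksdb::Arena* arena,
          const rocksdb::SliceTransform* transform) override {
        // transform may be nullptr when no prefix_extractor is configured.
        return target_->CreateMemTableRep(cmp, arena, transform);
      }

      virtual const char* Name() const override {
        return "DelegatingRepFactory";
      }

     private:
      rocksdb::MemTableRepFactory* target_;
    };

    }  // namespace example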
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 8ca9a90a8..c80ac2ed1 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -237,7 +237,7 @@ struct ColumnFamilyOptions {
   // 4) prefix(prefix(key)) == prefix(key)
   //
   // Default: nullptr
-  const SliceTransform* prefix_extractor;
+  std::shared_ptr<const SliceTransform> prefix_extractor;
 
   // If true, place whole keys in the filter (not just prefixes).
   // This must generally be true for gets to be efficient.
@@ -385,6 +385,11 @@ struct ColumnFamilyOptions {
   // The compaction style. Default: kCompactionStyleLevel
   CompactionStyle compaction_style;
 
+  // If true, compaction will verify checksum on every read that happens
+  // as part of compaction
+  // Default: true
+  bool verify_checksums_in_compaction;
+
   // The options needed to support Universal Style compactions
   CompactionOptionsUniversal compaction_options_universal;
 
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index d076f6f76..dcd82f663 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -124,6 +124,7 @@ enum Tickers {
   NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
   NUMBER_SUPERVERSION_ACQUIRES,
   NUMBER_SUPERVERSION_RELEASES,
+  NUMBER_SUPERVERSION_CLEANUPS,
   TICKER_ENUM_MAX
 };
 
@@ -181,6 +182,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
      "rocksdb.number.direct.load.table.properties"},
     {NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"},
     {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"},
+    {NUMBER_SUPERVERSION_CLEANUPS, "rocksdb.number.superversion_cleanups"},
 };
 
 /**
diff --git a/include/utilities/backupable_db.h b/include/utilities/backupable_db.h
index abf05978c..22a75ac34 100644
--- a/include/utilities/backupable_db.h
+++ b/include/utilities/backupable_db.h
@@ -55,6 +55,8 @@ struct BackupableDBOptions {
   // Default: false
   bool destroy_old_data;
 
+  void Dump(Logger* logger) const;
+
   explicit BackupableDBOptions(const std::string& _backup_dir,
                                Env* _backup_env = nullptr,
                                bool _share_table_files = true,
@@ -62,6 +64,7 @@ struct BackupableDBOptions {
                                bool _destroy_old_data = false)
       : backup_dir(_backup_dir),
         backup_env(_backup_env),
+        share_table_files(_share_table_files),
         info_log(_info_log),
         sync(_sync),
         destroy_old_data(_destroy_old_data) {}
diff --git a/table/filter_block.cc b/table/filter_block.cc
index 7d1bfccaa..3651a7d02 100644
--- a/table/filter_block.cc
+++ b/table/filter_block.cc
@@ -24,7 +24,7 @@ static const size_t kFilterBase = 1 << kFilterBaseLg;
 FilterBlockBuilder::FilterBlockBuilder(const Options& opt,
                                        const Comparator* internal_comparator)
     : policy_(opt.filter_policy),
-      prefix_extractor_(opt.prefix_extractor),
+      prefix_extractor_(opt.prefix_extractor.get()),
       whole_key_filtering_(opt.whole_key_filtering),
      comparator_(internal_comparator) {}
 
@@ -133,7 +133,7 @@ void FilterBlockBuilder::GenerateFilter() {
 FilterBlockReader::FilterBlockReader(
     const Options& opt, const Slice& contents, bool delete_contents_after_use)
     : policy_(opt.filter_policy),
-      prefix_extractor_(opt.prefix_extractor),
+      prefix_extractor_(opt.prefix_extractor.get()),
      whole_key_filtering_(opt.whole_key_filtering),
       data_(nullptr),
       offset_(nullptr),
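The new NUMBER_SUPERVERSION_CLEANUPS ticker pairs with the RecordTick() calls added to GetImpl() earlier in this diff. A hedged sketch of reading it (assumes options.statistics was set, e.g. to rocksdb::CreateDBStatistics(), when the DB was opened):

    #include <cstdio>
    #include "rocksdb/statistics.h"

    void PrintSuperVersionCounters(rocksdb::Statistics* stats) {
      std::printf("sv acquires: %ld, releases: %ld, cleanups: %ld\n",
                  (long)stats->getTickerCount(
                      rocksdb::NUMBER_SUPERVERSION_ACQUIRES),
                  (long)stats->getTickerCount(
                      rocksdb::NUMBER_SUPERVERSION_RELEASES),
                  (long)stats->getTickerCount(
                      rocksdb::NUMBER_SUPERVERSION_CLEANUPS));
    }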
diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc
index 0d070a14e..ab86521f2 100644
--- a/table/table_reader_bench.cc
+++ b/table/table_reader_bench.cc
@@ -240,8 +240,8 @@ int main(int argc, char** argv) {
   rocksdb::TableFactory* tf = new rocksdb::BlockBasedTableFactory();
   rocksdb::Options options;
   if (FLAGS_prefix_len < 16) {
-    options.prefix_extractor = rocksdb::NewFixedPrefixTransform(
-        FLAGS_prefix_len);
+    options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(
+        FLAGS_prefix_len));
   }
   rocksdb::ReadOptions ro;
   rocksdb::EnvOptions env_options;
@@ -254,8 +254,8 @@ int main(int argc, char** argv) {
     env_options.use_mmap_reads = true;
     tf = new rocksdb::PlainTableFactory(16, (FLAGS_prefix_len == 16) ? 0 : 8,
                                         0.75);
-    options.prefix_extractor = rocksdb::NewFixedPrefixTransform(
-        FLAGS_prefix_len);
+    options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(
+        FLAGS_prefix_len));
   } else {
     tf = new rocksdb::BlockBasedTableFactory();
   }
diff --git a/table/table_test.cc b/table/table_test.cc
index c37cc81eb..aa874eb18 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -697,7 +697,7 @@ class Harness {
       case PLAIN_TABLE_SEMI_FIXED_PREFIX:
         support_prev_ = false;
         only_support_prefix_seek_ = true;
-        options_.prefix_extractor = prefix_transform.get();
+        options_.prefix_extractor.reset(new FixedOrLessPrefixTransform(2));
         options_.allow_mmap_reads = true;
         options_.table_factory.reset(NewPlainTableFactory());
         constructor_ = new TableConstructor(options_.comparator, true, true);
@@ -707,7 +707,7 @@ class Harness {
       case PLAIN_TABLE_FULL_STR_PREFIX:
         support_prev_ = false;
         only_support_prefix_seek_ = true;
-        options_.prefix_extractor = noop_transform.get();
+        options_.prefix_extractor.reset(NewNoopTransform());
         options_.allow_mmap_reads = true;
         options_.table_factory.reset(NewPlainTableFactory());
         constructor_ = new TableConstructor(options_.comparator, true, true);
@@ -920,15 +920,8 @@ class Harness {
   bool support_prev_;
   bool only_support_prefix_seek_;
   shared_ptr<InternalKeyComparator> internal_comparator_;
-  static std::unique_ptr<const SliceTransform> noop_transform;
-  static std::unique_ptr<const SliceTransform> prefix_transform;
 };
 
-std::unique_ptr<const SliceTransform> Harness::noop_transform(
-    NewNoopTransform());
-std::unique_ptr<const SliceTransform> Harness::prefix_transform(
-    new FixedOrLessPrefixTransform(2));
-
 static bool Between(uint64_t val, uint64_t low, uint64_t high) {
   bool result = (val >= low) && (val <= high);
   if (!result) {
diff --git a/tools/db_stress.cc b/tools/db_stress.cc
index 9bb581a5b..4d02bcdc5 100644
--- a/tools/db_stress.cc
+++ b/tools/db_stress.cc
@@ -688,9 +688,6 @@ class StressTest {
         filter_policy_(FLAGS_bloom_bits >= 0
                        ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                        : nullptr),
-        prefix_extractor_(NewFixedPrefixTransform(
-            FLAGS_test_batches_snapshots ?
-                sizeof(long) : sizeof(long)-1)),
         db_(nullptr),
         num_times_reopened_(0) {
     if (FLAGS_destroy_db_initially) {
@@ -708,7 +705,6 @@ class StressTest {
   ~StressTest() {
     delete db_;
     delete filter_policy_;
-    delete prefix_extractor_;
   }
 
   void Run() {
@@ -1373,7 +1369,7 @@ class StressTest {
         static_cast<rocksdb::CompactionStyle>(FLAGS_compaction_style);
     options.block_size = FLAGS_block_size;
     options.filter_policy = filter_policy_;
-    options.prefix_extractor = prefix_extractor_;
+    options.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size));
     options.max_open_files = FLAGS_open_files;
     options.statistics = dbstats;
     options.env = FLAGS_env;
@@ -1405,16 +1401,13 @@ class StressTest {
     }
     switch (FLAGS_rep_factory) {
      case kHashSkipList:
-        options.memtable_factory.reset(NewHashSkipListRepFactory(
-            NewFixedPrefixTransform(FLAGS_prefix_size)));
+        options.memtable_factory.reset(NewHashSkipListRepFactory());
         break;
       case kSkipList:
         // no need to do anything
         break;
       case kVectorRep:
-        options.memtable_factory.reset(
-            new VectorRepFactory()
-        );
+        options.memtable_factory.reset(new VectorRepFactory());
         break;
     }
     static Random purge_percent(1000);  // no benefit from non-determinism here
@@ -1488,7 +1481,6 @@ class StressTest {
   shared_ptr<Cache> cache_;
   shared_ptr<Cache> compressed_cache_;
   const FilterPolicy* filter_policy_;
-  const SliceTransform* prefix_extractor_;
   DB* db_;
   StackableDB* sdb_;
   int num_times_reopened_;
diff --git a/tools/ldb_test.py b/tools/ldb_test.py
index fe9a6c60a..4ee3d7fef 100644
--- a/tools/ldb_test.py
+++ b/tools/ldb_test.py
@@ -205,10 +205,12 @@ class LDBTestCase(unittest.TestCase):
     def testTtlPutGet(self):
         print "Running testTtlPutGet..."
         self.assertRunOK("put a1 b1 --ttl --create_if_missing", "OK")
-        self.assertRunOK("scan ", "a1 : b1", True)
+        self.assertRunOK("scan --hex", "0x6131 : 0x6231", True)
         self.assertRunOK("dump --ttl ", "a1 ==> b1", True)
+        self.assertRunOK("dump --hex --ttl ",
+                         "0x6131 ==> 0x6231\nKeys in range: 1")
         self.assertRunOK("scan --hex --ttl", "0x6131 : 0x6231")
-        self.assertRunOK("get a1", "b1", True)
+        self.assertRunOK("get --value_hex a1", "0x6231", True)
         self.assertRunOK("get --ttl a1", "b1")
         self.assertRunOK("put a3 b3 --create_if_missing", "OK")
         # fails because timstamp's length is greater than value's
diff --git a/tools/sst_dump.cc b/tools/sst_dump.cc
index 7abcb2e5a..b34b7fa82 100644
--- a/tools/sst_dump.cc
+++ b/tools/sst_dump.cc
@@ -130,7 +130,7 @@ Status SstFileReader::SetTableOptionsByMagicNumber(uint64_t table_magic_number,
     options_.allow_mmap_reads = true;
     options_.table_factory = std::make_shared<PlainTableFactory>(
         table_properties->fixed_key_len, 2, 0.8);
-    options_.prefix_extractor = NewNoopTransform();
+    options_.prefix_extractor.reset(NewNoopTransform());
     fprintf(stdout, "Sst file format: plain table\n");
   } else {
     char error_msg_buffer[80];
diff --git a/util/env.cc b/util/env.cc
index 573176e6e..83a831f46 100644
--- a/util/env.cc
+++ b/util/env.cc
@@ -31,82 +31,9 @@ WritableFile::~WritableFile() {
 Logger::~Logger() {
 }
 
-// One log entry with its timestamp
-struct BufferedLog {
-  struct timeval now_tv;  // Timestamp of the log
-  char message[1];        // Beginning of log message
-};
-
-struct LogBuffer::Rep {
-  Arena arena_;
-  autovector<BufferedLog*> logs_;
-};
-
-// Lazily initialize Rep to avoid allocations when new log is added.
-LogBuffer::LogBuffer(const InfoLogLevel log_level,
-                     const shared_ptr<Logger>& info_log)
-    : rep_(nullptr), log_level_(log_level), info_log_(info_log) {}
-
-LogBuffer::~LogBuffer() { delete rep_; }
-
-void LogBuffer::AddLogToBuffer(const char* format, va_list ap) {
-  if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
-    // Skip the level because of its level.
-    return;
-  }
-  if (rep_ == nullptr) {
-    rep_ = new Rep();
-  }
-
-  const size_t kLogSizeLimit = 512;
-  char* alloc_mem = rep_->arena_.AllocateAligned(kLogSizeLimit);
-  BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
-  char* p = buffered_log->message;
-  char* limit = alloc_mem + kLogSizeLimit - 1;
-
-  // store the time
-  gettimeofday(&(buffered_log->now_tv), nullptr);
-
-  // Print the message
-  if (p < limit) {
-    va_list backup_ap;
-    va_copy(backup_ap, ap);
-    p += vsnprintf(p, limit - p, format, backup_ap);
-    va_end(backup_ap);
-  }
-
-  // Add '\0' to the end
-  *p = '\0';
-
-  rep_->logs_.push_back(buffered_log);
-}
-
-void LogBuffer::FlushBufferToLog() const {
-  if (rep_ != nullptr) {
-    for (BufferedLog* log : rep_->logs_) {
-      const time_t seconds = log->now_tv.tv_sec;
-      struct tm t;
-      localtime_r(&seconds, &t);
-      Log(log_level_, info_log_,
-          "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
-          t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
-          t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
-    }
-  }
-}
-
 FileLock::~FileLock() {
 }
 
-void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
-  if (log_buffer != nullptr) {
-    va_list ap;
-    va_start(ap, format);
-    log_buffer->AddLogToBuffer(format, ap);
-    va_end(ap);
-  }
-}
-
 void LogFlush(Logger *info_log) {
   if (info_log) {
     info_log->Flush();
diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc
index 4db624975..e09052a3d 100644
--- a/util/hash_linklist_rep.cc
+++ b/util/hash_linklist_rep.cc
@@ -55,7 +55,7 @@ private:
 
 class HashLinkListRep : public MemTableRep {
  public:
-  HashLinkListRep(MemTableRep::KeyComparator& compare, Arena* arena,
+  HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena,
                   const SliceTransform* transform, size_t bucket_size);
 
   virtual void Insert(const char* key) override;
@@ -81,7 +81,7 @@ class HashLinkListRep : public MemTableRep {
 private:
   friend class DynamicIterator;
 
-  typedef SkipList<const char*, MemTableRep::KeyComparator&> FullList;
+  typedef SkipList<const char*, const MemTableRep::KeyComparator&> FullList;
 
   size_t bucket_size_;
 
@@ -92,7 +92,7 @@ class HashLinkListRep : public MemTableRep {
   // The user-supplied transform whose domain is the user keys.
   const SliceTransform* transform_;
 
-  MemTableRep::KeyComparator& compare_;
+  const MemTableRep::KeyComparator& compare_;
   // immutable after construction
   Arena* const arena_;
 
@@ -314,7 +314,7 @@ class HashLinkListRep : public MemTableRep {
   };
 };
 
-HashLinkListRep::HashLinkListRep(MemTableRep::KeyComparator& compare,
+HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare,
                                  Arena* arena, const SliceTransform* transform,
                                  size_t bucket_size)
     : bucket_size_(bucket_size),
@@ -475,13 +475,13 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head,
 }  // anon namespace
 
 MemTableRep* HashLinkListRepFactory::CreateMemTableRep(
-    MemTableRep::KeyComparator& compare, Arena* arena) {
-  return new HashLinkListRep(compare, arena, transform_, bucket_count_);
+    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const SliceTransform* transform) {
+  return new HashLinkListRep(compare, arena, transform, bucket_count_);
 }
 
-MemTableRepFactory* NewHashLinkListRepFactory(
-    const SliceTransform* transform, size_t bucket_count) {
-  return new HashLinkListRepFactory(transform, bucket_count);
+MemTableRepFactory* NewHashLinkListRepFactory(size_t bucket_count) {
+  return new HashLinkListRepFactory(bucket_count);
 }
 
 }  // namespace rocksdb
diff --git a/util/hash_linklist_rep.h b/util/hash_linklist_rep.h
index efa9d8f2e..11fb7467f 100644
--- a/util/hash_linklist_rep.h
+++ b/util/hash_linklist_rep.h
@@ -14,25 +14,20 @@ namespace rocksdb {
 
 class HashLinkListRepFactory : public MemTableRepFactory {
 public:
-  explicit HashLinkListRepFactory(
-      const SliceTransform* transform,
-      size_t bucket_count)
-    : transform_(transform),
-      bucket_count_(bucket_count) { }
+  explicit HashLinkListRepFactory(size_t bucket_count)
+      : bucket_count_(bucket_count) { }
 
-  virtual ~HashLinkListRepFactory() { delete transform_; }
+  virtual ~HashLinkListRepFactory() {}
 
-  virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator& compare,
-                                         Arena* arena) override;
+  virtual MemTableRep* CreateMemTableRep(
+      const MemTableRep::KeyComparator& compare, Arena* arena,
+      const SliceTransform* transform) override;
 
   virtual const char* Name() const override {
     return "HashLinkListRepFactory";
   }
 
-  const SliceTransform* GetTransform() { return transform_; }
-
 private:
-  const SliceTransform* transform_;
   const size_t bucket_count_;
 };
const SliceTransform* transform_; - MemTableRep::KeyComparator& compare_; + const MemTableRep::KeyComparator& compare_; // immutable after construction Arena* const arena_; @@ -221,7 +221,7 @@ class HashSkipListRep : public MemTableRep { }; }; -HashSkipListRep::HashSkipListRep(MemTableRep::KeyComparator& compare, +HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, size_t bucket_size, int32_t skiplist_height, int32_t skiplist_branching_factor) @@ -321,16 +321,17 @@ MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator() { } // anon namespace MemTableRep* HashSkipListRepFactory::CreateMemTableRep( - MemTableRep::KeyComparator& compare, Arena* arena) { - return new HashSkipListRep(compare, arena, transform_, bucket_count_, + const MemTableRep::KeyComparator& compare, Arena* arena, + const SliceTransform* transform) { + return new HashSkipListRep(compare, arena, transform, bucket_count_, skiplist_height_, skiplist_branching_factor_); } MemTableRepFactory* NewHashSkipListRepFactory( - const SliceTransform* transform, size_t bucket_count, - int32_t skiplist_height, int32_t skiplist_branching_factor) { - return new HashSkipListRepFactory(transform, bucket_count, - skiplist_height, skiplist_branching_factor); + size_t bucket_count, int32_t skiplist_height, + int32_t skiplist_branching_factor) { + return new HashSkipListRepFactory(bucket_count, skiplist_height, + skiplist_branching_factor); } } // namespace rocksdb diff --git a/util/hash_skiplist_rep.h b/util/hash_skiplist_rep.h index 1ea844eda..abf4a68cd 100644 --- a/util/hash_skiplist_rep.h +++ b/util/hash_skiplist_rep.h @@ -15,28 +15,24 @@ namespace rocksdb { class HashSkipListRepFactory : public MemTableRepFactory { public: explicit HashSkipListRepFactory( - const SliceTransform* transform, size_t bucket_count, int32_t skiplist_height, int32_t skiplist_branching_factor) - : transform_(transform), - bucket_count_(bucket_count), + : bucket_count_(bucket_count), skiplist_height_(skiplist_height), skiplist_branching_factor_(skiplist_branching_factor) { } - virtual ~HashSkipListRepFactory() { delete transform_; } + virtual ~HashSkipListRepFactory() {} - virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator& compare, - Arena* arena) override; + virtual MemTableRep* CreateMemTableRep( + const MemTableRep::KeyComparator& compare, Arena* arena, + const SliceTransform* transform) override; virtual const char* Name() const override { return "HashSkipListRepFactory"; } - const SliceTransform* GetTransform() { return transform_; } - private: - const SliceTransform* transform_; const size_t bucket_count_; const int32_t skiplist_height_; const int32_t skiplist_branching_factor_; diff --git a/util/log_buffer.cc b/util/log_buffer.cc new file mode 100644 index 000000000..f27d62126 --- /dev/null +++ b/util/log_buffer.cc @@ -0,0 +1,67 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
diff --git a/util/log_buffer.cc b/util/log_buffer.cc
new file mode 100644
index 000000000..f27d62126
--- /dev/null
+++ b/util/log_buffer.cc
@@ -0,0 +1,72 @@
+// Copyright (c) 2014, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include "util/log_buffer.h"
+
+#include <sys/time.h>
+
+namespace rocksdb {
+
+LogBuffer::LogBuffer(const InfoLogLevel log_level, Logger* info_log)
+    : log_level_(log_level), info_log_(info_log) {}
+
+void LogBuffer::AddLogToBuffer(const char* format, va_list ap) {
+  if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
+    // Skip this entry: its level is below the logger's threshold.
+    return;
+  }
+
+  const size_t kLogSizeLimit = 512;
+  char* alloc_mem = arena_.AllocateAligned(kLogSizeLimit);
+  BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
+  char* p = buffered_log->message;
+  char* limit = alloc_mem + kLogSizeLimit - 1;
+
+  // Store the time
+  gettimeofday(&(buffered_log->now_tv), nullptr);
+
+  // Print the message
+  if (p < limit) {
+    va_list backup_ap;
+    va_copy(backup_ap, ap);
+    p += vsnprintf(p, limit - p, format, backup_ap);
+    va_end(backup_ap);
+  }
+
+  // Guard against truncation: vsnprintf() returns the length the full
+  // message would have had, which can push p past limit.
+  if (p > limit) {
+    p = limit;
+  }
+
+  // Add '\0' to the end
+  *p = '\0';
+
+  logs_.push_back(buffered_log);
+}
+
+void LogBuffer::FlushBufferToLog() {
+  for (BufferedLog* log : logs_) {
+    const time_t seconds = log->now_tv.tv_sec;
+    struct tm t;
+    localtime_r(&seconds, &t);
+    Log(log_level_, info_log_,
+        "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s",
+        t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
+        t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message);
+  }
+  logs_.clear();
+}
+
+void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
+  if (log_buffer != nullptr) {
+    va_list ap;
+    va_start(ap, format);
+    log_buffer->AddLogToBuffer(format, ap);
+    va_end(ap);
+  }
+}
+
+}  // namespace rocksdb
diff --git a/util/log_buffer.h b/util/log_buffer.h
new file mode 100644
index 000000000..76503a084
--- /dev/null
+++ b/util/log_buffer.h
@@ -0,0 +1,46 @@
+// Copyright (c) 2014, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+
+#include "rocksdb/env.h"
+#include "util/arena.h"
+#include "util/autovector.h"
+
+namespace rocksdb {
+
+class Logger;
+
+// A class to buffer info log entries and flush them in the end.
+class LogBuffer {
+ public:
+  // log_level: the log level for all the logs
+  // info_log: logger to write the logs to
+  LogBuffer(const InfoLogLevel log_level, Logger* info_log);
+
+  // Add a log entry to the buffer.
+  void AddLogToBuffer(const char* format, va_list ap);
+
+  // Flush all buffered log to the info log.
+  void FlushBufferToLog();
+
+ private:
+  // One log entry with its timestamp
+  struct BufferedLog {
+    struct timeval now_tv;  // Timestamp of the log
+    char message[1];        // Beginning of log message
+  };
+
+  const InfoLogLevel log_level_;
+  Logger* info_log_;
+  Arena arena_;
+  autovector<BufferedLog*> logs_;
+};
+
+// Add a log entry to the LogBuffer for delayed info logging. It can be used
+// when we want to add some logs inside a mutex.
+extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);
+
+}  // namespace rocksdb
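The point of LogBuffer, per the header comment, is to defer logging that would otherwise perform I/O while a mutex is held. A minimal usage sketch, not part of the patch; the std::mutex, the enumerator name INFO_LEVEL, and the message contents are illustrative assumptions:

#include <mutex>

#include "rocksdb/env.h"
#include "util/log_buffer.h"

void DoWorkUnderMutex(std::mutex* mu, rocksdb::Logger* info_log) {
  // INFO_LEVEL is a guess at the enumerator name; it has varied
  // across RocksDB versions.
  rocksdb::LogBuffer log_buffer(rocksdb::InfoLogLevel::INFO_LEVEL, info_log);
  {
    std::lock_guard<std::mutex> guard(*mu);
    // Only formatting happens here; no I/O while the lock is held.
    LogToBuffer(&log_buffer, "did %d units of work", 42);
  }
  // Entries are written out with their original capture timestamps.
  log_buffer.FlushBufferToLog();
}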
diff --git a/util/options.cc b/util/options.cc
index cfa952dfb..d2480ae5e 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -67,6 +67,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
       purge_redundant_kvs_while_flush(true),
       block_size_deviation(10),
       compaction_style(kCompactionStyleLevel),
+      verify_checksums_in_compaction(true),
       filter_deletes(false),
       max_sequential_skip_in_iterations(8),
       memtable_factory(std::shared_ptr<MemTableRepFactory>(new SkipListFactory)),
@@ -126,6 +127,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
       purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush),
       block_size_deviation(options.block_size_deviation),
       compaction_style(options.compaction_style),
+      verify_checksums_in_compaction(options.verify_checksums_in_compaction),
       compaction_options_universal(options.compaction_options_universal),
       filter_deletes(options.filter_deletes),
       max_sequential_skip_in_iterations(
@@ -379,6 +381,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
         block_size_deviation);
     Log(log,"                   Options.filter_deletes: %d",
         filter_deletes);
+    Log(log, "  Options.verify_checksums_in_compaction: %d",
+        verify_checksums_in_compaction);
     Log(log,"                 Options.compaction_style: %d",
         compaction_style);
     Log(log,"  Options.compaction_options_universal.size_ratio: %u",
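The new knob is a ColumnFamilyOptions member and defaults to true per the constructor hunk above. A one-liner sketch of turning it off, trading a safety check for some compaction CPU:

#include "rocksdb/options.h"

rocksdb::Options MakeOptions() {
  rocksdb::Options options;
  // Defaults to true; when false, block checksums are not verified
  // on the reads that compaction performs on its input files.
  options.verify_checksums_in_compaction = false;
  return options;
}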
diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc
index ab77e7f3a..e78e760e9 100644
--- a/util/skiplistrep.cc
+++ b/util/skiplistrep.cc
@@ -10,11 +10,11 @@ namespace rocksdb {
 namespace {
 class SkipListRep : public MemTableRep {
-  SkipList<const char*, MemTableRep::KeyComparator&> skip_list_;
+  SkipList<const char*, const MemTableRep::KeyComparator&> skip_list_;
 public:
-  explicit SkipListRep(MemTableRep::KeyComparator& compare, Arena* arena)
+  explicit SkipListRep(const MemTableRep::KeyComparator& compare, Arena* arena)
     : skip_list_(compare, arena) {
-}
+  }

   // Insert key into the list.
   // REQUIRES: nothing that compares equal to key is currently in the list.
@@ -47,12 +47,12 @@ public:

   // Iteration over the contents of a skip list
   class Iterator : public MemTableRep::Iterator {
-    SkipList<const char*, MemTableRep::KeyComparator&>::Iterator iter_;
+    SkipList<const char*, const MemTableRep::KeyComparator&>::Iterator iter_;
    public:
     // Initialize an iterator over the specified list.
     // The returned iterator is not valid.
     explicit Iterator(
-        const SkipList<const char*, MemTableRep::KeyComparator&>* list
+        const SkipList<const char*, const MemTableRep::KeyComparator&>* list
     ) : iter_(list) { }

     virtual ~Iterator() override { }
@@ -115,7 +115,8 @@ public:
 }

 MemTableRep* SkipListFactory::CreateMemTableRep(
-    MemTableRep::KeyComparator& compare, Arena* arena) {
+    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const SliceTransform*) {
   return new SkipListRep(compare, arena);
 }
diff --git a/util/vectorrep.cc b/util/vectorrep.cc
index e0f3d69b0..3777f7ffe 100644
--- a/util/vectorrep.cc
+++ b/util/vectorrep.cc
@@ -271,7 +271,8 @@ MemTableRep::Iterator* VectorRep::GetIterator() {
 } // anon namespace

 MemTableRep* VectorRepFactory::CreateMemTableRep(
-    MemTableRep::KeyComparator& compare, Arena* arena) {
+    const MemTableRep::KeyComparator& compare, Arena* arena,
+    const SliceTransform*) {
   return new VectorRep(compare, arena, count_);
 }
 } // namespace rocksdb
diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc
index 99bf1a2d9..294cc3a87 100644
--- a/utilities/backupable/backupable_db.cc
+++ b/utilities/backupable/backupable_db.cc
@@ -26,6 +26,17 @@
 namespace rocksdb {

+void BackupableDBOptions::Dump(Logger* logger) const {
+  Log(logger, "        Options.backup_dir: %s", backup_dir.c_str());
+  Log(logger, "        Options.backup_env: %p", backup_env);
+  Log(logger, " Options.share_table_files: %d",
+      static_cast<int>(share_table_files));
+  Log(logger, "          Options.info_log: %p", info_log);
+  Log(logger, "              Options.sync: %d", static_cast<int>(sync));
+  Log(logger, "  Options.destroy_old_data: %d",
+      static_cast<int>(destroy_old_data));
+}
+
 // -------- BackupEngineImpl class ---------
 class BackupEngineImpl : public BackupEngine {
  public:
@@ -205,6 +216,8 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
       backup_env_(options.backup_env != nullptr ? options.backup_env
                                                 : db_env_) {
+  options_.Dump(options_.info_log);
+
   // create all the dirs we need
   backup_env_->CreateDirIfMissing(GetAbsolutePath());
   backup_env_->NewDirectory(GetAbsolutePath(), &backup_directory_);
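Since BackupEngineImpl now dumps its options at construction, wiring an info_log into BackupableDBOptions makes that output visible. A sketch of caller code, under the assumptions that the constructor takes the backup directory and that info_log and destroy_old_data are public fields (consistent with the Dump() body above); the header path reflects this era of the codebase and the backup directory is an arbitrary example:

#include "utilities/backupable_db.h"

rocksdb::BackupableDBOptions MakeBackupOptions(rocksdb::Logger* info_log) {
  rocksdb::BackupableDBOptions backup_options("/tmp/rocksdb_backup");
  backup_options.info_log = info_log;        // Dump() output goes here
  backup_options.destroy_old_data = false;   // keep existing backups
  return backup_options;
}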