Merge branch 'master' into performance

Conflicts:
	Makefile
	db/db_impl.cc
	db/db_test.cc
	db/memtable_list.cc
	db/memtable_list.h
	table/block_based_table_reader.cc
	table/table_test.cc
	util/cache.cc
	util/coding.cc
main
kailiu 11 years ago
commit a5e220f5ef
  1. 5
      Makefile
  2. 34
      db/compaction_picker.cc
  3. 5
      db/compaction_picker.h
  4. 2
      db/db_bench.cc
  5. 291
      db/db_impl.cc
  6. 47
      db/db_impl.h
  7. 5
      db/db_impl_readonly.h
  8. 125
      db/db_test.cc
  9. 70
      db/log_reader.cc
  10. 12
      db/log_reader.h
  11. 179
      db/log_test.cc
  12. 154
      db/memtable_list.cc
  13. 93
      db/memtable_list.h
  14. 2
      db/prefix_filter_iterator.h
  15. 1
      db/skiplist.h
  16. 175
      db/tailing_iter.cc
  17. 88
      db/tailing_iter.h
  18. 98
      db/version_set.cc
  19. 54
      db/version_set.h
  20. 77
      db/version_set_reduce_num_levels.cc
  21. 8
      hdfs/env_hdfs.h
  22. 11
      helpers/memenv/memenv.cc
  23. 9
      include/rocksdb/cache.h
  24. 6
      include/rocksdb/db.h
  25. 24
      include/rocksdb/env.h
  26. 6
      include/rocksdb/memtablerep.h
  27. 21
      include/rocksdb/options.h
  28. 6
      include/rocksdb/universal_compaction.h
  29. 6
      include/utilities/stackable_db.h
  30. 6
      table/block_based_table_factory.cc
  31. 36
      table/block_based_table_factory.h
  32. 31
      table/block_based_table_options.h
  33. 56
      table/block_based_table_reader.cc
  34. 8
      table/block_based_table_reader.h
  35. 5
      table/merger.h
  36. 1
      table/meta_blocks.h
  37. 5
      table/plain_table_reader.h
  38. 33
      table/table_test.cc
  39. 2
      tools/db_stress.cc
  40. 5
      util/cache.cc
  41. 3
      util/coding.cc
  42. 2
      util/coding.h
  43. 5
      util/env_hdfs.cc
  44. 30
      util/env_posix.cc
  45. 17
      util/ldb_cmd.cc
  46. 4
      util/options.cc
  47. 201
      utilities/backupable/backupable_db.cc
  48. 115
      utilities/backupable/backupable_db_test.cc

@ -134,13 +134,12 @@ endif # PLATFORM_SHARED_EXT
all: $(LIBRARY) $(PROGRAMS)
.PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests \
release tags valgrind_check whitebox_crash_test format
release tags valgrind_check whitebox_crash_test format shared_lib
# Will also generate shared libraries.
release:
$(MAKE) clean
OPT="-DNDEBUG -O2" $(MAKE) all -j32
OPT="-DNDEBUG -O2" $(MAKE) $(SHARED) -j32
coverage:
$(MAKE) clean
@ -200,6 +199,8 @@ tags:
format:
build_tools/format-diff.sh
shared_lib: $(SHARED)
# ---------------------------------------------------------------------------
# Unit tests and tools
# ---------------------------------------------------------------------------

@ -8,6 +8,8 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction_picker.h"
#include <limits>
#include "util/statistics.h"
namespace rocksdb {
@ -22,6 +24,21 @@ uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
return sum;
}
// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
if (op1 == 0) {
return 0;
}
if (op2 <= 0) {
return op1;
}
uint64_t casted_op2 = (uint64_t) op2;
if (std::numeric_limits<uint64_t>::max() / op1 < casted_op2) {
return op1;
}
return op1 * casted_op2;
}
} // anonymous namespace
CompactionPicker::CompactionPicker(const Options* options,
@ -30,15 +47,7 @@ CompactionPicker::CompactionPicker(const Options* options,
options_(options),
num_levels_(options->num_levels),
icmp_(icmp) {
Init();
}
void CompactionPicker::ReduceNumberOfLevels(int new_levels) {
num_levels_ = new_levels;
Init();
}
void CompactionPicker::Init() {
max_file_size_.reset(new uint64_t[NumberLevels()]);
level_max_bytes_.reset(new uint64_t[NumberLevels()]);
int target_file_size_multiplier = options_->target_file_size_multiplier;
@ -48,10 +57,11 @@ void CompactionPicker::Init() {
max_file_size_[i] = ULLONG_MAX;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
} else if (i > 1) {
max_file_size_[i] = max_file_size_[i - 1] * target_file_size_multiplier;
level_max_bytes_[i] =
level_max_bytes_[i - 1] * max_bytes_multiplier *
options_->max_bytes_for_level_multiplier_additional[i - 1];
max_file_size_[i] = MultiplyCheckOverflow(max_file_size_[i - 1],
target_file_size_multiplier);
level_max_bytes_[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes_[i - 1], max_bytes_multiplier),
options_->max_bytes_for_level_multiplier_additional[i - 1]);
} else {
max_file_size_[i] = options_->target_file_size_base;
level_max_bytes_[i] = options_->max_bytes_for_level_base;

@ -27,9 +27,6 @@ class CompactionPicker {
CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
virtual ~CompactionPicker();
// See VersionSet::ReduceNumberOfLevels()
void ReduceNumberOfLevels(int new_levels);
// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
@ -120,8 +117,6 @@ class CompactionPicker {
const Options* const options_;
private:
void Init();
int num_levels_;
const InternalKeyComparator* const icmp_;

@ -21,6 +21,8 @@
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "rocksdb/perf_context.h"
#include "port/port.h"

@ -17,6 +17,7 @@
#include <stdint.h>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "db/builder.h"
@ -32,6 +33,7 @@
#include "db/prefix_filter_iterator.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/tailing_iter.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
@ -265,8 +267,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
bg_cv_(&mutex_),
mem_rep_factory_(options_.memtable_factory.get()),
mem_(new MemTable(internal_comparator_, options_)),
imm_(options_.min_write_buffer_number_to_merge),
logfile_number_(0),
super_version_(nullptr),
super_version_number_(0),
tmp_batch_(),
bg_compaction_scheduled_(0),
bg_manual_only_(0),
@ -359,7 +363,7 @@ DBImpl::~DBImpl() {
delete mem_->Unref();
}
imm_.UnrefAll(&to_delete);
imm_.current()->Unref(&to_delete);
for (MemTable* m: to_delete) {
delete m;
}
@ -506,7 +510,7 @@ bool DBImpl::SuperVersion::Unref() {
void DBImpl::SuperVersion::Cleanup() {
assert(refs.load(std::memory_order_relaxed) == 0);
imm.UnrefAll(&to_delete);
imm->Unref(&to_delete);
MemTable* m = mem->Unref();
if (m != nullptr) {
to_delete.push_back(m);
@ -514,13 +518,13 @@ void DBImpl::SuperVersion::Cleanup() {
current->Unref();
}
void DBImpl::SuperVersion::Init(MemTable* new_mem, const MemTableList& new_imm,
void DBImpl::SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current) {
mem = new_mem;
imm = new_imm;
current = new_current;
mem->Ref();
imm.RefAll();
imm->Ref();
current->Ref();
refs.store(1, std::memory_order_relaxed);
}
@ -894,6 +898,11 @@ Status DBImpl::Recover(bool read_only, bool error_if_log_file_exist) {
return s;
}
s = env_->NewDirectory(dbname_, &db_directory_);
if (!s.ok()) {
return s;
}
s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) {
return s;
@ -1187,6 +1196,9 @@ Status DBImpl::WriteLevel0Table(autovector<MemTable*>& mems, VersionEdit* edit,
(unsigned long) meta.number,
(unsigned long) meta.file_size,
s.ToString().c_str());
if (!options_.disableDataSync) {
db_directory_->Fsync();
}
mutex_.Lock();
}
base->Unref();
@ -1235,7 +1247,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
mutex_.AssertHeld();
assert(imm_.size() != 0);
if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
if (!imm_.IsFlushPending()) {
Log(options_.info_log, "FlushMemTableToOutputFile already in progress");
Status s = Status::IOError("FlushMemTableToOutputFile already in progress");
return s;
@ -1280,8 +1292,8 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
// Replace immutable memtable with the generated Table
s = imm_.InstallMemtableFlushResults(
mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_, &deletion_state.memtables_to_free);
mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number,
pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get());
if (s.ok()) {
InstallSuperVersion(deletion_state);
@ -1302,11 +1314,16 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
return s;
}
void DBImpl::CompactRange(const Slice* begin,
const Slice* end,
bool reduce_level,
int target_level) {
FlushMemTable(FlushOptions());
Status DBImpl::CompactRange(const Slice* begin,
const Slice* end,
bool reduce_level,
int target_level) {
Status s = FlushMemTable(FlushOptions());
if (!s.ok()) {
LogFlush(options_.info_log);
return s;
}
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
@ -1322,16 +1339,22 @@ void DBImpl::CompactRange(const Slice* begin,
// bottom-most level, the output level will be the same as input one
if (options_.compaction_style == kCompactionStyleUniversal ||
level == max_level_with_files) {
RunManualCompaction(level, level, begin, end);
s = RunManualCompaction(level, level, begin, end);
} else {
RunManualCompaction(level, level + 1, begin, end);
s = RunManualCompaction(level, level + 1, begin, end);
}
if (!s.ok()) {
LogFlush(options_.info_log);
return s;
}
}
if (reduce_level) {
ReFitLevel(max_level_with_files, target_level);
s = ReFitLevel(max_level_with_files, target_level);
}
LogFlush(options_.info_log);
return s;
}
// return the same level if it cannot be moved
@ -1350,7 +1373,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) {
return minimum_level;
}
void DBImpl::ReFitLevel(int level, int target_level) {
Status DBImpl::ReFitLevel(int level, int target_level) {
assert(level < NumberLevels());
SuperVersion* superversion_to_free = nullptr;
@ -1363,7 +1386,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
mutex_.Unlock();
Log(options_.info_log, "ReFitLevel: another thread is refitting");
delete new_superversion;
return;
return Status::NotSupported("another thread is refitting");
}
refitting_level_ = true;
@ -1384,6 +1407,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
assert(to_level <= level);
Status status;
if (to_level < level) {
Log(options_.info_log, "Before refitting:\n%s",
versions_->current()->DebugString().data());
@ -1397,7 +1421,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
Log(options_.info_log, "Apply version edit:\n%s",
edit.DebugString().data());
auto status = versions_->LogAndApply(&edit, &mutex_);
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
superversion_to_free = InstallSuperVersion(new_superversion);
new_superversion = nullptr;
@ -1415,6 +1439,7 @@ void DBImpl::ReFitLevel(int level, int target_level) {
mutex_.Unlock();
delete superversion_to_free;
delete new_superversion;
return status;
}
int DBImpl::NumberLevels() {
@ -1429,6 +1454,10 @@ int DBImpl::Level0StopWriteTrigger() {
return options_.level0_stop_writes_trigger;
}
uint64_t DBImpl::CurrentVersionNumber() const {
return super_version_number_.load();
}
Status DBImpl::Flush(const FlushOptions& options) {
return FlushMemTable(options);
}
@ -1622,10 +1651,10 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
return status;
}
void DBImpl::RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
const Slice* end) {
Status DBImpl::RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
const Slice* end) {
assert(input_level >= 0);
InternalKey begin_storage, end_storage;
@ -1692,15 +1721,16 @@ void DBImpl::RunManualCompaction(int input_level,
assert(!manual.in_progress);
assert(bg_manual_only_ > 0);
--bg_manual_only_;
return manual.status;
}
void DBImpl::TEST_CompactRange(int level,
const Slice* begin,
const Slice* end) {
Status DBImpl::TEST_CompactRange(int level,
const Slice* begin,
const Slice* end) {
int output_level = (options_.compaction_style == kCompactionStyleUniversal)
? level
: level + 1;
RunManualCompaction(level, output_level, begin, end);
return RunManualCompaction(level, output_level, begin, end);
}
Status DBImpl::FlushMemTable(const FlushOptions& options) {
@ -1756,8 +1786,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else {
bool is_flush_pending =
imm_.IsFlushPending(options_.min_write_buffer_number_to_merge);
bool is_flush_pending = imm_.IsFlushPending();
if (is_flush_pending &&
(bg_flush_scheduled_ < options_.max_background_flushes)) {
// memtable flush needed
@ -1770,7 +1799,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
// max_background_compactions hasn't been reached and, in case
// bg_manual_only_ > 0, if it's a manual compaction.
if ((manual_compaction_ ||
versions_->NeedsCompaction() ||
versions_->current()->NeedsCompaction() ||
(is_flush_pending && (options_.max_background_flushes <= 0))) &&
bg_compaction_scheduled_ < options_.max_background_compactions &&
(!bg_manual_only_ || manual_compaction_)) {
@ -1792,8 +1821,7 @@ void DBImpl::BGWorkCompaction(void* db) {
Status DBImpl::BackgroundFlush(bool* madeProgress,
DeletionState& deletion_state) {
Status stat;
while (stat.ok() &&
imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
while (stat.ok() && imm_.IsFlushPending()) {
Log(options_.info_log,
"BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
options_.max_background_flushes - bg_flush_scheduled_);
@ -1913,7 +1941,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
mutex_.AssertHeld();
// TODO: remove memtable flush from formal compaction
while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
while (imm_.IsFlushPending()) {
Log(options_.info_log,
"BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
"available %d",
@ -1964,7 +1992,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->edit(), &mutex_);
status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get());
InstallSuperVersion(deletion_state);
Version::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
@ -1999,6 +2027,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (is_manual) {
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->status = status;
m->done = true;
}
// For universal compaction:
@ -2211,7 +2240,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->output_level(), out.number, out.file_size,
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
return versions_->LogAndApply(compact->compaction->edit(), &mutex_,
db_directory_.get());
}
//
@ -2318,7 +2348,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
const uint64_t imm_start = env_->NowMicros();
LogFlush(options_.info_log);
mutex_.Lock();
if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
if (imm_.IsFlushPending()) {
FlushMemTableToOutputFile(nullptr, deletion_state);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
@ -2584,6 +2614,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
input.reset();
if (!options_.disableDataSync) {
db_directory_->Fsync();
}
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros);
@ -2651,8 +2684,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
namespace {
struct IterState {
port::Mutex* mu;
Version* version;
autovector<MemTable*> mem; // includes both mem_ and imm_
Version* version = nullptr;
MemTable* mem = nullptr;
MemTableListVersion* imm = nullptr;
DBImpl *db;
};
@ -2660,19 +2694,23 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
DBImpl::DeletionState deletion_state;
state->mu->Lock();
auto mems_size = state->mem.size();
for (size_t i = 0; i < mems_size; i++) {
MemTable* m = state->mem[i]->Unref();
if (state->mem) { // not set for immutable iterator
MemTable* m = state->mem->Unref();
if (m != nullptr) {
deletion_state.memtables_to_free.push_back(m);
}
}
if (state->version->Unref()) {
// fast path FindObsoleteFiles
state->db->FindObsoleteFiles(deletion_state, false, true);
if (state->version) { // not set for memtable-only iterator
state->version->Unref();
}
if (state->imm) { // not set for memtable-only iterator
state->imm->Unref(&deletion_state.memtables_to_free);
}
// fast path FindObsoleteFiles
state->db->FindObsoleteFiles(deletion_state, false, true);
state->mu->Unlock();
state->db->PurgeObsoleteFiles(deletion_state);
delete state;
}
} // namespace
@ -2681,7 +2719,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) {
IterState* cleanup = new IterState;
MemTable* mutable_mem;
autovector<MemTable*> immutables;
MemTableListVersion* immutable_mems;
Version* version;
// Collect together all needed child iterators for mem
@ -2690,27 +2728,22 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
mem_->Ref();
mutable_mem = mem_;
// Collect together all needed child iterators for imm_
imm_.GetMemTables(&immutables);
for (unsigned int i = 0; i < immutables.size(); i++) {
immutables[i]->Ref();
}
// Collect iterators for files in L0 - Ln
immutable_mems = imm_.current();
immutable_mems->Ref();
versions_->current()->Ref();
version = versions_->current();
mutex_.Unlock();
std::vector<Iterator*> memtables;
memtables.push_back(mutable_mem->NewIterator(options));
cleanup->mem.push_back(mutable_mem);
for (MemTable* m : immutables) {
memtables.push_back(m->NewIterator(options));
cleanup->mem.push_back(m);
}
version->AddIterators(options, storage_options_, &memtables);
std::vector<Iterator*> iterator_list;
iterator_list.push_back(mutable_mem->NewIterator(options));
cleanup->mem = mutable_mem;
cleanup->imm = immutable_mems;
// Collect all needed child iterators for immutable memtables
immutable_mems->AddIterators(options, &iterator_list);
// Collect iterators for files in L0 - Ln
version->AddIterators(options, storage_options_, &iterator_list);
Iterator* internal_iter = NewMergingIterator(
env_, &internal_comparator_, memtables.data(), memtables.size()
);
env_, &internal_comparator_, &iterator_list[0], iterator_list.size());
cleanup->version = version;
cleanup->mu = &mutex_;
cleanup->db = this;
@ -2724,6 +2757,60 @@ Iterator* DBImpl::TEST_NewInternalIterator() {
return NewInternalIterator(ReadOptions(), &ignored);
}
std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
const ReadOptions& options,
uint64_t* superversion_number) {
MemTable* mutable_mem;
MemTableListVersion* immutable_mems;
Version* version;
// get all child iterators and bump their refcounts under lock
mutex_.Lock();
mutable_mem = mem_;
mutable_mem->Ref();
immutable_mems = imm_.current();
immutable_mems->Ref();
version = versions_->current();
version->Ref();
if (superversion_number != nullptr) {
*superversion_number = CurrentVersionNumber();
}
mutex_.Unlock();
Iterator* mutable_iter = mutable_mem->NewIterator(options);
IterState* mutable_cleanup = new IterState();
mutable_cleanup->mem = mutable_mem;
mutable_cleanup->db = this;
mutable_cleanup->mu = &mutex_;
mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr);
// create a DBIter that only uses memtable content; see NewIterator()
mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
mutable_iter, kMaxSequenceNumber);
Iterator* immutable_iter;
IterState* immutable_cleanup = new IterState();
std::vector<Iterator*> list;
immutable_mems->AddIterators(options, &list);
immutable_cleanup->imm = immutable_mems;
version->AddIterators(options, storage_options_, &list);
immutable_cleanup->version = version;
immutable_cleanup->db = this;
immutable_cleanup->mu = &mutex_;
immutable_iter =
NewMergingIterator(env_, &internal_comparator_, &list[0], list.size());
immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup,
nullptr);
// create a DBIter that only uses memtable content; see NewIterator()
immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
immutable_iter, kMaxSequenceNumber);
return std::make_pair(mutable_iter, immutable_iter);
}
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
MutexLock l(&mutex_);
return versions_->current()->MaxNextLevelOverlappingBytes();
@ -2763,9 +2850,10 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
SuperVersion* new_superversion) {
mutex_.AssertHeld();
new_superversion->Init(mem_, imm_, versions_->current());
new_superversion->Init(mem_, imm_.current(), versions_->current());
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
++super_version_number_;
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex
@ -2809,7 +2897,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else if (get_version->imm.Get(lkey, value, &s, merge_context, options_)) {
} else if (get_version->imm->Get(lkey, value, &s, merge_context, options_)) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else {
@ -2875,10 +2963,10 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
}
MemTable* mem = mem_;
MemTableList imm = imm_;
MemTableListVersion* imm = imm_.current();
Version* current = versions_->current();
mem->Ref();
imm.RefAll();
imm->Ref();
current->Ref();
// Unlock while reading from files and memtables
@ -2911,7 +2999,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
LookupKey lkey(keys[i], snapshot);
if (mem->Get(lkey, value, &s, merge_context, options_)) {
// Done
} else if (imm.Get(lkey, value, &s, merge_context, options_)) {
} else if (imm->Get(lkey, value, &s, merge_context, options_)) {
// Done
} else {
current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
@ -2932,7 +3020,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
MaybeScheduleFlushOrCompaction();
}
MemTable* m = mem->Unref();
imm.UnrefAll(&to_delete);
imm->Unref(&to_delete);
current->Unref();
mutex_.Unlock();
@ -2967,13 +3055,21 @@ bool DBImpl::KeyMayExist(const ReadOptions& options,
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* iter = NewInternalIterator(options, &latest_snapshot);
iter = NewDBIterator(
&dbname_, env_, options_, user_comparator(), iter,
(options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
Iterator* iter;
if (options.tailing) {
iter = new TailingIterator(this, options, user_comparator());
} else {
SequenceNumber latest_snapshot;
iter = NewInternalIterator(options, &latest_snapshot);
iter = NewDBIterator(
&dbname_, env_, options_, user_comparator(), iter,
(options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
}
if (options.prefix) {
// use extra wrapper to exclude any keys from the results which
// don't begin with the prefix
@ -3309,12 +3405,11 @@ Status DBImpl::MakeRoomForWrite(bool force,
RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall);
stall_level0_num_files_ += stall;
stall_level0_num_files_count_++;
} else if (
allow_hard_rate_limit_delay &&
options_.hard_rate_limit > 1.0 &&
(score = versions_->MaxCompactionScore()) > options_.hard_rate_limit) {
} else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 &&
(score = versions_->current()->MaxCompactionScore()) >
options_.hard_rate_limit) {
// Delay a write when the compaction score for any level is too large.
int max_level = versions_->MaxCompactionScoreLevel();
int max_level = versions_->current()->MaxCompactionScoreLevel();
mutex_.Unlock();
uint64_t delayed;
{
@ -3336,10 +3431,9 @@ Status DBImpl::MakeRoomForWrite(bool force,
allow_hard_rate_limit_delay = false;
}
mutex_.Lock();
} else if (
allow_soft_rate_limit_delay &&
options_.soft_rate_limit > 0.0 &&
(score = versions_->MaxCompactionScore()) > options_.soft_rate_limit) {
} else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 &&
(score = versions_->current()->MaxCompactionScore()) >
options_.soft_rate_limit) {
// Delay a write when the compaction score for any level is too large.
// TODO: add statistics
mutex_.Unlock();
@ -3494,8 +3588,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
// Pardon the long line but I think it is easier to read this way.
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall Stall-cnt\n"
"--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
"------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < current->NumberLevels(); level++) {
@ -3515,9 +3609,21 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
total_bytes_read += bytes_read;
total_bytes_written += stats_[level].bytes_written;
uint64_t stalls = level == 0 ?
(stall_level0_slowdown_count_ +
stall_level0_num_files_count_ +
stall_memtable_compaction_count_) :
stall_leveln_slowdown_count_[level];
double stall_us = level == 0 ?
(stall_level0_slowdown_ +
stall_level0_num_files_ +
stall_memtable_compaction_) :
stall_leveln_slowdown_[level];
snprintf(
buf, sizeof(buf),
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n",
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f %9lu\n",
level,
files,
current->NumLevelBytes(level) / 1048576.0,
@ -3539,8 +3645,13 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
stats_[level].files_out_levelnp1,
stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
stats_[level].count,
stall_leveln_slowdown_[level] / 1000000.0,
(unsigned long) stall_leveln_slowdown_count_[level]);
(int) ((double) stats_[level].micros /
1000.0 /
(stats_[level].count + 1)),
(double) stall_us / 1000.0 / (stalls + 1),
stall_us / 1000000.0,
(unsigned long) stalls);
total_slowdown += stall_leveln_slowdown_[level];
total_slowdown_count += stall_leveln_slowdown_count_[level];
value->append(buf);
@ -3788,7 +3899,7 @@ Status DBImpl::DeleteFile(std::string name) {
}
}
edit.DeleteFile(level, number);
status = versions_->LogAndApply(&edit, &mutex_);
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
if (status.ok()) {
InstallSuperVersion(deletion_state);
}
@ -3896,7 +4007,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetLogNumber(new_log_number);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_,
impl->db_directory_.get());
}
if (s.ok()) {
delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
@ -3904,6 +4016,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleFlushOrCompaction();
impl->MaybeScheduleLogDBDeployStats();
s = impl->db_directory_->Fsync();
}
}

@ -11,6 +11,7 @@
#include <atomic>
#include <deque>
#include <set>
#include <utility>
#include <vector>
#include "db/dbformat.h"
@ -65,8 +66,8 @@ class DBImpl : public DB {
virtual void ReleaseSnapshot(const Snapshot* snapshot);
virtual bool GetProperty(const Slice& property, std::string* value);
virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1);
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1);
virtual int NumberLevels();
virtual int MaxMemCompactionLevel();
virtual int Level0StopWriteTrigger();
@ -91,17 +92,17 @@ class DBImpl : public DB {
virtual Status GetDbIdentity(std::string& identity);
void RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
const Slice* end);
Status RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
const Slice* end);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin, *end]
void TEST_CompactRange(int level,
const Slice* begin,
const Slice* end);
Status TEST_CompactRange(int level,
const Slice* begin,
const Slice* end);
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable();
@ -141,10 +142,10 @@ class DBImpl : public DB {
// holds references to memtable, all immutable memtables and version
struct SuperVersion {
MemTable* mem;
MemTableList imm;
MemTableListVersion* imm;
Version* current;
std::atomic<uint32_t> refs;
// We need to_delete because during Cleanup(), imm.UnrefAll() returns
// We need to_delete because during Cleanup(), imm->Unref() returns
// all memtables that we need to free through this vector. We then
// delete all those memtables outside of mutex, during destruction
autovector<MemTable*> to_delete;
@ -162,7 +163,7 @@ class DBImpl : public DB {
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
void Cleanup();
void Init(MemTable* new_mem, const MemTableList& new_imm,
void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current);
};
@ -256,6 +257,7 @@ class DBImpl : public DB {
private:
friend class DB;
friend class TailingIterator;
struct CompactionState;
struct Writer;
@ -357,7 +359,18 @@ class DBImpl : public DB {
// Move the files in the input level to the target level.
// If target_level < 0, automatically calculate the minimum level that could
// hold the data set.
void ReFitLevel(int level, int target_level = -1);
Status ReFitLevel(int level, int target_level = -1);
// Returns the current SuperVersion number.
uint64_t CurrentVersionNumber() const;
// Returns a pair of iterators (mutable-only and immutable-only) used
// internally by TailingIterator and stores CurrentVersionNumber() in
// *superversion_number. These iterators are always up-to-date, i.e. can
// be used to read new data.
std::pair<Iterator*, Iterator*> GetTailingIteratorPair(
const ReadOptions& options,
uint64_t* superversion_number);
// Constant after construction
const InternalFilterPolicy internal_filter_policy_;
@ -381,8 +394,15 @@ class DBImpl : public DB {
SuperVersion* super_version_;
// An ordinal representing the current SuperVersion. Updated by
// InstallSuperVersion(), i.e. incremented every time super_version_
// changes.
std::atomic<uint64_t> super_version_number_;
std::string host_name_;
std::unique_ptr<Directory> db_directory_;
// Queue of writers.
std::deque<Writer*> writers_;
WriteBatch tmp_batch_;
@ -412,6 +432,7 @@ class DBImpl : public DB {
int input_level;
int output_level;
bool done;
Status status;
bool in_progress; // compaction request being processed?
const InternalKey* begin; // nullptr means beginning of key range
const InternalKey* end; // nullptr means end of key range

@ -49,8 +49,9 @@ public:
virtual Status Write(const WriteOptions& options, WriteBatch* updates) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1) {
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status DisableFileDeletions() {
return Status::NotSupported("Not supported operation in read only mode.");

@ -22,15 +22,18 @@
#include "rocksdb/filter_policy.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/plain_table_factory.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "table/block_based_table_factory.h"
#include "util/hash.h"
#include "util/hash_linklist_rep.h"
#include "utilities/merge_operators.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/statistics.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
namespace rocksdb {
@ -838,6 +841,9 @@ TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
options.filter_policy = filter_policy.get();
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
DestroyAndReopen(&options);
ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
@ -4789,8 +4795,9 @@ class ModelDB: public DB {
sizes[i] = 0;
}
}
virtual void CompactRange(const Slice* start, const Slice* end,
bool reduce_level, int target_level) {
virtual Status CompactRange(const Slice* start, const Slice* end,
bool reduce_level, int target_level) {
return Status::NotSupported("Not supported operation.");
}
virtual int NumberLevels()
@ -5271,6 +5278,118 @@ void BM_LogAndApply(int iters, int num_base_files) {
buf, iters, us, ((float)us) / iters);
}
TEST(DBTest, TailingIteratorSingle) {
ReadOptions read_options;
read_options.tailing = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
iter->SeekToFirst();
ASSERT_TRUE(!iter->Valid());
// add a record and check that iter can see it
ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor"));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), "mirko");
iter->Next();
ASSERT_TRUE(!iter->Valid());
}
TEST(DBTest, TailingIteratorKeepAdding) {
ReadOptions read_options;
read_options.tailing = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
std::string value(1024, 'a');
const int num_records = 10000;
for (int i = 0; i < num_records; ++i) {
char buf[32];
snprintf(buf, sizeof(buf), "%016d", i);
Slice key(buf, 16);
ASSERT_OK(db_->Put(WriteOptions(), key, value));
iter->Seek(key);
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(key), 0);
}
}
TEST(DBTest, TailingIteratorDeletes) {
ReadOptions read_options;
read_options.tailing = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
// write a single record, read it using the iterator, then delete it
ASSERT_OK(db_->Put(WriteOptions(), "0test", "test"));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), "0test");
ASSERT_OK(db_->Delete(WriteOptions(), "0test"));
// write many more records
const int num_records = 10000;
std::string value(1024, 'A');
for (int i = 0; i < num_records; ++i) {
char buf[32];
snprintf(buf, sizeof(buf), "1%015d", i);
Slice key(buf, 16);
ASSERT_OK(db_->Put(WriteOptions(), key, value));
}
// force a flush to make sure that no records are read from memtable
dbfull()->TEST_FlushMemTable();
// skip "0test"
iter->Next();
// make sure we can read all new records using the existing iterator
int count = 0;
for (; iter->Valid(); iter->Next(), ++count) ;
ASSERT_EQ(count, num_records);
}
TEST(DBTest, TailingIteratorPrefixSeek) {
ReadOptions read_options;
read_options.tailing = true;
read_options.prefix_seek = true;
auto prefix_extractor = NewFixedPrefixTransform(2);
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.prefix_extractor = prefix_extractor;
options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor));
DestroyAndReopen(&options);
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
ASSERT_OK(db_->Put(WriteOptions(), "0101", "test"));
dbfull()->TEST_FlushMemTable();
ASSERT_OK(db_->Put(WriteOptions(), "0202", "test"));
// Seek(0102) shouldn't find any records since 0202 has a different prefix
iter->Seek("0102");
ASSERT_TRUE(!iter->Valid());
iter->Seek("0202");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), "0202");
iter->Next();
ASSERT_TRUE(!iter->Valid());
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -28,6 +28,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter,
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
read_error_(false),
eof_offset_(0),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() {
return last_record_offset_;
}
void Reader::UnmarkEOF() {
if (read_error_) {
return;
}
eof_ = false;
if (eof_offset_ == 0) {
return;
}
// If the EOF was in the middle of a block (a partial block was read) we have
// to read the rest of the block as ReadPhysicalRecord can only read full
// blocks and expects the file position indicator to be aligned to the start
// of a block.
//
// consumed_bytes + buffer_size() + remaining == kBlockSize
size_t consumed_bytes = eof_offset_ - buffer_.size();
size_t remaining = kBlockSize - eof_offset_;
// backing_store_ is used to concatenate what is left in buffer_ and
// the remainder of the block. If buffer_ already uses backing_store_,
// we just append the new data.
if (buffer_.data() != backing_store_ + consumed_bytes) {
// Buffer_ does not use backing_store_ for storage.
// Copy what is left in buffer_ to backing_store.
memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
}
Slice read_buffer;
Status status = file_->Read(remaining, &read_buffer,
backing_store_ + eof_offset_);
size_t added = read_buffer.size();
end_of_buffer_offset_ += added;
if (!status.ok()) {
if (added > 0) {
ReportDrop(added, status);
}
read_error_ = true;
return;
}
if (read_buffer.data() != backing_store_ + eof_offset_) {
// Read did not write to backing_store_
memmove(backing_store_ + eof_offset_, read_buffer.data(),
read_buffer.size());
}
buffer_ = Slice(backing_store_ + consumed_bytes,
eof_offset_ + added - consumed_bytes);
if (added < remaining) {
eof_ = true;
eof_offset_ += added;
} else {
eof_offset_ = 0;
}
}
void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
}
@ -184,7 +249,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < (size_t)kHeaderSize) {
if (!eof_) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
@ -192,10 +257,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
read_error_ = true;
return kEof;
} else if (buffer_.size() < (size_t)kBlockSize) {
eof_ = true;
eof_offset_ = buffer_.size();
}
continue;
} else if (buffer_.size() == 0) {

@ -69,9 +69,10 @@ class Reader {
// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
void UnmarkEOF() {
eof_ = false;
}
// Also aligns the file position indicator to the start of the next block
// by reading the rest of the data from the EOF position to the end of the
// block that was partially read.
void UnmarkEOF();
SequentialFile* file() { return file_.get(); }
@ -82,6 +83,11 @@ class Reader {
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool read_error_; // Error occurred while reading from file
// Offset of the file position indicator within the last block when an
// EOF was detected.
size_t eof_offset_;
// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;

@ -47,36 +47,93 @@ class LogTest {
public:
std::string contents_;
explicit StringDest(Slice& reader_contents) :
WritableFile(),
contents_(""),
reader_contents_(reader_contents),
last_flush_(0) {
reader_contents_ = Slice(contents_.data(), 0);
};
virtual Status Close() { return Status::OK(); }
virtual Status Flush() { return Status::OK(); }
virtual Status Flush() {
ASSERT_TRUE(reader_contents_.size() <= last_flush_);
size_t offset = last_flush_ - reader_contents_.size();
reader_contents_ = Slice(
contents_.data() + offset,
contents_.size() - offset);
last_flush_ = contents_.size();
return Status::OK();
}
virtual Status Sync() { return Status::OK(); }
virtual Status Append(const Slice& slice) {
contents_.append(slice.data(), slice.size());
return Status::OK();
}
void Drop(size_t bytes) {
contents_.resize(contents_.size() - bytes);
reader_contents_ = Slice(
reader_contents_.data(), reader_contents_.size() - bytes);
last_flush_ = contents_.size();
}
private:
Slice& reader_contents_;
size_t last_flush_;
};
class StringSource : public SequentialFile {
public:
Slice contents_;
Slice& contents_;
bool force_error_;
size_t force_error_position_;
bool force_eof_;
size_t force_eof_position_;
bool returned_partial_;
StringSource() : force_error_(false), returned_partial_(false) { }
explicit StringSource(Slice& contents) :
contents_(contents),
force_error_(false),
force_error_position_(0),
force_eof_(false),
force_eof_position_(0),
returned_partial_(false) { }
virtual Status Read(size_t n, Slice* result, char* scratch) {
ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
if (force_error_) {
force_error_ = false;
returned_partial_ = true;
return Status::Corruption("read error");
if (force_error_position_ >= n) {
force_error_position_ -= n;
} else {
*result = Slice(contents_.data(), force_error_position_);
contents_.remove_prefix(force_error_position_);
force_error_ = false;
returned_partial_ = true;
return Status::Corruption("read error");
}
}
if (contents_.size() < n) {
n = contents_.size();
returned_partial_ = true;
}
*result = Slice(contents_.data(), n);
if (force_eof_) {
if (force_eof_position_ >= n) {
force_eof_position_ -= n;
} else {
force_eof_ = false;
n = force_eof_position_;
returned_partial_ = true;
}
}
// By using scratch we ensure that caller has control over the
// lifetime of result.data()
memcpy(scratch, contents_.data(), n);
*result = Slice(scratch, n);
contents_.remove_prefix(n);
return Status::OK();
}
@ -123,10 +180,10 @@ class LogTest {
src->contents_ = dest_contents();
}
Slice reader_contents_;
unique_ptr<StringDest> dest_holder_;
unique_ptr<StringSource> source_holder_;
ReportCollector report_;
bool reading_;
Writer writer_;
Reader reader_;
@ -135,16 +192,15 @@ class LogTest {
static uint64_t initial_offset_last_record_offsets_[];
public:
LogTest() : dest_holder_(new StringDest),
source_holder_(new StringSource),
reading_(false),
LogTest() : reader_contents_(),
dest_holder_(new StringDest(reader_contents_)),
source_holder_(new StringSource(reader_contents_)),
writer_(std::move(dest_holder_)),
reader_(std::move(source_holder_), &report_, true/*checksum*/,
0/*initial_offset*/) {
}
void Write(const std::string& msg) {
ASSERT_TRUE(!reading_) << "Write() after starting to read";
writer_.AddRecord(Slice(msg));
}
@ -153,10 +209,6 @@ class LogTest {
}
std::string Read() {
if (!reading_) {
reading_ = true;
reset_source_contents();
}
std::string scratch;
Slice record;
if (reader_.ReadRecord(&record, &scratch)) {
@ -175,7 +227,9 @@ class LogTest {
}
void ShrinkSize(int bytes) {
dest_contents().resize(dest_contents().size() - bytes);
auto dest = dynamic_cast<StringDest*>(writer_.file());
assert(dest);
dest->Drop(bytes);
}
void FixChecksum(int header_offset, int len) {
@ -185,9 +239,10 @@ class LogTest {
EncodeFixed32(&dest_contents()[header_offset], crc);
}
void ForceError() {
void ForceError(size_t position = 0) {
auto src = dynamic_cast<StringSource*>(reader_.file());
src->force_error_ = true;
src->force_error_position_ = position;
}
size_t DroppedBytes() const {
@ -198,6 +253,22 @@ class LogTest {
return report_.message_;
}
void ForceEOF(size_t position = 0) {
auto src = dynamic_cast<StringSource*>(reader_.file());
src->force_eof_ = true;
src->force_eof_position_ = position;
}
void UnmarkEOF() {
auto src = dynamic_cast<StringSource*>(reader_.file());
src->returned_partial_ = false;
reader_.UnmarkEOF();
}
bool IsEOF() {
return reader_.IsEOF();
}
// Returns OK iff recorded error message contains "msg"
std::string MatchError(const std::string& msg) const {
if (report_.message_.find(msg) == std::string::npos) {
@ -217,9 +288,7 @@ class LogTest {
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
WriteInitialOffsetLog();
reading_ = true;
unique_ptr<StringSource> source(new StringSource);
source->contents_ = dest_contents();
unique_ptr<StringSource> source(new StringSource(reader_contents_));
unique_ptr<Reader> offset_reader(
new Reader(std::move(source), &report_, true/*checksum*/,
WrittenBytes() + offset_past_end));
@ -231,9 +300,7 @@ class LogTest {
void CheckInitialOffsetRecord(uint64_t initial_offset,
int expected_record_offset) {
WriteInitialOffsetLog();
reading_ = true;
unique_ptr<StringSource> source(new StringSource);
source->contents_ = dest_contents();
unique_ptr<StringSource> source(new StringSource(reader_contents_));
unique_ptr<Reader> offset_reader(
new Reader(std::move(source), &report_, true/*checksum*/,
initial_offset));
@ -520,6 +587,70 @@ TEST(LogTest, ReadPastEnd) {
CheckOffsetPastEndReturnsNoRecords(5);
}
TEST(LogTest, ClearEofSingleBlock) {
Write("foo");
Write("bar");
ForceEOF(3 + kHeaderSize + 2);
ASSERT_EQ("foo", Read());
UnmarkEOF();
ASSERT_EQ("bar", Read());
ASSERT_TRUE(IsEOF());
ASSERT_EQ("EOF", Read());
Write("xxx");
UnmarkEOF();
ASSERT_EQ("xxx", Read());
ASSERT_TRUE(IsEOF());
}
TEST(LogTest, ClearEofMultiBlock) {
size_t num_full_blocks = 5;
size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25;
Write(BigString("foo", n));
Write(BigString("bar", n));
ForceEOF(n + num_full_blocks * kHeaderSize + 10);
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_TRUE(IsEOF());
UnmarkEOF();
ASSERT_EQ(BigString("bar", n), Read());
ASSERT_TRUE(IsEOF());
Write(BigString("xxx", n));
UnmarkEOF();
ASSERT_EQ(BigString("xxx", n), Read());
ASSERT_TRUE(IsEOF());
}
TEST(LogTest, ClearEofError) {
// If an error occurs during Read() in UnmarkEOF(), the records contained
// in the buffer should be returned on subsequent calls of ReadRecord()
// until no more full records are left, whereafter ReadRecord() should return
// false to indicate that it cannot read any further.
Write("foo");
Write("bar");
UnmarkEOF();
ASSERT_EQ("foo", Read());
ASSERT_TRUE(IsEOF());
Write("xxx");
ForceError(0);
UnmarkEOF();
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, ClearEofError2) {
Write("foo");
Write("bar");
UnmarkEOF();
ASSERT_EQ("foo", Read());
Write("xxx");
ForceError(3);
UnmarkEOF();
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("read error"));
}
} // namespace log
} // namespace rocksdb

@ -16,41 +16,85 @@ namespace rocksdb {
class InternalKeyComparator;
class Mutex;
class MemTableListIterator;
class VersionSet;
using std::list;
// Increase reference count on all underling memtables
void MemTableList::RefAll() {
for (auto &memtable : memlist_) {
memtable->Ref();
MemTableListVersion::MemTableListVersion(MemTableListVersion* old) {
if (old != nullptr) {
memlist_ = old->memlist_;
size_ = old->size_;
for (auto& m : memlist_) {
m->Ref();
}
}
}
// Drop reference count on all underling memtables. If the
// refcount of an underlying memtable drops to zero, then
// return it in to_delete vector.
void MemTableList::UnrefAll(autovector<MemTable*>* to_delete) {
for (auto &memtable : memlist_) {
MemTable* m = memtable->Unref();
if (m != nullptr) {
to_delete->push_back(m);
void MemTableListVersion::Ref() { ++refs_; }
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
// if to_delete is equal to nullptr it means we're confident
// that refs_ will not be zero
assert(to_delete != nullptr);
for (const auto& m : memlist_) {
MemTable* x = m->Unref();
if (x != nullptr) {
to_delete->push_back(x);
}
}
delete this;
}
}
int MemTableListVersion::size() const { return size_; }
// Returns the total number of memtables in the list
int MemTableList::size() {
assert(num_flush_not_started_ <= size_);
return size_;
int MemTableList::size() const {
assert(num_flush_not_started_ <= current_->size_);
return current_->size_;
}
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext& merge_context,
const Options& options) {
for (auto& memtable : memlist_) {
if (memtable->Get(key, value, s, merge_context, options)) {
return true;
}
}
return false;
}
void MemTableListVersion::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iterator_list) {
for (auto& m : memlist_) {
iterator_list->push_back(m->NewIterator(options));
}
}
// caller is responsible for referencing m
void MemTableListVersion::Add(MemTable* m) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
memlist_.push_front(m);
++size_;
}
// caller is responsible for unreferencing m
void MemTableListVersion::Remove(MemTable* m) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
memlist_.remove(m);
--size_;
}
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) {
bool MemTableList::IsFlushPending() {
if ((flush_requested_ && num_flush_not_started_ >= 1) ||
(num_flush_not_started_ >= min_write_buffer_number_to_merge)) {
(num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
assert(imm_flush_needed.NoBarrier_Load() != nullptr);
return true;
}
@ -59,7 +103,8 @@ bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) {
// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
for (auto it = memlist_.rbegin(); it != memlist_.rend(); it++) {
const auto& memlist = current_->memlist_;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_in_progress_) {
assert(!m->flush_completed_);
@ -67,21 +112,19 @@ void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
if (num_flush_not_started_ == 0) {
imm_flush_needed.Release_Store(nullptr);
}
m->flush_in_progress_ = true; // flushing will start very soon
m->flush_in_progress_ = true; // flushing will start very soon
ret->push_back(m);
}
}
flush_requested_ = false; // start-flush request is complete
flush_requested_ = false; // start-flush request is complete
}
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
const autovector<MemTable*> &mems,
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs,
autovector<MemTable*>* to_delete) {
const autovector<MemTable*>& mems, VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs, autovector<MemTable*>* to_delete,
Directory* db_directory) {
mu->AssertHeld();
// If the flush was not successful, then just reset state.
@ -122,8 +165,8 @@ Status MemTableList::InstallMemtableFlushResults(
// scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing. Memetables
// are always committed in the order that they were created.
while (!memlist_.empty() && s.ok()) {
MemTable* m = memlist_.back(); // get the last element
while (!current_->memlist_.empty() && s.ok()) {
MemTable* m = current_->memlist_.back(); // get the last element
if (!m->flush_completed_) {
break;
}
@ -133,7 +176,11 @@ Status MemTableList::InstallMemtableFlushResults(
(unsigned long)m->file_number_);
// this can release and reacquire the mutex.
s = vset->LogAndApply(&m->edit_, mu);
s = vset->LogAndApply(&m->edit_, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
@ -144,7 +191,7 @@ Status MemTableList::InstallMemtableFlushResults(
"Level-0 commit table #%lu: memtable #%lu done",
(unsigned long)m->file_number_,
(unsigned long)mem_id);
memlist_.remove(m);
current_->Remove(m);
assert(m->file_number_ > 0);
// pending_outputs can be cleared only after the newly created file
@ -155,7 +202,6 @@ Status MemTableList::InstallMemtableFlushResults(
if (m->Unref() != nullptr) {
to_delete->push_back(m);
}
size_--;
} else {
//commit failed. setup state so that we can flush again.
Log(info_log,
@ -172,7 +218,7 @@ Status MemTableList::InstallMemtableFlushResults(
s = Status::IOError("Unable to commit flushed memtable");
}
++mem_id;
} while (!memlist_.empty() && (m = memlist_.back()) &&
} while (!current_->memlist_.empty() && (m = current_->memlist_.back()) &&
m->file_number_ == file_number);
}
commit_in_progress_ = false;
@ -181,9 +227,14 @@ Status MemTableList::InstallMemtableFlushResults(
// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m) {
assert(size_ >= num_flush_not_started_);
size_++;
memlist_.push_front(m);
assert(current_->size_ >= num_flush_not_started_);
InstallNewVersion();
// this method is used to move mutable memtable into an immutable list.
// since mutable memtable is already refcounted by the DBImpl,
// and when moving to the imutable list we don't unref it,
// we don't have to ref the memtable here. we just take over the
// reference from the DBImpl.
current_->Add(m);
m->MarkImmutable();
num_flush_not_started_++;
if (num_flush_not_started_ == 1) {
@ -194,28 +245,21 @@ void MemTableList::Add(MemTable* m) {
// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateMemoryUsage() {
size_t size = 0;
for (auto &memtable : memlist_) {
for (auto& memtable : current_->memlist_) {
size += memtable->ApproximateMemoryUsage();
}
return size;
}
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options) {
for (auto &memtable : memlist_) {
if (memtable->Get(key, value, s, merge_context, options)) {
return true;
}
}
return false;
}
void MemTableList::GetMemTables(autovector<MemTable*>* output) {
for (auto &memtable : memlist_) {
output->push_back(memtable);
void MemTableList::InstallNewVersion() {
if (current_->refs_ == 1) {
// we're the only one using the version, just keep using it
} else {
// somebody else holds the current version, we need to create new one
MemTableListVersion* version = current_;
current_ = new MemTableListVersion(current_);
current_->Ref();
version->Unref();
}
}

@ -13,62 +13,93 @@
#include "db/memtable.h"
#include "db/skiplist.h"
#include "rocksdb/db.h"
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/autovector.h"
namespace rocksdb {
class InternalKeyComparator;
class Mutex;
class MemTableListIterator;
//
// keeps a list of immutable memtables in a vector. the list is immutable
// if refcount is bigger than one. It is used as a state for Get() and
// Iterator code paths
class MemTableListVersion {
public:
explicit MemTableListVersion(MemTableListVersion* old = nullptr);
void Ref();
void Unref(autovector<MemTable*>* to_delete = nullptr);
int size() const;
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options);
void AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iterator_list);
private:
// REQUIRE: m is mutable memtable
void Add(MemTable* m);
// REQUIRE: m is mutable memtable
void Remove(MemTable* m);
friend class MemTableList;
std::list<MemTable*> memlist_;
int size_ = 0;
int refs_ = 0;
};
// This class stores references to all the immutable memtables.
// The memtables are flushed to L0 as soon as possible and in
// any order. If there are more than one immutable memtable, their
// flushes can occur concurrently. However, they are 'committed'
// to the manifest in FIFO order to maintain correctness and
// recoverability from a crash.
//
class MemTableList {
public:
// A list of memtables.
MemTableList() : size_(0), num_flush_not_started_(0),
commit_in_progress_(false),
flush_requested_(false) {
explicit MemTableList(int min_write_buffer_number_to_merge)
: min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
current_(new MemTableListVersion()),
num_flush_not_started_(0),
commit_in_progress_(false),
flush_requested_(false) {
imm_flush_needed.Release_Store(nullptr);
current_->Ref();
}
~MemTableList() {};
~MemTableList() {}
MemTableListVersion* current() { return current_; }
// so that backgrund threads can detect non-nullptr pointer to
// determine whether this is anything more to start flushing.
port::AtomicPointer imm_flush_needed;
// Increase reference count on all underling memtables
void RefAll();
// Drop reference count on all underling memtables. If the refcount
// on an underlying memtable drops to zero, then return it in
// to_delete vector.
void UnrefAll(autovector<MemTable*>* to_delete);
// Returns the total number of memtables in the list
int size();
int size() const;
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool IsFlushPending(int min_write_buffer_number_to_merge);
bool IsFlushPending();
// Returns the earliest memtables that needs to be flushed. The returned
// memtables are guaranteed to be in the ascending order of created time.
void PickMemtablesToFlush(autovector<MemTable*>* mems);
// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(const autovector<MemTable*> &m,
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs,
autovector<MemTable*>* to_delete);
Status InstallMemtableFlushResults(const autovector<MemTable*>& m,
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs,
autovector<MemTable*>* to_delete,
Directory* db_directory);
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().
@ -77,14 +108,6 @@ class MemTableList {
// Returns an estimate of the number of bytes of data in use.
size_t ApproximateMemoryUsage();
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options);
// Returns the list of underlying memtables.
void GetMemTables(autovector<MemTable*>* list);
// Request a flush of all existing memtables to storage
void FlushRequested() { flush_requested_ = true; }
@ -93,8 +116,12 @@ class MemTableList {
// void operator=(const MemTableList&);
private:
std::list<MemTable*> memlist_;
int size_;
// DB mutex held
void InstallNewVersion();
int min_write_buffer_number_to_merge_;
MemTableListVersion* current_;
// the number of elements that still need flushing
int num_flush_not_started_;

@ -12,6 +12,8 @@
#pragma once
#include "rocksdb/iterator.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
namespace rocksdb {

@ -35,6 +35,7 @@
#include <stdlib.h>
#include "port/port.h"
#include "util/random.h"
#include "rocksdb/arena.h"
namespace rocksdb {

@ -0,0 +1,175 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/tailing_iter.h"
#include <string>
#include <utility>
#include "db/db_impl.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
namespace rocksdb {
TailingIterator::TailingIterator(DBImpl* db, const ReadOptions& options,
const Comparator* comparator)
: db_(db), options_(options), comparator_(comparator),
version_number_(0), current_(nullptr),
status_(Status::InvalidArgument("Seek() not called on this iterator")) {}
bool TailingIterator::Valid() const {
return current_ != nullptr;
}
void TailingIterator::SeekToFirst() {
if (!IsCurrentVersion()) {
CreateIterators();
}
mutable_->SeekToFirst();
immutable_->SeekToFirst();
UpdateCurrent();
}
void TailingIterator::Seek(const Slice& target) {
if (!IsCurrentVersion()) {
CreateIterators();
}
mutable_->Seek(target);
// We maintain the interval (prev_key_, immutable_->key()] such that there
// are no records with keys within that range in immutable_ other than
// immutable_->key(). Since immutable_ can't change in this version, we don't
// need to do a seek if 'target' belongs to that interval (i.e. immutable_ is
// already at the correct position)!
//
// If options.prefix_seek is used and immutable_ is not valid, seek if target
// has a different prefix than prev_key.
//
// prev_key_ is updated by Next(). SeekImmutable() sets prev_key_ to
// 'target' -- in this case, prev_key_ is included in the interval, so
// prev_inclusive_ has to be set.
if (!is_prev_set_ ||
comparator_->Compare(prev_key_, target) >= !is_prev_inclusive_ ||
(immutable_->Valid() &&
comparator_->Compare(target, immutable_->key()) > 0) ||
(options_.prefix_seek && !IsSamePrefix(target))) {
SeekImmutable(target);
}
UpdateCurrent();
}
void TailingIterator::Next() {
assert(Valid());
if (!IsCurrentVersion()) {
// save the current key, create new iterators and then seek
std::string current_key = key().ToString();
Slice key_slice(current_key.data(), current_key.size());
CreateIterators();
Seek(key_slice);
if (!Valid() || key().compare(key_slice) != 0) {
// record with current_key no longer exists
return;
}
} else if (current_ == immutable_.get()) {
// immutable iterator is advanced -- update prev_key_
prev_key_ = key().ToString();
is_prev_inclusive_ = false;
is_prev_set_ = true;
}
current_->Next();
UpdateCurrent();
}
Slice TailingIterator::key() const {
assert(Valid());
return current_->key();
}
Slice TailingIterator::value() const {
assert(Valid());
return current_->value();
}
Status TailingIterator::status() const {
if (!status_.ok()) {
return status_;
} else if (!mutable_->status().ok()) {
return mutable_->status();
} else {
return immutable_->status();
}
}
void TailingIterator::Prev() {
status_ = Status::NotSupported("This iterator doesn't support Prev()");
}
void TailingIterator::SeekToLast() {
status_ = Status::NotSupported("This iterator doesn't support SeekToLast()");
}
void TailingIterator::CreateIterators() {
std::pair<Iterator*, Iterator*> iters =
db_->GetTailingIteratorPair(options_, &version_number_);
assert(iters.first && iters.second);
mutable_.reset(iters.first);
immutable_.reset(iters.second);
current_ = nullptr;
is_prev_set_ = false;
}
void TailingIterator::UpdateCurrent() {
current_ = nullptr;
if (mutable_->Valid()) {
current_ = mutable_.get();
}
if (immutable_->Valid() &&
(current_ == nullptr ||
comparator_->Compare(immutable_->key(), current_->key()) < 0)) {
current_ = immutable_.get();
}
if (!status_.ok()) {
// reset status that was set by Prev() or SeekToLast()
status_ = Status::OK();
}
}
bool TailingIterator::IsCurrentVersion() const {
return mutable_ != nullptr && immutable_ != nullptr &&
version_number_ == db_->CurrentVersionNumber();
}
bool TailingIterator::IsSamePrefix(const Slice& target) const {
const SliceTransform* extractor = db_->options_.prefix_extractor;
assert(extractor);
assert(is_prev_set_);
return extractor->Transform(target)
.compare(extractor->Transform(prev_key_)) == 0;
}
void TailingIterator::SeekImmutable(const Slice& target) {
prev_key_ = target.ToString();
is_prev_inclusive_ = true;
is_prev_set_ = true;
immutable_->Seek(target);
}
} // namespace rocksdb

@ -0,0 +1,88 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
namespace rocksdb {
class DBImpl;
/**
* TailingIterator is a special type of iterator that doesn't use an (implicit)
* snapshot. In other words, it can be used to read data that was added to the
* db after the iterator had been created.
*
* TailingIterator is optimized for sequential reading. It doesn't support
* Prev() and SeekToLast() operations.
*/
class TailingIterator : public Iterator {
public:
TailingIterator(DBImpl* db, const ReadOptions& options,
const Comparator* comparator);
virtual ~TailingIterator() {}
virtual bool Valid() const override;
virtual void SeekToFirst() override;
virtual void SeekToLast() override;
virtual void Seek(const Slice& target) override;
virtual void Next() override;
virtual void Prev() override;
virtual Slice key() const override;
virtual Slice value() const override;
virtual Status status() const override;
private:
DBImpl* const db_;
const ReadOptions options_;
const Comparator* const comparator_;
uint64_t version_number_;
// TailingIterator merges the contents of the two iterators below (one using
// mutable memtable contents only, other over SSTs and immutable memtables).
// See DBIter::GetTailingIteratorPair().
std::unique_ptr<Iterator> mutable_;
std::unique_ptr<Iterator> immutable_;
// points to either mutable_ or immutable_
Iterator* current_;
// key that precedes immutable iterator's current key
std::string prev_key_;
// unless prev_set is true, prev_key/prev_head is not valid and shouldn't be
// used; reset by createIterators()
bool is_prev_set_;
// prev_key_ was set by SeekImmutable(), which means that the interval of
// keys covered by immutable_ is [prev_key_, current], i.e. it includes the
// left endpoint
bool is_prev_inclusive_;
// internal iterator status
Status status_;
// check if this iterator's version matches DB's version
bool IsCurrentVersion() const;
// check if SeekImmutable() is needed due to target having a different prefix
// than prev_key_ (used when options.prefix_seek is set)
bool IsSamePrefix(const Slice& target) const;
// creates mutable_ and immutable_ iterators and updates version_number_
void CreateIterators();
// set current_ to be one of the iterators with the smallest key
void UpdateCurrent();
// seek on immutable_ and update prev_key
void SeekImmutable(const Slice& target);
};
} // namespace rocksdb

@ -761,6 +761,28 @@ bool Version::Unref() {
return false;
}
bool Version::NeedsCompaction() const {
if (file_to_compact_ != nullptr) {
return true;
}
// In universal compaction case, this check doesn't really
// check the compaction condition, but checks num of files threshold
// only. We are not going to miss any compaction opportunity
// but it's likely that more compactions are scheduled but
// ending up with nothing to do. We can improve it later.
// TODO(sdong): improve this function to be accurate for universal
// compactions.
int num_levels_to_check =
(vset_->options_->compaction_style != kCompactionStyleUniversal) ?
NumberLevels() - 1 : 1;
for (int i = 0; i < num_levels_to_check; i++) {
if (compaction_score_[i] >= 1) {
return true;
}
}
return false;
}
bool Version::OverlapInLevel(int level,
const Slice* smallest_user_key,
const Slice* largest_user_key) {
@ -1418,6 +1440,7 @@ void VersionSet::AppendVersion(Version* v) {
}
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
Directory* db_directory,
bool new_descriptor_log) {
mu->AssertHeld();
@ -1546,6 +1569,9 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
// of it later
env_->DeleteFile(DescriptorFileName(dbname_, old_manifest_file_number));
}
if (!options_->disableDataSync && db_directory != nullptr) {
db_directory->Fsync();
}
}
// find offset in manifest file where this version is stored.
@ -1762,6 +1788,78 @@ Status VersionSet::Recover() {
return s;
}
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
const Options* options,
const EnvOptions& storage_options,
int new_levels) {
if (new_levels <= 1) {
return Status::InvalidArgument(
"Number of levels needs to be bigger than 1");
}
const InternalKeyComparator cmp(options->comparator);
TableCache tc(dbname, options, storage_options, 10);
VersionSet versions(dbname, options, storage_options, &tc, &cmp);
Status status;
status = versions.Recover();
if (!status.ok()) {
return status;
}
Version* current_version = versions.current();
int current_levels = current_version->NumberLevels();
if (current_levels <= new_levels) {
return Status::OK();
}
// Make sure there are file only on one level from
// (new_levels-1) to (current_levels-1)
int first_nonempty_level = -1;
int first_nonempty_level_filenum = 0;
for (int i = new_levels - 1; i < current_levels; i++) {
int file_num = current_version->NumLevelFiles(i);
if (file_num != 0) {
if (first_nonempty_level < 0) {
first_nonempty_level = i;
first_nonempty_level_filenum = file_num;
} else {
char msg[255];
snprintf(msg, sizeof(msg),
"Found at least two levels containing files: "
"[%d:%d],[%d:%d].\n",
first_nonempty_level, first_nonempty_level_filenum, i,
file_num);
return Status::InvalidArgument(msg);
}
}
}
std::vector<FileMetaData*>* old_files_list = current_version->files_;
// we need to allocate an array with the old number of levels size to
// avoid SIGSEGV in WriteSnapshot()
// however, all levels bigger or equal to new_levels will be empty
std::vector<FileMetaData*>* new_files_list =
new std::vector<FileMetaData*>[current_levels];
for (int i = 0; i < new_levels - 1; i++) {
new_files_list[i] = old_files_list[i];
}
if (first_nonempty_level > 0) {
new_files_list[new_levels - 1] = old_files_list[first_nonempty_level];
}
delete[] current_version->files_;
current_version->files_ = new_files_list;
current_version->num_levels_ = new_levels;
VersionEdit ve;
port::Mutex dummy_mutex;
MutexLock l(&dummy_mutex);
return versions.LogAndApply(&ve, &dummy_mutex, nullptr, true);
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
bool verbose, bool hex) {
struct LogReporter : public log::Reader::Reporter {

@ -101,6 +101,15 @@ class Version {
// and return true. Otherwise, return false.
bool Unref();
// Returns true iff some level needs a compaction.
bool NeedsCompaction() const;
// Returns the maxmimum compaction score for levels 1 to max
double MaxCompactionScore() const { return max_compaction_score_; }
// See field declaration
int MaxCompactionScoreLevel() const { return max_compaction_score_level_; }
void GetOverlappingInputs(
int level,
const InternalKey* begin, // nullptr means before all keys
@ -277,6 +286,7 @@ class VersionSet {
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu,
Directory* db_directory = nullptr,
bool new_descriptor_log = false);
// Recover the last saved descriptor from persistent storage.
@ -285,10 +295,16 @@ class VersionSet {
// Try to reduce the number of levels. This call is valid when
// only one level from the new max level to the old
// max level containing files.
// The call is static, since number of levels is immutable during
// the lifetime of a RocksDB instance. It reduces number of levels
// in a DB by applying changes to manifest.
// For example, a db currently has 7 levels [0-6], and a call to
// to reduce to 5 [0-4] can only be executed when only one level
// among [4-6] contains files.
Status ReduceNumberOfLevels(int new_levels, port::Mutex* mu);
static Status ReduceNumberOfLevels(const std::string& dbname,
const Options* options,
const EnvOptions& storage_options,
int new_levels);
// Return the current version.
Version* current() const { return current_; }
@ -364,42 +380,6 @@ class VersionSet {
// The caller should delete the iterator when no longer needed.
Iterator* MakeInputIterator(Compaction* c);
// Returns true iff some level needs a compaction because it has
// exceeded its target size.
bool NeedsSizeCompaction() const {
// In universal compaction case, this check doesn't really
// check the compaction condition, but checks num of files threshold
// only. We are not going to miss any compaction opportunity
// but it's likely that more compactions are scheduled but
// ending up with nothing to do. We can improve it later.
// TODO: improve this function to be accurate for universal
// compactions.
int num_levels_to_check =
(options_->compaction_style != kCompactionStyleUniversal) ?
NumberLevels() - 1 : 1;
for (int i = 0; i < num_levels_to_check; i++) {
if (current_->compaction_score_[i] >= 1) {
return true;
}
}
return false;
}
// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
return ((current_->file_to_compact_ != nullptr) ||
NeedsSizeCompaction());
}
// Returns the maxmimum compaction score for levels 1 to max
double MaxCompactionScore() const {
return current_->max_compaction_score_;
}
// See field declaration
int MaxCompactionScoreLevel() const {
return current_->max_compaction_score_level_;
}
// Add all files listed in any live version to *live.
void AddLiveFiles(std::vector<uint64_t>* live_list);

@ -1,77 +0,0 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2012 Facebook. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "db/version_set.h"
#include <algorithm>
#include <stdio.h>
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "util/logging.h"
namespace rocksdb {
Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
if(new_levels <= 1) {
return Status::InvalidArgument(
"Number of levels needs to be bigger than 1");
}
Version* current_version = current_;
int current_levels = current_version->NumberLevels();
if (current_levels <= new_levels) {
return Status::OK();
}
// Make sure there are file only on one level from
// (new_levels-1) to (current_levels-1)
int first_nonempty_level = -1;
int first_nonempty_level_filenum = 0;
for (int i = new_levels - 1; i < current_levels; i++) {
int file_num = current_version->NumLevelFiles(i);
if (file_num != 0) {
if (first_nonempty_level < 0) {
first_nonempty_level = i;
first_nonempty_level_filenum = file_num;
} else {
char msg[255];
sprintf(msg, "Found at least two levels containing files: "
"[%d:%d],[%d:%d].\n",
first_nonempty_level, first_nonempty_level_filenum, i, file_num);
return Status::InvalidArgument(msg);
}
}
}
Status st;
std::vector<FileMetaData*>* old_files_list = current_version->files_;
std::vector<FileMetaData*>* new_files_list =
new std::vector<FileMetaData*>[new_levels];
for (int i = 0; i < new_levels - 1; i++) {
new_files_list[i] = old_files_list[i];
}
if (first_nonempty_level > 0) {
new_files_list[new_levels - 1] = old_files_list[first_nonempty_level];
}
delete[] current_version->files_;
current_version->files_ = new_files_list;
current_version->num_levels_ = new_levels;
num_levels_ = new_levels;
compaction_picker_->ReduceNumberOfLevels(new_levels);
VersionEdit ve;
st = LogAndApply(&ve, mu, true);
return st;
}
}

@ -70,6 +70,9 @@ class HdfsEnv : public Env {
unique_ptr<RandomRWFile>* result,
const EnvOptions& options);
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result);
virtual bool FileExists(const std::string& fname);
virtual Status GetChildren(const std::string& path,
@ -246,6 +249,11 @@ class HdfsEnv : public Env {
return notsup;
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return notsup;
}
virtual bool FileExists(const std::string& fname){return false;}
virtual Status GetChildren(const std::string& path,

@ -221,6 +221,11 @@ class WritableFileImpl : public WritableFile {
FileState* file_;
};
class InMemoryDirectory : public Directory {
public:
virtual Status Fsync() { return Status::OK(); }
};
class InMemoryEnv : public EnvWrapper {
public:
explicit InMemoryEnv(Env* base_env) : EnvWrapper(base_env) { }
@ -274,6 +279,12 @@ class InMemoryEnv : public EnvWrapper {
return Status::OK();
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
result->reset(new InMemoryDirectory());
return Status::OK();
}
virtual bool FileExists(const std::string& fname) {
MutexLock lock(&mutex_);
return file_map_.find(fname) != file_map_.end();

@ -107,6 +107,15 @@ class Cache {
// returns the memory size for the entries residing in the cache.
virtual size_t GetUsage() const = 0;
// Call this on shutdown if you want to speed it up. Cache will disown
// any underlying data and will not free it on delete. This call will leak
// memory - call this only if you're shutting down the process.
// Any attempts of using cache after this call will fail terribly.
// Always delete the DB object before calling this method!
virtual void DisownData() {
// default implementation is noop
};
private:
void LRU_Remove(Handle* e);
void LRU_Append(Handle* e);

@ -215,9 +215,9 @@ class DB {
// hosting all the files. In this case, client could set reduce_level
// to true, to move the files back to the minimum level capable of holding
// the data set or a given level (specified by non-negative target_level).
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) = 0;
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) = 0;
// Number of levels used for this DB.
virtual int NumberLevels() = 0;

@ -33,6 +33,7 @@ class SequentialFile;
class Slice;
class WritableFile;
class RandomRWFile;
class Directory;
struct Options;
using std::unique_ptr;
@ -122,6 +123,16 @@ class Env {
unique_ptr<RandomRWFile>* result,
const EnvOptions& options) = 0;
// Create an object that represents a directory. Will fail if directory
// doesn't exist. If the directory exists, it will open the directory
// and create a new Directory object.
//
// On success, stores a pointer to the new Directory in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK.
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) = 0;
// Returns true iff the named file exists.
virtual bool FileExists(const std::string& fname) = 0;
@ -488,6 +499,15 @@ class RandomRWFile {
void operator=(const RandomRWFile&);
};
// Directory object represents collection of files and implements
// filesystem operations that can be executed on directories.
class Directory {
public:
virtual ~Directory() {}
// Fsync directory
virtual Status Fsync() = 0;
};
// An interface for writing log messages.
class Logger {
public:
@ -578,6 +598,10 @@ class EnvWrapper : public Env {
const EnvOptions& options) {
return target_->NewRandomRWFile(f, r, options);
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return target_->NewDirectory(name, result);
}
bool FileExists(const std::string& f) { return target_->FileExists(f); }
Status GetChildren(const std::string& dir, std::vector<std::string>* r) {
return target_->GetChildren(dir, r);

@ -37,12 +37,14 @@
#define STORAGE_ROCKSDB_DB_MEMTABLEREP_H_
#include <memory>
#include "rocksdb/arena.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
namespace rocksdb {
class Arena;
class Slice;
class SliceTransform;
class MemTableRep {
public:
// KeyComparator provides a means to compare keys, which are internal keys

@ -15,11 +15,6 @@
#include <vector>
#include <stdint.h>
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/universal_compaction.h"
namespace rocksdb {
@ -34,6 +29,11 @@ class Logger;
class MergeOperator;
class Snapshot;
class TableFactory;
class MemTableRepFactory;
class TablePropertiesCollector;
class Slice;
class SliceTransform;
class Statistics;
using std::shared_ptr;
@ -772,20 +772,27 @@ struct ReadOptions {
// Default: kReadAllTier
ReadTier read_tier;
// Specify to create a tailing iterator -- a special iterator that has a
// view of the complete database (i.e. it can also be used to read newly
// added data) and is optimized for sequential reads.
bool tailing;
ReadOptions()
: verify_checksums(false),
fill_cache(true),
prefix_seek(false),
snapshot(nullptr),
prefix(nullptr),
read_tier(kReadAllTier) {}
read_tier(kReadAllTier),
tailing(false) {}
ReadOptions(bool cksum, bool cache)
: verify_checksums(cksum),
fill_cache(cache),
prefix_seek(false),
snapshot(nullptr),
prefix(nullptr),
read_tier(kReadAllTier) {}
read_tier(kReadAllTier),
tailing(false) {}
};
// Options that control write operations

@ -6,14 +6,8 @@
#ifndef STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H
#define STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H
#include <stddef.h>
#include <string>
#include <memory>
#include <vector>
#include <stdint.h>
#include <climits>
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
namespace rocksdb {

@ -85,9 +85,9 @@ class StackableDB : public DB {
return db_->GetApproximateSizes(r, n, sizes);
}
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) override {
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) override {
return db_->CompactRange(begin, end, reduce_level, target_level);
}

@ -20,10 +20,10 @@ namespace rocksdb {
Status BlockBasedTableFactory::GetTableReader(
const Options& options, const EnvOptions& soptions,
unique_ptr<RandomAccessFile> && file, uint64_t file_size,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const {
return BlockBasedTable::Open(options, soptions, std::move(file), file_size,
table_reader);
return BlockBasedTable::Open(options, soptions, table_options_,
std::move(file), file_size, table_reader);
}
TableBuilder* BlockBasedTableFactory::GetTableBuilder(

@ -14,6 +14,7 @@
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "table/block_based_table_options.h"
namespace rocksdb {
@ -30,40 +31,25 @@ class BlockBasedTable;
class BlockBasedTableBuilder;
class BlockBasedTableFactory: public TableFactory {
public:
struct TableOptions {
// @flush_block_policy_factory creates the instances of flush block policy.
// which provides a configurable way to determine when to flush a block in
// the block based tables. If not set, table builder will use the default
// block flush policy, which cut blocks by block size (please refer to
// `FlushBlockBySizePolicy`).
std::shared_ptr<FlushBlockPolicyFactory> flush_block_policy_factory;
};
public:
BlockBasedTableFactory() : BlockBasedTableFactory(BlockBasedTableOptions()) {}
explicit BlockBasedTableFactory(const BlockBasedTableOptions& table_options)
: table_options_(table_options) {}
BlockBasedTableFactory() : BlockBasedTableFactory(TableOptions()) { }
BlockBasedTableFactory(const TableOptions& table_options):
table_options_(table_options) {
}
~BlockBasedTableFactory() {}
~BlockBasedTableFactory() {
}
const char* Name() const override {
return "BlockBasedTable";
}
const char* Name() const override { return "BlockBasedTable"; }
Status GetTableReader(const Options& options, const EnvOptions& soptions,
unique_ptr<RandomAccessFile> && file,
uint64_t file_size,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const override;
TableBuilder* GetTableBuilder(const Options& options, WritableFile* file,
CompressionType compression_type) const
override;
CompressionType compression_type)
const override;
private:
TableOptions table_options_;
BlockBasedTableOptions table_options_;
};
} // namespace rocksdb

@ -0,0 +1,31 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <memory>
namespace rocksdb {
class FlushBlockPolicyFactory;
struct BlockBasedTableOptions {
// @flush_block_policy_factory creates the instances of flush block policy.
// which provides a configurable way to determine when to flush a block in
// the block based tables. If not set, table builder will use the default
// block flush policy, which cut blocks by block size (please refer to
// `FlushBlockBySizePolicy`).
std::shared_ptr<FlushBlockPolicyFactory> flush_block_policy_factory;
// TODO(kailiu) Temporarily disable this feature by making the default value
// to be false. Also in master branch, this file is non-public so no user
// will be able to change the value of `cache_index_and_filter_blocks`.
//
// Indicating if we'd put index/filter blocks to the block cache.
// If not specified, each "table reader" object will pre-load index/filter
// block during table initialization.
bool cache_index_and_filter_blocks = false;
};
} // namespace rocksdb

@ -27,6 +27,7 @@
#include "util/coding.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"
#include "table/block_based_table_options.h"
namespace rocksdb {
@ -48,9 +49,9 @@ struct BlockBasedTable::Rep {
Status status;
unique_ptr<RandomAccessFile> file;
char cache_key_prefix[kMaxCacheKeyPrefixSize];
size_t cache_key_prefix_size;
size_t cache_key_prefix_size = 0;
char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize];
size_t compressed_cache_key_prefix_size;
size_t compressed_cache_key_prefix_size = 0;
// Handle to metaindex_block: saved from footer
BlockHandle metaindex_handle;
@ -223,15 +224,15 @@ Cache::Handle* GetFromBlockCache(
} // end of anonymous namespace
Status BlockBasedTable::Open(const Options& options,
const EnvOptions& soptions,
unique_ptr<RandomAccessFile> && file,
uint64_t size,
Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions,
const BlockBasedTableOptions& table_options,
unique_ptr<RandomAccessFile>&& file,
uint64_t file_size,
unique_ptr<TableReader>* table_reader) {
table_reader->reset();
Footer footer(kBlockBasedTableMagicNumber);
auto s = ReadFooterFromFile(file.get(), size, &footer);
auto s = ReadFooterFromFile(file.get(), file_size, &footer);
if (!s.ok()) return s;
// We've successfully read the footer and the index block: we're
@ -254,13 +255,8 @@ Status BlockBasedTable::Open(const Options& options,
if (meta_iter->Valid() && meta_iter->key() == kPropertiesBlock) {
s = meta_iter->status();
if (s.ok()) {
s = ReadProperties(
meta_iter->value(),
rep->file.get(),
rep->options.env,
rep->options.info_log.get(),
&rep->table_properties
);
s = ReadProperties(meta_iter->value(), rep->file.get(), rep->options.env,
rep->options.info_log.get(), &rep->table_properties);
}
if (!s.ok()) {
@ -271,11 +267,21 @@ Status BlockBasedTable::Open(const Options& options,
}
}
// Initialize index/filter blocks. If block cache is not specified,
// these blocks will be kept in member variables in Rep, which will
// reside in the memory as long as this table object is alive; otherwise
// they will be added to block cache.
if (!options.block_cache) {
// Will use block cache for index/filter blocks access?
if (options.block_cache && table_options.cache_index_and_filter_blocks) {
// Call IndexBlockReader() to implicitly add index to the block_cache
unique_ptr<Iterator> iter(new_table->IndexBlockReader(ReadOptions()));
s = iter->status();
if (s.ok()) {
// Call GetFilter() to implicitly add filter to the block_cache
auto filter_entry = new_table->GetFilter();
filter_entry.Release(options.block_cache.get());
}
} else {
// If we don't use block cache for index/filter blocks access, we'll
// pre-load these blocks, which will kept in member variables in Rep
// and with a same life-time as this table object.
Block* index_block = nullptr;
// TODO: we never really verify check sum for index block
s = ReadBlockFromFile(
@ -303,18 +309,7 @@ Status BlockBasedTable::Open(const Options& options,
} else {
delete index_block;
}
} else {
// Call IndexBlockReader() to implicitly add index to the block_cache
unique_ptr<Iterator> iter(
new_table->IndexBlockReader(ReadOptions())
);
s = iter->status();
if (s.ok()) {
// Call GetFilter() to implicitly add filter to the block_cache
auto filter_entry = new_table->GetFilter();
filter_entry.Release(options.block_cache.get());
}
}
if (s.ok()) {
@ -740,7 +735,6 @@ BlockBasedTable::GetFilter(bool no_io) const {
// Get the iterator from the index block.
Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const {
if (rep_->index_block) {
assert (!rep_->options.block_cache);
return rep_->index_block->NewIterator(rep_->options.comparator);
}

@ -29,6 +29,7 @@ struct ReadOptions;
class TableCache;
class TableReader;
class FilterBlockReader;
struct BlockBasedTableOptions;
using std::unique_ptr;
@ -49,10 +50,9 @@ class BlockBasedTable : public TableReader {
// to nullptr and returns a non-ok status.
//
// *file must remain live while this Table is in use.
static Status Open(const Options& options,
const EnvOptions& soptions,
unique_ptr<RandomAccessFile>&& file,
uint64_t file_size,
static Status Open(const Options& db_options, const EnvOptions& env_options,
const BlockBasedTableOptions& table_options,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader);
bool PrefixMayMatch(const Slice& internal_prefix) override;

@ -23,7 +23,8 @@ class Env;
// key is present in K child iterators, it will be yielded K times.
//
// REQUIRES: n >= 0
extern Iterator* NewMergingIterator(
Env* const env, const Comparator* comparator, Iterator** children, int n);
extern Iterator* NewMergingIterator(Env* const env,
const Comparator* comparator,
Iterator** children, int n);
} // namespace rocksdb

@ -11,6 +11,7 @@
#include "rocksdb/comparator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/table_properties.h"
#include "table/block_builder.h"
namespace rocksdb {

@ -10,6 +10,7 @@
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/table.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/plain_table_factory.h"
namespace rocksdb {
@ -38,7 +39,7 @@ using std::unordered_map;
class PlainTableReader: public TableReader {
public:
static Status Open(const Options& options, const EnvOptions& soptions,
unique_ptr<RandomAccessFile> && file, uint64_t file_size,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table, const int bloom_num_bits,
double hash_table_ratio);
@ -196,7 +197,7 @@ class PlainTableReader: public TableReader {
uint32_t& ret_offset);
Slice GetPrefix(const Slice& target) {
assert(target.size() >= 8); // target is internal key
assert(target.size() >= 8); // target is internal key
return options_.prefix_extractor->Transform(
Slice(target.data(), target.size() - 8));
}

@ -293,14 +293,11 @@ class KeyConvertingIterator: public Iterator {
class TableConstructor: public Constructor {
public:
explicit TableConstructor(
const Comparator* cmp, bool convert_to_internal_key = false)
: Constructor(cmp),
convert_to_internal_key_(convert_to_internal_key) {
}
~TableConstructor() {
Reset();
}
explicit TableConstructor(const Comparator* cmp,
bool convert_to_internal_key = false)
: Constructor(cmp), convert_to_internal_key_(convert_to_internal_key) {}
~TableConstructor() { Reset(); }
virtual Status FinishImpl(const Options& options, const KVMap& data) {
Reset();
sink_.reset(new StringSink());
@ -329,14 +326,11 @@ class TableConstructor: public Constructor {
// Open the table
uniq_id_ = cur_uniq_id_++;
source_.reset(
new StringSource(sink_->contents(), uniq_id_,
options.allow_mmap_reads));
unique_ptr<TableFactory> table_factory;
return options.table_factory->GetTableReader(options, soptions,
std::move(source_),
sink_->contents().size(),
&table_reader_);
source_.reset(new StringSource(sink_->contents(), uniq_id_,
options.allow_mmap_reads));
return options.table_factory->GetTableReader(
options, soptions, std::move(source_), sink_->contents().size(),
&table_reader_);
}
virtual Iterator* NewIterator() const {
@ -630,7 +624,7 @@ class Harness {
internal_comparator_.reset(new InternalKeyComparator(options_.comparator));
support_prev_ = true;
only_support_prefix_seek_ = false;
BlockBasedTableFactory::TableOptions table_options;
BlockBasedTableOptions table_options;
switch (args.type) {
case BLOCK_BASED_TABLE_TEST:
table_options.flush_block_policy_factory.reset(
@ -1053,6 +1047,11 @@ TEST(BlockBasedTableTest, BlockCacheTest) {
options.create_if_missing = true;
options.statistics = CreateDBStatistics();
options.block_cache = NewLRUCache(1024);
// Enable the cache for index/filter blocks
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
std::vector<std::string> keys;
KVMap kvmap;

@ -31,6 +31,8 @@
#include "utilities/utility_db.h"
#include "rocksdb/env.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "port/port.h"
#include "util/coding.h"

@ -417,6 +417,7 @@ class ShardedLRUCache : public Cache {
virtual size_t GetCapacity() const {
return capacity_;
}
virtual size_t GetUsage() const {
// We will not lock the cache when getting the usage from shards.
// for (size_t i = 0; i < num_shard_bits_; ++i)
@ -427,6 +428,10 @@ class ShardedLRUCache : public Cache {
}
return usage;
}
virtual void DisownData() {
shards_ = nullptr;
}
};
} // end anonymous namespace

@ -8,7 +8,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/coding.h"
#include <algorithm>
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
namespace rocksdb {

@ -17,6 +17,8 @@
#include <stdint.h>
#include <string.h>
#include <string>
#include "rocksdb/write_batch.h"
#include "port/port.h"
namespace rocksdb {

@ -366,6 +366,11 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname,
return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv");
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return Status::NotSupported("NewDirectory not yet supported on HdfsEnv");
}
bool HdfsEnv::FileExists(const std::string& fname) {
int value = hdfsExists(fileSys_, fname.c_str());
if (value == 0) {

@ -873,6 +873,24 @@ class PosixRandomRWFile : public RandomRWFile {
#endif
};
class PosixDirectory : public Directory {
public:
explicit PosixDirectory(int fd) : fd_(fd) {}
~PosixDirectory() {
close(fd_);
}
virtual Status Fsync() {
if (fsync(fd_) == -1) {
return IOError("directory", errno);
}
return Status::OK();
}
private:
int fd_;
};
static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
mutex_lockedFiles.Lock();
if (lock) {
@ -1044,6 +1062,18 @@ class PosixEnv : public Env {
return s;
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
result->reset();
const int fd = open(name.c_str(), 0);
if (fd < 0) {
return IOError(name, errno);
} else {
result->reset(new PosixDirectory(fd));
}
return Status::OK();
}
virtual bool FileExists(const std::string& fname) {
return access(fname.c_str(), F_OK) == 0;
}

@ -1069,23 +1069,8 @@ void ReduceDBLevelsCommand::DoCommand() {
CloseDB();
EnvOptions soptions;
TableCache tc(db_path_, &opt, soptions, 10);
const InternalKeyComparator cmp(opt.comparator);
VersionSet versions(db_path_, &opt, soptions, &tc, &cmp);
// We rely the VersionSet::Recover to tell us the internal data structures
// in the db. And the Recover() should never do any change (like LogAndApply)
// to the manifest file.
st = versions.Recover();
if (!st.ok()) {
exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
return;
}
port::Mutex mu;
mu.Lock();
st = versions.ReduceNumberOfLevels(new_levels_, &mu);
mu.Unlock();
st = VersionSet::ReduceNumberOfLevels(db_path_, &opt, soptions, new_levels_);
if (!st.ok()) {
exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
return;

@ -17,6 +17,10 @@
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table_properties.h"
#include "table/block_based_table_factory.h"
namespace rocksdb {

@ -10,6 +10,7 @@
#include "utilities/backupable_db.h"
#include "db/filename.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "rocksdb/transaction_log.h"
#define __STDC_FORMAT_MACROS
@ -21,6 +22,7 @@
#include <string>
#include <limits>
#include <atomic>
#include <unordered_map>
namespace rocksdb {
@ -47,12 +49,22 @@ class BackupEngine {
void DeleteBackupsNewerThan(uint64_t sequence_number);
private:
struct FileInfo {
FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
: refs(0), filename(fname), size(sz), checksum_value(checksum) {}
int refs;
const std::string filename;
const uint64_t size;
uint32_t checksum_value;
};
class BackupMeta {
public:
BackupMeta(const std::string& meta_filename,
std::unordered_map<std::string, int>* file_refs, Env* env)
std::unordered_map<std::string, FileInfo>* file_infos, Env* env)
: timestamp_(0), size_(0), meta_filename_(meta_filename),
file_refs_(file_refs), env_(env) {}
file_infos_(file_infos), env_(env) {}
~BackupMeta() {}
@ -72,7 +84,8 @@ class BackupEngine {
return sequence_number_;
}
void AddFile(const std::string& filename, uint64_t size);
Status AddFile(const FileInfo& file_info);
void Delete();
bool Empty() {
@ -95,7 +108,7 @@ class BackupEngine {
std::string const meta_filename_;
// files with relative paths (without "/" prefix!!)
std::vector<std::string> files_;
std::unordered_map<std::string, int>* file_refs_;
std::unordered_map<std::string, FileInfo>* file_infos_;
Env* env_;
static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
@ -140,6 +153,7 @@ class BackupEngine {
Env* dst_env,
bool sync,
uint64_t* size = nullptr,
uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0);
// if size_limit == 0, there is no size limit, copy everything
Status BackupFile(BackupID backup_id,
@ -148,15 +162,21 @@ class BackupEngine {
const std::string& src_dir,
const std::string& src_fname, // starts with "/"
uint64_t size_limit = 0);
Status CalculateChecksum(const std::string& src,
Env* src_env,
uint64_t size_limit,
uint32_t* checksum_value);
// Will delete all the files we don't need anymore
// If full_scan == true, it will do the full scan of files/ directory
// and delete all the files that are not referenced from backuped_file_refs_
// and delete all the files that are not referenced from backuped_file_infos__
void GarbageCollection(bool full_scan);
// backup state data
BackupID latest_backup_id_;
std::map<BackupID, BackupMeta> backups_;
std::unordered_map<std::string, int> backuped_file_refs_;
std::unordered_map<std::string, FileInfo> backuped_file_infos_;
std::vector<BackupID> obsolete_backups_;
std::atomic<bool> stop_backup_;
@ -197,7 +217,7 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
assert(backups_.find(backup_id) == backups_.end());
backups_.insert(std::make_pair(
backup_id, BackupMeta(GetBackupMetaFile(backup_id),
&backuped_file_refs_, backup_env_)));
&backuped_file_infos_, backup_env_)));
}
if (options_.destroy_old_data) { // Destory old data
@ -301,7 +321,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
assert(backups_.find(new_backup_id) == backups_.end());
auto ret = backups_.insert(std::make_pair(
new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id),
&backuped_file_refs_, backup_env_)));
&backuped_file_infos_, backup_env_)));
assert(ret.second == true);
auto& new_backup = ret.first->second;
new_backup.RecordTimestamp();
@ -477,10 +497,19 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
"/" + dst;
Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false);
uint32_t checksum_value;
s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
nullptr /* size */, &checksum_value);
if (!s.ok()) {
break;
}
const auto iter = backuped_file_infos_.find(file);
assert(iter != backuped_file_infos_.end());
if (iter->second.checksum_value != checksum_value) {
s = Status::Corruption("Checksum check failed");
break;
}
}
Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
@ -554,6 +583,7 @@ Status BackupEngine::CopyFile(const std::string& src,
Env* dst_env,
bool sync,
uint64_t* size,
uint32_t* checksum_value,
uint64_t size_limit) {
Status s;
unique_ptr<WritableFile> dst_file;
@ -563,6 +593,9 @@ Status BackupEngine::CopyFile(const std::string& src,
if (size != nullptr) {
*size = 0;
}
if (checksum_value != nullptr) {
*checksum_value = 0;
}
// Check if size limit is set. if not, set it to very big number
if (size_limit == 0) {
@ -588,12 +621,19 @@ Status BackupEngine::CopyFile(const std::string& src,
copy_file_buffer_size_ : size_limit;
s = src_file->Read(buffer_to_read, &data, buf.get());
size_limit -= data.size();
if (!s.ok()) {
return s;
}
if (size != nullptr) {
*size += data.size();
}
if (s.ok()) {
s = dst_file->Append(data);
if (checksum_value != nullptr) {
*checksum_value = crc32c::Extend(*checksum_value, data.data(),
data.size());
}
s = dst_file->Append(data);
} while (s.ok() && data.size() > 0 && size_limit > 0);
if (s.ok() && sync) {
@ -628,9 +668,15 @@ Status BackupEngine::BackupFile(BackupID backup_id,
// if it's shared, we also need to check if it exists -- if it does,
// no need to copy it again
uint32_t checksum_value = 0;
if (shared && backup_env_->FileExists(dst_path)) {
backup_env_->GetFileSize(dst_path, &size); // Ignore error
Log(options_.info_log, "%s already present", src_fname.c_str());
Log(options_.info_log, "%s already present, calculate checksum",
src_fname.c_str());
s = CalculateChecksum(src_dir + src_fname,
db_env_,
size_limit,
&checksum_value);
} else {
Log(options_.info_log, "Copying %s", src_fname.c_str());
s = CopyFile(src_dir + src_fname,
@ -639,22 +685,63 @@ Status BackupEngine::BackupFile(BackupID backup_id,
backup_env_,
options_.sync,
&size,
&checksum_value,
size_limit);
if (s.ok() && shared) {
s = backup_env_->RenameFile(dst_path_tmp, dst_path);
}
}
if (s.ok()) {
backup->AddFile(dst_relative, size);
s = backup->AddFile(FileInfo(dst_relative, size, checksum_value));
}
return s;
}
Status BackupEngine::CalculateChecksum(const std::string& src,
Env* src_env,
uint64_t size_limit,
uint32_t* checksum_value) {
*checksum_value = 0;
if (size_limit == 0) {
size_limit = std::numeric_limits<uint64_t>::max();
}
EnvOptions env_options;
env_options.use_mmap_writes = false;
std::unique_ptr<SequentialFile> src_file;
Status s = src_env->NewSequentialFile(src, &src_file, env_options);
if (!s.ok()) {
return s;
}
std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
Slice data;
do {
if (stop_backup_.load(std::memory_order_acquire)) {
return Status::Incomplete("Backup stopped");
}
size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
copy_file_buffer_size_ : size_limit;
s = src_file->Read(buffer_to_read, &data, buf.get());
if (!s.ok()) {
return s;
}
size_limit -= data.size();
*checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
} while (data.size() > 0 && size_limit > 0);
return s;
}
void BackupEngine::GarbageCollection(bool full_scan) {
Log(options_.info_log, "Starting garbage collection");
std::vector<std::string> to_delete;
for (auto& itr : backuped_file_refs_) {
if (itr.second == 0) {
for (auto& itr : backuped_file_infos_) {
if (itr.second.refs == 0) {
Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
s.ToString().c_str());
@ -662,7 +749,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
}
}
for (auto& td : to_delete) {
backuped_file_refs_.erase(td);
backuped_file_infos_.erase(td);
}
if (!full_scan) {
// take care of private dirs -- if full_scan == true, then full_scan will
@ -685,7 +772,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
for (auto& child : shared_children) {
std::string rel_fname = GetSharedFileRel(child);
// if it's not refcounted, delete it
if (backuped_file_refs_.find(rel_fname) == backuped_file_refs_.end()) {
if (backuped_file_infos_.find(rel_fname) == backuped_file_infos_.end()) {
// this might be a directory, but DeleteFile will just fail in that
// case, so we're good
Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
@ -730,23 +817,34 @@ void BackupEngine::GarbageCollection(bool full_scan) {
// ------- BackupMeta class --------
void BackupEngine::BackupMeta::AddFile(const std::string& filename,
uint64_t size) {
size_ += size;
files_.push_back(filename);
auto itr = file_refs_->find(filename);
if (itr == file_refs_->end()) {
file_refs_->insert(std::make_pair(filename, 1));
Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) {
size_ += file_info.size;
files_.push_back(file_info.filename);
auto itr = file_infos_->find(file_info.filename);
if (itr == file_infos_->end()) {
auto ret = file_infos_->insert({file_info.filename, file_info});
if (ret.second) {
ret.first->second.refs = 1;
} else {
// if this happens, something is seriously wrong
return Status::Corruption("In memory metadata insertion error");
}
} else {
++itr->second; // increase refcount if already present
if (itr->second.checksum_value != file_info.checksum_value) {
return Status::Corruption("Checksum mismatch for existing backup file");
}
++itr->second.refs; // increase refcount if already present
}
return Status::OK();
}
void BackupEngine::BackupMeta::Delete() {
for (auto& file : files_) {
auto itr = file_refs_->find(file);
assert(itr != file_refs_->end());
--(itr->second); // decrease refcount
for (const auto& file : files_) {
auto itr = file_infos_->find(file);
assert(itr != file_infos_->end());
--(itr->second.refs); // decrease refcount
}
files_.clear();
// delete meta file
@ -758,8 +856,8 @@ void BackupEngine::BackupMeta::Delete() {
// <timestamp>
// <seq number>
// <number of files>
// <file1>
// <file2>
// <file1> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
// ...
// TODO: maybe add checksum?
Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
@ -789,18 +887,40 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
sscanf(data.data(), "%u%n", &num_files, &bytes_read);
data.remove_prefix(bytes_read + 1); // +1 for '\n'
std::vector<std::pair<std::string, uint64_t>> files;
std::vector<FileInfo> files;
for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
std::string filename = GetSliceUntil(&data, '\n').ToString();
auto line = GetSliceUntil(&data, '\n');
std::string filename = GetSliceUntil(&line, ' ').ToString();
uint64_t size;
s = env_->GetFileSize(backup_dir + "/" + filename, &size);
files.push_back(std::make_pair(filename, size));
if (line.empty()) {
return Status::Corruption("File checksum is missing");
}
uint32_t checksum_value = 0;
if (line.starts_with("crc32 ")) {
line.remove_prefix(6);
sscanf(line.data(), "%u", &checksum_value);
if (memcmp(line.data(), std::to_string(checksum_value).c_str(),
line.size() - 1) != 0) {
return Status::Corruption("Invalid checksum value");
}
} else {
return Status::Corruption("Unknown checksum type");
}
files.emplace_back(filename, size, checksum_value);
}
if (s.ok()) {
for (auto file : files) {
AddFile(file.first, file.second);
for (const auto& file_info : files) {
s = AddFile(file_info);
if (!s.ok()) {
break;
}
}
}
@ -824,8 +944,13 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
sequence_number_);
len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size());
for (size_t i = 0; i < files_.size(); ++i) {
len += snprintf(buf.get() + len, buf_size - len, "%s\n", files_[i].c_str());
for (const auto& file : files_) {
const auto& iter = file_infos_->find(file);
assert(iter != file_infos_->end());
// use crc32 for now, switch to something else if needed
len += snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n",
file.c_str(), iter->second.checksum_value);
}
s = backup_meta_file->Append(Slice(buf.get(), (size_t)len));

@ -154,7 +154,6 @@ class TestEnv : public EnvWrapper {
Status NewSequentialFile(const std::string& f,
unique_ptr<SequentialFile>* r,
const EnvOptions& options) {
opened_files_.push_back(f);
if (dummy_sequential_file_) {
r->reset(new TestEnv::DummySequentialFile());
return Status::OK();
@ -165,6 +164,7 @@ class TestEnv : public EnvWrapper {
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
const EnvOptions& options) {
written_files_.push_back(f);
if (limit_written_files_ <= 0) {
return Status::IOError("Sorry, can't do this");
}
@ -172,14 +172,14 @@ class TestEnv : public EnvWrapper {
return EnvWrapper::NewWritableFile(f, r, options);
}
void AssertOpenedFiles(std::vector<std::string>& should_have_opened) {
sort(should_have_opened.begin(), should_have_opened.end());
sort(opened_files_.begin(), opened_files_.end());
ASSERT_TRUE(opened_files_ == should_have_opened);
void AssertWrittenFiles(std::vector<std::string>& should_have_written) {
sort(should_have_written.begin(), should_have_written.end());
sort(written_files_.begin(), written_files_.end());
ASSERT_TRUE(written_files_ == should_have_written);
}
void ClearOpenedFiles() {
opened_files_.clear();
void ClearWrittenFiles() {
written_files_.clear();
}
void SetLimitWrittenFiles(uint64_t limit) {
@ -192,7 +192,7 @@ class TestEnv : public EnvWrapper {
private:
bool dummy_sequential_file_ = false;
std::vector<std::string> opened_files_;
std::vector<std::string> written_files_;
uint64_t limit_written_files_ = 1000000;
}; // TestEnv
@ -239,6 +239,46 @@ class FileManager : public EnvWrapper {
return s;
}
Status CorruptChecksum(const std::string& fname, bool appear_valid) {
std::string metadata;
Status s = ReadFileToString(this, fname, &metadata);
if (!s.ok()) {
return s;
}
s = DeleteFile(fname);
if (!s.ok()) {
return s;
}
std::vector<int64_t> positions;
auto pos = metadata.find(" crc32 ");
if (pos == std::string::npos) {
return Status::Corruption("checksum not found");
}
do {
positions.push_back(pos);
pos = metadata.find(" crc32 ", pos + 6);
} while (pos != std::string::npos);
pos = positions[rnd_.Next() % positions.size()];
if (metadata.size() < pos + 7) {
return Status::Corruption("bad CRC32 checksum value");
}
if (appear_valid) {
if (metadata[pos + 8] == '\n') {
// single digit value, safe to insert one more digit
metadata.insert(pos + 8, 1, '0');
} else {
metadata.erase(pos + 8, 1);
}
} else {
metadata[pos + 7] = 'a';
}
return WriteToFile(fname, metadata);
}
Status WriteToFile(const std::string& fname, const std::string& data) {
unique_ptr<WritableFile> file;
EnvOptions env_options;
@ -249,6 +289,7 @@ class FileManager : public EnvWrapper {
}
return file->Append(Slice(data));
}
private:
Random rnd_;
}; // FileManager
@ -412,30 +453,43 @@ TEST(BackupableDBTest, NoDoubleCopy) {
// should write 5 DB files + LATEST_BACKUP + one meta file
test_backup_env_->SetLimitWrittenFiles(7);
test_db_env_->ClearOpenedFiles();
test_backup_env_->ClearWrittenFiles();
test_db_env_->SetLimitWrittenFiles(0);
dummy_db_->live_files_ = { "/00010.sst", "/00011.sst",
"/CURRENT", "/MANIFEST-01" };
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
ASSERT_OK(db_->CreateNewBackup(false));
std::vector<std::string> should_have_openened = dummy_db_->live_files_;
should_have_openened.push_back("/00011.log");
AppendPath(dbname_, should_have_openened);
test_db_env_->AssertOpenedFiles(should_have_openened);
std::vector<std::string> should_have_written = {
"/shared/00010.sst.tmp",
"/shared/00011.sst.tmp",
"/private/1.tmp/CURRENT",
"/private/1.tmp/MANIFEST-01",
"/private/1.tmp/00011.log",
"/meta/1.tmp",
"/LATEST_BACKUP.tmp"
};
AppendPath(dbname_ + "_backup", should_have_written);
test_backup_env_->AssertWrittenFiles(should_have_written);
// should write 4 new DB files + LATEST_BACKUP + one meta file
// should not write/copy 00010.sst, since it's already there!
test_backup_env_->SetLimitWrittenFiles(6);
test_db_env_->ClearOpenedFiles();
test_backup_env_->ClearWrittenFiles();
dummy_db_->live_files_ = { "/00010.sst", "/00015.sst",
"/CURRENT", "/MANIFEST-01" };
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
ASSERT_OK(db_->CreateNewBackup(false));
// should not open 00010.sst - it's already there
should_have_openened = { "/00015.sst", "/CURRENT",
"/MANIFEST-01", "/00011.log" };
AppendPath(dbname_, should_have_openened);
test_db_env_->AssertOpenedFiles(should_have_openened);
should_have_written = {
"/shared/00015.sst.tmp",
"/private/2.tmp/CURRENT",
"/private/2.tmp/MANIFEST-01",
"/private/2.tmp/00011.log",
"/meta/2.tmp",
"/LATEST_BACKUP.tmp"
};
AppendPath(dbname_ + "_backup", should_have_written);
test_backup_env_->AssertWrittenFiles(should_have_written);
ASSERT_OK(db_->DeleteBackup(1));
ASSERT_EQ(true,
@ -463,6 +517,8 @@ TEST(BackupableDBTest, NoDoubleCopy) {
// 3. Corrupted backup meta file or missing backuped file - we should
// not be able to open that backup, but all other backups should be
// fine
// 4. Corrupted checksum value - if the checksum is not a valid uint32_t,
// db open should fail, otherwise, it aborts during the restore process.
TEST(BackupableDBTest, CorruptionsTest) {
const int keys_iteration = 5000;
Random rnd(6);
@ -519,12 +575,29 @@ TEST(BackupableDBTest, CorruptionsTest) {
CloseRestoreDB();
ASSERT_TRUE(!s.ok());
// new backup should be 4!
// --------- case 4. corrupted checksum value ----
ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false));
// checksum of backup 3 is an invalid value, this can be detected at
// db open time, and it reverts to the previous backup automatically
AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5);
// checksum of the backup 2 appears to be valid, this can cause checksum
// mismatch and abort restore process
ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true));
ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2"));
OpenRestoreDB();
ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2"));
s = restore_db_->RestoreDBFromBackup(2, dbname_, dbname_);
ASSERT_TRUE(!s.ok());
ASSERT_OK(restore_db_->DeleteBackup(2));
CloseRestoreDB();
AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5);
// new backup should be 2!
OpenBackupableDB();
FillDB(db_.get(), keys_iteration * 3, keys_iteration * 4);
FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2);
ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2)));
CloseBackupableDB();
AssertBackupConsistency(4, 0, keys_iteration * 4, keys_iteration * 5);
AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5);
}
// open DB, write, close DB, backup, restore, repeat

Loading…
Cancel
Save