diff --git a/build_tools/regression_build_test.sh b/build_tools/regression_build_test.sh index b0c130e3c..d38b67c3c 100755 --- a/build_tools/regression_build_test.sh +++ b/build_tools/regression_build_test.sh @@ -50,7 +50,7 @@ make release --num=$NUM \ --writes=$NUM \ --cache_size=6442450944 \ - --cache_numshardbits=4 \ + --cache_numshardbits=6 \ --table_cache_numshardbits=4 \ --open_files=55000 \ --statistics=1 \ @@ -68,7 +68,7 @@ make release --num=$NUM \ --writes=$((NUM / 10)) \ --cache_size=6442450944 \ - --cache_numshardbits=4 \ + --cache_numshardbits=6 \ --table_cache_numshardbits=4 \ --open_files=55000 \ --statistics=1 \ @@ -87,7 +87,7 @@ make release --num=$NUM \ --writes=$NUM \ --cache_size=6442450944 \ - --cache_numshardbits=4 \ + --cache_numshardbits=6 \ --table_cache_numshardbits=4 \ --open_files=55000 \ --statistics=1 \ @@ -106,7 +106,7 @@ make release --num=$NUM \ --reads=$((NUM / 5)) \ --cache_size=6442450944 \ - --cache_numshardbits=4 \ + --cache_numshardbits=6 \ --table_cache_numshardbits=4 \ --open_files=55000 \ --disable_seek_compaction=1 \ @@ -126,7 +126,7 @@ make release --num=$NUM \ --reads=$((NUM / 5)) \ --cache_size=104857600 \ - --cache_numshardbits=4 \ + --cache_numshardbits=6 \ --table_cache_numshardbits=4 \ --open_files=55000 \ --disable_seek_compaction=1 \ @@ -147,7 +147,7 @@ make release --reads=$((NUM / 5)) \ --writes=512 \ --cache_size=6442450944 \ - --cache_numshardbits=4 \ + --cache_numshardbits=6 \ --table_cache_numshardbits=4 \ --write_buffer_size=1000000000 \ --open_files=55000 \ @@ -169,7 +169,7 @@ make release --num=$((NUM / 4)) \ --writes=$((NUM / 4)) \ --cache_size=6442450944 \ - --cache_numshardbits=4 \ + --cache_numshardbits=6 \ --table_cache_numshardbits=4 \ --open_files=55000 \ --statistics=1 \ @@ -179,6 +179,25 @@ make release --sync=0 \ --threads=1 > /dev/null +# dummy test just to compact the data +./db_bench \ + --benchmarks=readrandom \ + --db=$DATA_DIR \ + --use_existing_db=1 \ + --bloom_bits=10 \ + --num=$((NUM / 1000)) \ + --reads=$((NUM / 1000)) \ + --cache_size=6442450944 \ + --cache_numshardbits=6 \ + --table_cache_numshardbits=4 \ + --open_files=55000 \ + --statistics=1 \ + --histogram=1 \ + --disable_data_sync=1 \ + --disable_wal=1 \ + --sync=0 \ + --threads=16 > /dev/null + # measure readrandom after load with filluniquerandom with 6GB block cache ./db_bench \ --benchmarks=readrandom \ @@ -188,7 +207,7 @@ make release --num=$((NUM / 4)) \ --reads=$((NUM / 4)) \ --cache_size=6442450944 \ - --cache_numshardbits=4 \ + --cache_numshardbits=6 \ --table_cache_numshardbits=4 \ --open_files=55000 \ --disable_seek_compaction=1 \ @@ -200,6 +219,28 @@ make release --sync=0 \ --threads=16 > ${STAT_FILE}.readrandom_filluniquerandom +# measure readwhilewriting after load with filluniquerandom with 6GB block cache +./db_bench \ + --benchmarks=readwhilewriting \ + --db=$DATA_DIR \ + --use_existing_db=1 \ + --bloom_bits=10 \ + --num=$((NUM / 4)) \ + --reads=$((NUM / 4)) \ + --writes_per_second=1000 \ + --write_buffer_size=100000000 \ + --cache_size=6442450944 \ + --cache_numshardbits=6 \ + --table_cache_numshardbits=4 \ + --open_files=55000 \ + --disable_seek_compaction=1 \ + --statistics=1 \ + --histogram=1 \ + --disable_data_sync=1 \ + --disable_wal=1 \ + --sync=0 \ + --threads=16 > ${STAT_FILE}.readwhilewriting + # measure memtable performance -- none of the data gets flushed to disk ./db_bench \ --benchmarks=fillrandom,readrandom, \ @@ -208,7 +249,7 @@ make release --num=$((NUM / 10)) \ --reads=$NUM \ --cache_size=6442450944 \ - --cache_numshardbits=4 \ + --cache_numshardbits=6 \ --table_cache_numshardbits=4 \ --write_buffer_size=1000000000 \ --open_files=55000 \ @@ -264,3 +305,4 @@ send_benchmark_to_ods readrandom readrandom_memtable_sst $STAT_FILE.readrandom_m send_benchmark_to_ods readrandom readrandom_fillunique_random $STAT_FILE.readrandom_filluniquerandom send_benchmark_to_ods fillrandom memtablefillrandom $STAT_FILE.memtablefillreadrandom send_benchmark_to_ods readrandom memtablereadrandom $STAT_FILE.memtablefillreadrandom +send_benchmark_to_ods readwhilewriting readwhilewriting $STAT_FILE.readwhilewriting diff --git a/db/builder.cc b/db/builder.cc index ad1334a15..61671db0d 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -42,7 +42,7 @@ Status BuildTable(const std::string& dbname, const Comparator* user_comparator, const SequenceNumber newest_snapshot, const SequenceNumber earliest_seqno_in_memtable, - const bool enable_compression) { + const CompressionType compression) { Status s; meta->file_size = 0; meta->smallest_seqno = meta->largest_seqno = 0; @@ -65,7 +65,7 @@ Status BuildTable(const std::string& dbname, } TableBuilder* builder = GetTableBuilder(options, file.get(), - options.compression); + compression); // the first key is the smallest key Slice key = iter->key(); diff --git a/db/builder.h b/db/builder.h index 8c525bd05..2600dc24b 100644 --- a/db/builder.h +++ b/db/builder.h @@ -43,6 +43,6 @@ extern Status BuildTable(const std::string& dbname, const Comparator* user_comparator, const SequenceNumber newest_snapshot, const SequenceNumber earliest_seqno_in_memtable, - const bool enable_compression); + const CompressionType compression); } // namespace rocksdb diff --git a/db/c.cc b/db/c.cc index 36ee2d486..68f361336 100644 --- a/db/c.cc +++ b/db/c.cc @@ -788,6 +788,10 @@ void rocksdb_env_set_background_threads(rocksdb_env_t* env, int n) { env->rep->SetBackgroundThreads(n); } +void rocksdb_env_set_high_priority_background_threads(rocksdb_env_t* env, int n) { + env->rep->SetBackgroundThreads(n, Env::HIGH); +} + void rocksdb_env_destroy(rocksdb_env_t* env) { if (!env->is_default) delete env->rep; delete env; diff --git a/db/db_bench.cc b/db/db_bench.cc index eb5d7cb42..e0ba58281 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -94,6 +94,8 @@ DEFINE_string(benchmarks, "\tmergerandom -- same as updaterandom/appendrandom using merge" " operator. " "Must be used with merge_operator\n" + "\treadrandommergerandom -- perform N random read-or-merge " + "operations. Must be used with merge_operator\n" "\tseekrandom -- N random seeks\n" "\tcrc32c -- repeated crc32c of 4K of data\n" "\tacquireload -- load N*1000 times\n" @@ -112,6 +114,11 @@ DEFINE_int64(numdistinct, 1000, "read/write on fewer keys so that gets are more likely to find the" " key and puts are more likely to update the same key"); +DEFINE_int64(merge_keys, -1, + "Number of distinct keys to use for MergeRandom and " + "ReadRandomMergeRandom. " + "If negative, there will be FLAGS_num keys."); + DEFINE_int64(reads, -1, "Number of read operations to do. " "If negative, do FLAGS_num reads."); @@ -297,6 +304,11 @@ DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed" "default value 90 means 90% operations out of all reads and writes" " operations are reads. In other words, 9 gets for every 1 put."); +DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed" + " as percentage) for the ReadRandomMergeRandom workload. The" + " default value 70 means 70% out of all read and merge operations" + " are merges. In other words, 7 merges for every 3 gets."); + DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/" "deletes (used in RandomWithVerify only). RandomWithVerify " "calculates writepercent as (100 - FLAGS_readwritepercent - " @@ -446,6 +458,9 @@ DEFINE_uint64(bytes_per_sync, rocksdb::Options().bytes_per_sync, DEFINE_bool(filter_deletes, false, " On true, deletes use bloom-filter and drop" " the delete if key not present"); +DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge" + " operations on a key in the memtable"); + static bool ValidatePrefixSize(const char* flagname, int32_t value) { if (value < 0 || value>=2000000000) { fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n", @@ -784,6 +799,7 @@ class Benchmark { long long reads_; long long writes_; long long readwrites_; + long long merge_keys_; int heap_counter_; char keyFormat_[100]; // will contain the format of key. e.g "%016d" void PrintHeader() { @@ -958,6 +974,7 @@ class Benchmark { readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num : ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads) ), + merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys), heap_counter_(0) { std::vector files; FLAGS_env->GetChildren(FLAGS_db, &files); @@ -985,8 +1002,8 @@ class Benchmark { } unique_ptr GenerateKeyFromInt(long long v, const char* suffix = "") { - unique_ptr keyInStr(new char[kMaxKeySize]); - snprintf(keyInStr.get(), kMaxKeySize, keyFormat_, v, suffix); + unique_ptr keyInStr(new char[kMaxKeySize + 1]); + snprintf(keyInStr.get(), kMaxKeySize + 1, keyFormat_, v, suffix); return keyInStr; } @@ -1087,6 +1104,14 @@ class Benchmark { method = &Benchmark::ReadWhileWriting; } else if (name == Slice("readrandomwriterandom")) { method = &Benchmark::ReadRandomWriteRandom; + } else if (name == Slice("readrandommergerandom")) { + if (FLAGS_merge_operator.empty()) { + fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n", + name.ToString().c_str()); + method = nullptr; + } else { + method = &Benchmark::ReadRandomMergeRandom; + } } else if (name == Slice("updaterandom")) { method = &Benchmark::UpdateRandom; } else if (name == Slice("appendrandom")) { @@ -1421,6 +1446,7 @@ class Benchmark { FLAGS_merge_operator.c_str()); exit(1); } + options.max_successive_merges = FLAGS_max_successive_merges; // set universal style compaction configurations, if applicable if (FLAGS_universal_size_ratio != 0) { @@ -2375,13 +2401,16 @@ class Benchmark { // // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8 // to simulate random additions over 64-bit integers using merge. + // + // The number of merges on the same key can be controlled by adjusting + // FLAGS_merge_keys. void MergeRandom(ThreadState* thread) { RandomGenerator gen; // The number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; + const long long k = thread->rand.Next() % merge_keys_; unique_ptr key = GenerateKeyFromInt(k); Status s = db_->Merge(write_options_, key.get(), @@ -2400,6 +2429,68 @@ class Benchmark { thread->stats.AddMessage(msg); } + // Read and merge random keys. The amount of reads and merges are controlled + // by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct + // keys (and thus also the number of reads and merges on the same key) can be + // adjusted with FLAGS_merge_keys. + // + // As with MergeRandom, the merge operator to use should be defined by + // FLAGS_merge_operator. + void ReadRandomMergeRandom(ThreadState* thread) { + ReadOptions options(FLAGS_verify_checksum, true); + RandomGenerator gen; + std::string value; + long long num_hits = 0; + long long num_gets = 0; + long long num_merges = 0; + size_t max_length = 0; + + // the number of iterations is the larger of read_ or write_ + Duration duration(FLAGS_duration, readwrites_); + + while (!duration.Done(1)) { + const long long k = thread->rand.Next() % merge_keys_; + unique_ptr key = GenerateKeyFromInt(k); + + bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent; + + if (do_merge) { + Status s = db_->Merge(write_options_, key.get(), + gen.Generate(value_size_)); + if (!s.ok()) { + fprintf(stderr, "merge error: %s\n", s.ToString().c_str()); + exit(1); + } + + num_merges++; + + } else { + Status s = db_->Get(options, key.get(), &value); + if (value.length() > max_length) + max_length = value.length(); + + if (!s.ok() && !s.IsNotFound()) { + fprintf(stderr, "get error: %s\n", s.ToString().c_str()); + // we continue after error rather than exiting so that we can + // find more errors if any + } else if (!s.IsNotFound()) { + num_hits++; + } + + num_gets++; + + } + + thread->stats.FinishedSingleOp(db_); + } + char msg[100]; + snprintf(msg, sizeof(msg), + "(reads:%lld merges:%lld total:%lld hits:%lld maxlength:%zu)", + num_gets, num_merges, readwrites_, num_hits, max_length); + thread->stats.AddMessage(msg); + } + + void Compact(ThreadState* thread) { db_->CompactRange(nullptr, nullptr); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 7cfd00b58..d07868d21 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -228,6 +228,28 @@ CompressionType GetCompressionType(const Options& options, int level, } } +CompressionType GetCompressionFlush(const Options& options) { + // Compressing memtable flushes might not help unless the sequential load + // optimization is used for leveled compaction. Otherwise the CPU and + // latency overhead is not offset by saving much space. + + bool can_compress; + + if (options.compaction_style == kCompactionStyleUniversal) { + can_compress = + (options.compaction_options_universal.compression_size_percent < 0); + } else { + // For leveled compress when min_level_to_compress == 0. + can_compress = (GetCompressionType(options, 0, true) != kNoCompression); + } + + if (can_compress) { + return options.compression; + } else { + return kNoCompression; + } +} + DBImpl::DBImpl(const Options& options, const std::string& dbname) : env_(options.env), dbname_(dbname), @@ -1086,7 +1108,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { s = BuildTable(dbname_, env_, options_, storage_options_, table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, - earliest_seqno_in_memtable, true); + earliest_seqno_in_memtable, + GetCompressionFlush(options_)); LogFlush(options_.info_log); mutex_.Lock(); } @@ -1147,15 +1170,11 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, Log(options_.info_log, "Level-0 flush table #%lu: started", (unsigned long)meta.number); - // We skip compression if universal compression is used and the size - // threshold is set for compression. - bool enable_compression = (options_.compaction_style - != kCompactionStyleUniversal || - options_.compaction_options_universal.compression_size_percent < 0); + s = BuildTable(dbname_, env_, options_, storage_options_, table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, - earliest_seqno_in_memtable, enable_compression); + earliest_seqno_in_memtable, GetCompressionFlush(options_)); LogFlush(options_.info_log); delete iter; Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s", @@ -2092,11 +2111,11 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, if (s.ok() && !options_.disableDataSync) { if (options_.use_fsync) { StopWatch sw(env_, options_.statistics.get(), - COMPACTION_OUTFILE_SYNC_MICROS); + COMPACTION_OUTFILE_SYNC_MICROS, false); s = compact->outfile->Fsync(); } else { StopWatch sw(env_, options_.statistics.get(), - COMPACTION_OUTFILE_SYNC_MICROS); + COMPACTION_OUTFILE_SYNC_MICROS, false); s = compact->outfile->Sync(); } } @@ -2725,7 +2744,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, bool* value_found) { Status s; - StopWatch sw(env_, options_.statistics.get(), DB_GET); + StopWatch sw(env_, options_.statistics.get(), DB_GET, false); SequenceNumber snapshot; if (options.snapshot != nullptr) { snapshot = reinterpret_cast(options.snapshot)->number_; @@ -2795,7 +2814,7 @@ std::vector DBImpl::MultiGet( const std::vector& column_family, const std::vector& keys, std::vector* values) { - StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET); + StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false); SequenceNumber snapshot; std::vector to_delete; @@ -3001,7 +3020,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { w.disableWAL = options.disableWAL; w.done = false; - StopWatch sw(env_, options_.statistics.get(), DB_WRITE); + StopWatch sw(env_, options_.statistics.get(), DB_WRITE, false); mutex_.Lock(); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { diff --git a/db/db_impl.h b/db/db_impl.h index 8b2ff38a2..9baea728f 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -628,4 +628,7 @@ extern Options SanitizeOptions(const std::string& db, CompressionType GetCompressionType(const Options& options, int level, const bool enable_compression); +// Determine compression type for L0 file written by memtable flush. +CompressionType GetCompressionFlush(const Options& options); + } // namespace rocksdb diff --git a/db/memtable.cc b/db/memtable.cc index 2dba364b0..796ba1b3a 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -329,4 +329,37 @@ bool MemTable::Update(SequenceNumber seq, ValueType type, // Key doesn't exist return false; } + +size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { + Slice memkey = key.memtable_key(); + + // A total ordered iterator is costly for some memtablerep (prefix aware + // reps). By passing in the user key, we allow efficient iterator creation. + // The iterator only needs to be ordered within the same user key. + std::shared_ptr iter( + table_->GetIterator(key.user_key())); + iter->Seek(memkey.data()); + + size_t num_successive_merges = 0; + + for (; iter->Valid(); iter->Next()) { + const char* entry = iter->key(); + uint32_t key_length; + const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (!comparator_.comparator.user_comparator()->Compare( + Slice(iter_key_ptr, key_length - 8), key.user_key()) == 0) { + break; + } + + const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8); + if (static_cast(tag & 0xff) != kTypeMerge) { + break; + } + + ++num_successive_merges; + } + + return num_successive_merges; +} + } // namespace rocksdb diff --git a/db/memtable.h b/db/memtable.h index 79d5ba2d0..12ccf3d37 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -107,6 +107,11 @@ class MemTable { const Slice& key, const Slice& value); + // Returns the number of successive merge entries starting from the newest + // entry for the key up to the last non-merge entry or last entry for the + // key in the memtable. + size_t CountSuccessiveMergeEntries(const LookupKey& key); + // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } diff --git a/db/merge_test.cc b/db/merge_test.cc index 0c14aff2c..887d8ad42 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -14,6 +14,7 @@ #include "rocksdb/merge_operator.h" #include "db/dbformat.h" #include "db/db_impl.h" +#include "db/write_batch_internal.h" #include "utilities/merge_operators.h" #include "util/testharness.h" #include "utilities/utility_db.h" @@ -21,13 +22,52 @@ using namespace std; using namespace rocksdb; +namespace { + int numMergeOperatorCalls; -std::shared_ptr OpenDb(const string& dbname, const bool ttl = false) { + void resetNumMergeOperatorCalls() { + numMergeOperatorCalls = 0; + } +} + +class CountMergeOperator : public AssociativeMergeOperator { + public: + CountMergeOperator() { + mergeOperator_ = MergeOperators::CreateUInt64AddOperator(); + } + + virtual bool Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const override { + ++numMergeOperatorCalls; + return mergeOperator_->PartialMerge( + key, + *existing_value, + value, + new_value, + logger); + } + + virtual const char* Name() const override { + return "UInt64AddOperator"; + } + + private: + std::shared_ptr mergeOperator_; +}; + +std::shared_ptr OpenDb( + const string& dbname, + const bool ttl = false, + const unsigned max_successive_merges = 0) { DB* db; StackableDB* sdb; Options options; options.create_if_missing = true; - options.merge_operator = MergeOperators::CreateUInt64AddOperator(); + options.merge_operator = std::make_shared(); + options.max_successive_merges = max_successive_merges; Status s; DestroyDB(dbname, Options()); if (ttl) { @@ -243,6 +283,67 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) { } } +void testSuccessiveMerge( + Counters& counters, int max_num_merges, int num_merges) { + + counters.assert_remove("z"); + uint64_t sum = 0; + + for (int i = 1; i <= num_merges; ++i) { + resetNumMergeOperatorCalls(); + counters.assert_add("z", i); + sum += i; + + if (i % (max_num_merges + 1) == 0) { + assert(numMergeOperatorCalls == max_num_merges + 1); + } else { + assert(numMergeOperatorCalls == 0); + } + + resetNumMergeOperatorCalls(); + assert(counters.assert_get("z") == sum); + assert(numMergeOperatorCalls == i % (max_num_merges + 1)); + } +} + +void testSingleBatchSuccessiveMerge( + DB* db, + int max_num_merges, + int num_merges) { + assert(num_merges > max_num_merges); + + Slice key("BatchSuccessiveMerge"); + uint64_t merge_value = 1; + Slice merge_value_slice((char *)&merge_value, sizeof(merge_value)); + + // Create the batch + WriteBatch batch; + for (int i = 0; i < num_merges; ++i) { + batch.Merge(key, merge_value_slice); + } + + // Apply to memtable and count the number of merges + resetNumMergeOperatorCalls(); + { + Status s = db->Write(WriteOptions(), &batch); + assert(s.ok()); + } + assert(numMergeOperatorCalls == + num_merges - (num_merges % (max_num_merges + 1))); + + // Get the value + resetNumMergeOperatorCalls(); + string get_value_str; + { + Status s = db->Get(ReadOptions(), key, &get_value_str); + assert(s.ok()); + } + assert(get_value_str.size() == sizeof(uint64_t)); + uint64_t get_value = DecodeFixed64(&get_value_str[0]); + ASSERT_EQ(get_value, num_merges * merge_value); + ASSERT_EQ(numMergeOperatorCalls, (num_merges % (max_num_merges + 1))); +} + void runTest(int argc, const string& dbname, const bool use_ttl = false) { auto db = OpenDb(dbname, use_ttl); @@ -265,6 +366,19 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) { } DestroyDB(dbname, Options()); + db.reset(); + + { + cout << "Test merge in memtable... \n"; + unsigned maxMerge = 5; + auto db = OpenDb(dbname, use_ttl, maxMerge); + MergeBasedCounters counters(db, 0); + testCounters(counters, db.get(), compact); + testSuccessiveMerge(counters, maxMerge, maxMerge * 2); + testSingleBatchSuccessiveMerge(db.get(), 5, 7); + DestroyDB(dbname, Options()); + } + } int main(int argc, char *argv[]) { diff --git a/db/repair.cc b/db/repair.cc index fc9ba282d..6db90c865 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -225,7 +225,8 @@ class Repairer { Iterator* iter = mem->NewIterator(); status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_, iter, &meta, - icmp_.user_comparator(), 0, 0, true); + icmp_.user_comparator(), 0, 0, + kNoCompression); delete iter; delete mem->Unref(); mem = nullptr; diff --git a/db/write_batch.cc b/db/write_batch.cc index 5a5d7e278..eae0903c6 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -24,6 +24,7 @@ #include "rocksdb/write_batch.h" #include "rocksdb/options.h" +#include "rocksdb/merge_operator.h" #include "db/dbformat.h" #include "db/db_impl.h" #include "db/memtable.h" @@ -258,7 +259,62 @@ class MemTableInserter : public WriteBatch::Handler { } virtual void MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { - mem_->Add(sequence_, kTypeMerge, key, value); + bool perform_merge = false; + + if (options_->max_successive_merges > 0 && db_ != nullptr) { + LookupKey lkey(key, sequence_); + + // Count the number of successive merges at the head + // of the key in the memtable + size_t num_merges = mem_->CountSuccessiveMergeEntries(lkey); + + if (num_merges >= options_->max_successive_merges) { + perform_merge = true; + } + } + + if (perform_merge) { + // 1) Get the existing value + std::string get_value; + + // Pass in the sequence number so that we also include previous merge + // operations in the same batch. + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = sequence_; + ReadOptions read_options; + read_options.snapshot = &read_from_snapshot; + + db_->Get(read_options, key, &get_value); + Slice get_value_slice = Slice(get_value); + + // 2) Apply this merge + auto merge_operator = options_->merge_operator.get(); + assert(merge_operator); + + std::deque operands; + operands.push_front(value.ToString()); + std::string new_value; + if (!merge_operator->FullMerge(key, + &get_value_slice, + operands, + &new_value, + options_->info_log.get())) { + // Failed to merge! + RecordTick(options_->statistics.get(), NUMBER_MERGE_FAILURES); + + // Store the delta in memtable + perform_merge = false; + } else { + // 3) Add value to memtable + mem_->Add(sequence_, kTypeValue, key, new_value); + } + } + + if (!perform_merge) { + // Add merge operator to memtable + mem_->Add(sequence_, kTypeMerge, key, value); + } + sequence_++; } virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index a3b18084a..bd22e191b 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -311,6 +311,7 @@ extern void rocksdb_cache_destroy(rocksdb_cache_t* cache); extern rocksdb_env_t* rocksdb_create_default_env(); extern void rocksdb_env_set_background_threads(rocksdb_env_t* env, int n); +extern void rocksdb_env_set_high_priority_background_threads(rocksdb_env_t* env, int n); extern void rocksdb_env_destroy(rocksdb_env_t*); /* Universal Compaction options */ diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index bb75a9c01..3d5e74372 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -44,15 +44,15 @@ using std::shared_ptr; enum CompressionType : char { // NOTE: do not change the values of existing entries, as these are // part of the persistent format on disk. - kNoCompression = 0x0, + kNoCompression = 0x0, kSnappyCompression = 0x1, kZlibCompression = 0x2, kBZip2Compression = 0x3 }; enum CompactionStyle : char { - kCompactionStyleLevel = 0x0, // level based compaction style - kCompactionStyleUniversal = 0x1 // Universal compaction style + kCompactionStyleLevel = 0x0, // level based compaction style + kCompactionStyleUniversal = 0x1 // Universal compaction style }; // Compression options for different compression algorithms like Zlib @@ -60,12 +60,9 @@ struct CompressionOptions { int window_bits; int level; int strategy; - CompressionOptions():window_bits(-14), - level(-1), - strategy(0){} - CompressionOptions(int wbits, int lev, int strategy):window_bits(wbits), - level(lev), - strategy(strategy){} + CompressionOptions() : window_bits(-14), level(-1), strategy(0) {} + CompressionOptions(int wbits, int lev, int strategy) + : window_bits(wbits), level(lev), strategy(strategy) {} }; struct Options; @@ -180,7 +177,6 @@ struct ColumnFamilyOptions { // Default: 16 int block_restart_interval; - // Compress blocks using the specified compression algorithm. This // parameter can be changed dynamically. // @@ -211,7 +207,7 @@ struct ColumnFamilyOptions { // java/C api hard to construct. std::vector compression_per_level; - //different options for compression algorithms + // different options for compression algorithms CompressionOptions compression_opts; // If non-nullptr, use the specified filter policy to reduce disk reads. @@ -290,7 +286,6 @@ struct ColumnFamilyOptions { // will be 20MB, total file size for level-2 will be 200MB, // and total file size for level-3 will be 2GB. - // by default 'max_bytes_for_level_base' is 10MB. uint64_t max_bytes_for_level_base; // by default 'max_bytes_for_level_base' is 10. @@ -426,6 +421,17 @@ struct ColumnFamilyOptions { // Default: 10000, if inplace_update_support = true, else 0. size_t inplace_update_num_locks; + // Maximum number of successive merge operations on a key in the memtable. + // + // When a merge operation is added to the memtable and the maximum number of + // successive merges is reached, the value of the key will be calculated and + // inserted into the memtable instead of the merge operation. This will + // ensure that there are never more than max_successive_merges merge + // operations in the memtable. + // + // Default: 0 (disabled) + size_t max_successive_merges; + // Create ColumnFamilyOptions with default values for all fields ColumnFamilyOptions(); // Create ColumnFamilyOptions from Options @@ -560,6 +566,14 @@ struct DBOptions { // If <= 0, a proper value is automatically calculated (usually 1/10 of // writer_buffer_size). // + // There are two additonal restriction of the The specified size: + // (1) size should be in the range of [4096, 2 << 30] and + // (2) be the multiple of the CPU word (which helps with the memory + // alignment). + // + // We'll automatically check and adjust the size number to make sure it + // conforms to the restrictions. + // // Default: 0 size_t arena_block_size; @@ -614,7 +628,12 @@ struct DBOptions { // Specify the file access pattern once a compaction is started. // It will be applied to all input files of a compaction. // Default: NORMAL - enum { NONE, NORMAL, SEQUENTIAL, WILLNEED } access_hint_on_compaction_start; + enum { + NONE, + NORMAL, + SEQUENTIAL, + WILLNEED + } access_hint_on_compaction_start; // Use adaptive mutex, which spins in the user space before resorting // to kernel. This could reduce context switch when the mutex is not @@ -666,7 +685,7 @@ struct Options : public DBOptions, public ColumnFamilyOptions { // the block cache. It will not page in data from the OS cache or data that // resides in storage. enum ReadTier { - kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage + kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage kBlockCacheTier = 0x1 // data in memtable or block cache }; @@ -719,13 +738,14 @@ struct ReadOptions { prefix_seek(false), snapshot(nullptr), prefix(nullptr), - read_tier(kReadAllTier) { - } - ReadOptions(bool cksum, bool cache) : - verify_checksums(cksum), fill_cache(cache), - prefix_seek(false), snapshot(nullptr), prefix(nullptr), - read_tier(kReadAllTier) { - } + read_tier(kReadAllTier) {} + ReadOptions(bool cksum, bool cache) + : verify_checksums(cksum), + fill_cache(cache), + prefix_seek(false), + snapshot(nullptr), + prefix(nullptr), + read_tier(kReadAllTier) {} }; // Options that control write operations @@ -752,10 +772,7 @@ struct WriteOptions { // and the write may got lost after a crash. bool disableWAL; - WriteOptions() - : sync(false), - disableWAL(false) { - } + WriteOptions() : sync(false), disableWAL(false) {} }; // Options that control flush operations @@ -764,9 +781,7 @@ struct FlushOptions { // Default: true bool wait; - FlushOptions() - : wait(true) { - } + FlushOptions() : wait(true) {} }; } // namespace rocksdb diff --git a/include/utilities/backupable_db.h b/include/utilities/backupable_db.h index 335e02857..fbe2ae8a3 100644 --- a/include/utilities/backupable_db.h +++ b/include/utilities/backupable_db.h @@ -31,6 +31,14 @@ struct BackupableDBOptions { // Default: nullptr Env* backup_env; + // If share_table_files == true, backup will assume that table files with + // same name have the same contents. This enables incremental backups and + // avoids unnecessary data copies. + // If share_table_files == false, each backup will be on its own and will + // not share any data with other backups. + // default: true + bool share_table_files; + // Backup info and error messages will be written to info_log // if non-nullptr. // Default: nullptr @@ -49,6 +57,7 @@ struct BackupableDBOptions { explicit BackupableDBOptions(const std::string& _backup_dir, Env* _backup_env = nullptr, + bool _share_table_files = true, Logger* _info_log = nullptr, bool _sync = true, bool _destroy_old_data = false) : @@ -93,6 +102,14 @@ class BackupableDB : public StackableDB { Status PurgeOldBackups(uint32_t num_backups_to_keep); // deletes a specific backup Status DeleteBackup(BackupID backup_id); + // Call this from another thread if you want to stop the backup + // that is currently happening. It will return immediatelly, will + // not wait for the backup to stop. + // The backup will stop ASAP and the call to CreateNewBackup will + // return Status::Incomplete(). It will not clean up after itself, but + // the state will remain consistent. The state will be cleaned up + // next time you create BackupableDB or RestoreBackupableDB. + void StopBackup(); private: BackupEngine* backup_engine_; @@ -108,9 +125,10 @@ class RestoreBackupableDB { void GetBackupInfo(std::vector* backup_info); // restore from backup with backup_id - // IMPORTANT -- if you restore from some backup that is not the latest, - // and you start creating new backups from the new DB, all the backups - // that were newer than the backup you restored from will be deleted + // IMPORTANT -- if options_.share_table_files == true and you restore DB + // from some backup that is not the latest, and you start creating new + // backups from the new DB, all the backups that were newer than the + // backup you restored from will be deleted // // Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3. // If you try creating a new backup now, old backups 4 and 5 will be deleted diff --git a/util/arena_impl.cc b/util/arena_impl.cc index d5c2a537e..5125e2364 100644 --- a/util/arena_impl.cc +++ b/util/arena_impl.cc @@ -8,71 +8,86 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "util/arena_impl.h" +#include namespace rocksdb { -ArenaImpl::ArenaImpl(size_t block_size) { - if (block_size < kMinBlockSize) { - block_size_ = kMinBlockSize; - } else if (block_size > kMaxBlockSize) { - block_size_ = kMaxBlockSize; - } else { - block_size_ = block_size; +const size_t ArenaImpl::kMinBlockSize = 4096; +const size_t ArenaImpl::kMaxBlockSize = 2 << 30; +static const int kAlignUnit = sizeof(void*); + +size_t OptimizeBlockSize(size_t block_size) { + // Make sure block_size is in optimal range + block_size = std::max(ArenaImpl::kMinBlockSize, block_size); + block_size = std::min(ArenaImpl::kMaxBlockSize, block_size); + + // make sure block_size is the multiple of kAlignUnit + if (block_size % kAlignUnit != 0) { + block_size = (1 + block_size / kAlignUnit) * kAlignUnit; } - blocks_memory_ = 0; - alloc_ptr_ = nullptr; // First allocation will allocate a block - alloc_bytes_remaining_ = 0; + return block_size; +} + +ArenaImpl::ArenaImpl(size_t block_size) + : kBlockSize(OptimizeBlockSize(block_size)) { + assert(kBlockSize >= kMinBlockSize && kBlockSize <= kMaxBlockSize && + kBlockSize % kAlignUnit == 0); } ArenaImpl::~ArenaImpl() { - for (size_t i = 0; i < blocks_.size(); i++) { - delete[] blocks_[i]; + for (const auto& block : blocks_) { + delete[] block; } } -char* ArenaImpl::AllocateFallback(size_t bytes) { - if (bytes > block_size_ / 4) { +char* ArenaImpl::AllocateFallback(size_t bytes, bool aligned) { + if (bytes > kBlockSize / 4) { // Object is more than a quarter of our block size. Allocate it separately // to avoid wasting too much space in leftover bytes. - char* result = AllocateNewBlock(bytes); - return result; + return AllocateNewBlock(bytes); } // We waste the remaining space in the current block. - alloc_ptr_ = AllocateNewBlock(block_size_); - alloc_bytes_remaining_ = block_size_; + auto block_head = AllocateNewBlock(kBlockSize); + alloc_bytes_remaining_ = kBlockSize - bytes; - char* result = alloc_ptr_; - alloc_ptr_ += bytes; - alloc_bytes_remaining_ -= bytes; - return result; + if (aligned) { + aligned_alloc_ptr_ = block_head + bytes; + unaligned_alloc_ptr_ = block_head + kBlockSize; + return block_head; + } else { + aligned_alloc_ptr_ = block_head; + unaligned_alloc_ptr_ = block_head + kBlockSize - bytes; + return unaligned_alloc_ptr_; + } } char* ArenaImpl::AllocateAligned(size_t bytes) { - const int align = sizeof(void*); // We'll align to pointer size - assert((align & (align-1)) == 0); // Pointer size should be a power of 2 - size_t current_mod = reinterpret_cast(alloc_ptr_) & (align-1); - size_t slop = (current_mod == 0 ? 0 : align - current_mod); + assert((kAlignUnit & (kAlignUnit - 1)) == + 0); // Pointer size should be a power of 2 + size_t current_mod = + reinterpret_cast(aligned_alloc_ptr_) & (kAlignUnit - 1); + size_t slop = (current_mod == 0 ? 0 : kAlignUnit - current_mod); size_t needed = bytes + slop; char* result; if (needed <= alloc_bytes_remaining_) { - result = alloc_ptr_ + slop; - alloc_ptr_ += needed; + result = aligned_alloc_ptr_ + slop; + aligned_alloc_ptr_ += needed; alloc_bytes_remaining_ -= needed; } else { // AllocateFallback always returned aligned memory - result = AllocateFallback(bytes); + result = AllocateFallback(bytes, true /* aligned */); } - assert((reinterpret_cast(result) & (align-1)) == 0); + assert((reinterpret_cast(result) & (kAlignUnit - 1)) == 0); return result; } char* ArenaImpl::AllocateNewBlock(size_t block_bytes) { - char* result = new char[block_bytes]; + char* block = new char[block_bytes]; blocks_memory_ += block_bytes; - blocks_.push_back(result); - return result; + blocks_.push_back(block); + return block; } } // namespace rocksdb diff --git a/util/arena_impl.h b/util/arena_impl.h index b5a684247..538385ccc 100644 --- a/util/arena_impl.h +++ b/util/arena_impl.h @@ -22,49 +22,54 @@ namespace rocksdb { class ArenaImpl : public Arena { public: + // No copying allowed + ArenaImpl(const ArenaImpl&) = delete; + void operator=(const ArenaImpl&) = delete; + + static const size_t kMinBlockSize; + static const size_t kMaxBlockSize; + explicit ArenaImpl(size_t block_size = kMinBlockSize); virtual ~ArenaImpl(); - virtual char* Allocate(size_t bytes); + virtual char* Allocate(size_t bytes) override; - virtual char* AllocateAligned(size_t bytes); + virtual char* AllocateAligned(size_t bytes) override; // Returns an estimate of the total memory usage of data allocated - // by the arena (including space allocated but not yet used for user + // by the arena (exclude the space allocated but not yet used for future // allocations). - // - // TODO: Do we need to exclude space allocated but not used? virtual const size_t ApproximateMemoryUsage() { - return blocks_memory_ + blocks_.capacity() * sizeof(char*); + return blocks_memory_ + blocks_.capacity() * sizeof(char*) - + alloc_bytes_remaining_; } - virtual const size_t MemoryAllocatedBytes() { + virtual const size_t MemoryAllocatedBytes() override { return blocks_memory_; } private: - char* AllocateFallback(size_t bytes); - char* AllocateNewBlock(size_t block_bytes); - - static const size_t kMinBlockSize = 4096; - static const size_t kMaxBlockSize = 2 << 30; - // Number of bytes allocated in one block - size_t block_size_; - - // Allocation state - char* alloc_ptr_; - size_t alloc_bytes_remaining_; - + const size_t kBlockSize; // Array of new[] allocated memory blocks - std::vector blocks_; + typedef std::vector Blocks; + Blocks blocks_; + + // Stats for current active block. + // For each block, we allocate aligned memory chucks from one end and + // allocate unaligned memory chucks from the other end. Otherwise the + // memory waste for alignment will be higher if we allocate both types of + // memory from one direction. + char* unaligned_alloc_ptr_ = nullptr; + char* aligned_alloc_ptr_ = nullptr; + // How many bytes left in currently active block? + size_t alloc_bytes_remaining_ = 0; + + char* AllocateFallback(size_t bytes, bool aligned); + char* AllocateNewBlock(size_t block_bytes); // Bytes of memory in blocks allocated so far - size_t blocks_memory_; - - // No copying allowed - ArenaImpl(const ArenaImpl&); - void operator=(const ArenaImpl&); + size_t blocks_memory_ = 0; }; inline char* ArenaImpl::Allocate(size_t bytes) { @@ -73,12 +78,16 @@ inline char* ArenaImpl::Allocate(size_t bytes) { // them for our internal use). assert(bytes > 0); if (bytes <= alloc_bytes_remaining_) { - char* result = alloc_ptr_; - alloc_ptr_ += bytes; + unaligned_alloc_ptr_ -= bytes; alloc_bytes_remaining_ -= bytes; - return result; + return unaligned_alloc_ptr_; } - return AllocateFallback(bytes); + return AllocateFallback(bytes, false /* unaligned */); } +// check and adjust the block_size so that the return value is +// 1. in the range of [kMinBlockSize, kMaxBlockSize]. +// 2. the multiple of align unit. +extern size_t OptimizeBlockSize(size_t block_size); + } // namespace rocksdb diff --git a/util/arena_test.cc b/util/arena_test.cc index 12aa7f7fe..ca6dfc99d 100644 --- a/util/arena_test.cc +++ b/util/arena_test.cc @@ -57,8 +57,34 @@ TEST(ArenaImplTest, MemoryAllocatedBytes) { ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated); } +// Make sure we didn't count the allocate but not used memory space in +// Arena::ApproximateMemoryUsage() +TEST(ArenaImplTest, ApproximateMemoryUsageTest) { + const size_t kBlockSize = 4096; + const size_t kEntrySize = kBlockSize / 8; + const size_t kZero = 0; + ArenaImpl arena(kBlockSize); + ASSERT_EQ(kZero, arena.ApproximateMemoryUsage()); + + auto num_blocks = kBlockSize / kEntrySize; + + // first allocation + arena.AllocateAligned(kEntrySize); + auto mem_usage = arena.MemoryAllocatedBytes(); + ASSERT_EQ(mem_usage, kBlockSize); + auto usage = arena.ApproximateMemoryUsage(); + ASSERT_LT(usage, mem_usage); + for (size_t i = 1; i < num_blocks; ++i) { + arena.AllocateAligned(kEntrySize); + ASSERT_EQ(mem_usage, arena.MemoryAllocatedBytes()); + ASSERT_EQ(arena.ApproximateMemoryUsage(), usage + kEntrySize); + usage = arena.ApproximateMemoryUsage(); + } + ASSERT_GT(usage, mem_usage); +} + TEST(ArenaImplTest, Simple) { - std::vector > allocated; + std::vector> allocated; ArenaImpl arena_impl; const int N = 100000; size_t bytes = 0; @@ -68,8 +94,9 @@ TEST(ArenaImplTest, Simple) { if (i % (N / 10) == 0) { s = i; } else { - s = rnd.OneIn(4000) ? rnd.Uniform(6000) : - (rnd.OneIn(10) ? rnd.Uniform(100) : rnd.Uniform(20)); + s = rnd.OneIn(4000) + ? rnd.Uniform(6000) + : (rnd.OneIn(10) ? rnd.Uniform(100) : rnd.Uniform(20)); } if (s == 0) { // Our arena disallows size 0 allocations. @@ -89,7 +116,7 @@ TEST(ArenaImplTest, Simple) { bytes += s; allocated.push_back(std::make_pair(s, r)); ASSERT_GE(arena_impl.ApproximateMemoryUsage(), bytes); - if (i > N/10) { + if (i > N / 10) { ASSERT_LE(arena_impl.ApproximateMemoryUsage(), bytes * 1.10); } } diff --git a/util/options.cc b/util/options.cc index 543e08d5e..bbc1291b3 100644 --- a/util/options.cc +++ b/util/options.cc @@ -128,7 +128,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) table_factory(options.table_factory), table_properties_collectors(options.table_properties_collectors), inplace_update_support(options.inplace_update_support), - inplace_update_num_locks(options.inplace_update_num_locks) { + inplace_update_num_locks(options.inplace_update_num_locks), + max_successive_merges(0) { assert(memtable_factory.get() != nullptr); } @@ -389,6 +390,8 @@ Options::Dump(Logger* log) const inplace_update_support); Log(log, " Options.inplace_update_num_locks: %zd", inplace_update_num_locks); + Log(log, " Options.max_successive_merges: %zd", + max_successive_merges); } // Options::Dump // diff --git a/util/stop_watch.h b/util/stop_watch.h index e36bcb7ec..6325a7440 100644 --- a/util/stop_watch.h +++ b/util/stop_watch.h @@ -15,9 +15,10 @@ class StopWatch { explicit StopWatch( Env * const env, Statistics* statistics = nullptr, - const Histograms histogram_name = DB_GET) : + const Histograms histogram_name = DB_GET, + bool auto_start = true) : env_(env), - start_time_(env->NowMicros()), + start_time_((!auto_start && !statistics) ? 0 : env->NowMicros()), statistics_(statistics), histogram_name_(histogram_name) {} diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 61e009cd3..26bdd254b 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -20,6 +20,7 @@ #include #include #include +#include namespace rocksdb { @@ -31,6 +32,9 @@ class BackupEngine { Status CreateNewBackup(DB* db, bool flush_before_backup = false); Status PurgeOldBackups(uint32_t num_backups_to_keep); Status DeleteBackup(BackupID backup_id); + void StopBackup() { + stop_backup_.store(true, std::memory_order_release); + } void GetBackupInfo(std::vector* backup_info); Status RestoreDBFromBackup(BackupID backup_id, const std::string &db_dir, @@ -106,13 +110,16 @@ class BackupEngine { return "private"; } inline std::string GetPrivateFileRel(BackupID backup_id, - const std::string &file = "") const { + bool tmp = false, + const std::string& file = "") const { assert(file.size() == 0 || file[0] != '/'); - return GetPrivateDirRel() + "/" + std::to_string(backup_id) + "/" + file; + return GetPrivateDirRel() + "/" + std::to_string(backup_id) + + (tmp ? ".tmp" : "") + "/" + file; } - inline std::string GetSharedFileRel(const std::string& file = "") const { + inline std::string GetSharedFileRel(const std::string& file = "", + bool tmp = false) const { assert(file.size() == 0 || file[0] != '/'); - return "shared/" + file; + return "shared/" + file + (tmp ? ".tmp" : ""); } inline std::string GetLatestBackupFile(bool tmp = false) const { return GetAbsolutePath(std::string("LATEST_BACKUP") + (tmp ? ".tmp" : "")); @@ -151,6 +158,7 @@ class BackupEngine { std::map backups_; std::unordered_map backuped_file_refs_; std::vector obsolete_backups_; + std::atomic stop_backup_; // options data BackupableDBOptions options_; @@ -161,13 +169,17 @@ class BackupEngine { }; BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options) - : options_(options), - db_env_(db_env), - backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_) { + : stop_backup_(false), + options_(options), + db_env_(db_env), + backup_env_(options.backup_env != nullptr ? options.backup_env + : db_env_) { // create all the dirs we need backup_env_->CreateDirIfMissing(GetAbsolutePath()); - backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel())); + if (options_.share_table_files) { + backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel())); + } backup_env_->CreateDirIfMissing(GetAbsolutePath(GetPrivateDirRel())); backup_env_->CreateDirIfMissing(GetBackupMetaDir()); @@ -298,8 +310,9 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) { Log(options_.info_log, "Started the backup process -- creating backup %u", new_backup_id); - // create private dir - s = backup_env_->CreateDir(GetAbsolutePath(GetPrivateFileRel(new_backup_id))); + // create temporary private dir + s = backup_env_->CreateDir( + GetAbsolutePath(GetPrivateFileRel(new_backup_id, true))); // copy live_files for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { @@ -320,7 +333,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) { // * if it's kDescriptorFile, limit the size to manifest_file_size s = BackupFile(new_backup_id, &new_backup, - type == kTableFile, /* shared */ + options_.share_table_files && type == kTableFile, db->GetName(), /* src_dir */ live_files[i], /* src_fname */ (type == kDescriptorFile) ? manifest_file_size : 0); @@ -342,6 +355,13 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) { // we copied all the files, enable file deletions db->EnableFileDeletions(); + if (s.ok()) { + // move tmp private backup to real backup folder + s = backup_env_->RenameFile( + GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)), // tmp + GetAbsolutePath(GetPrivateFileRel(new_backup_id, false))); + } + if (s.ok()) { // persist the backup metadata on the disk s = new_backup.StoreToFile(options_.sync); @@ -561,6 +581,9 @@ Status BackupEngine::CopyFile(const std::string& src, Slice data; do { + if (stop_backup_.load(std::memory_order_acquire)) { + return Status::Incomplete("Backup stopped"); + } size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ? copy_file_buffer_size_ : size_limit; s = src_file->Read(buffer_to_read, &data, buf.get()); @@ -590,12 +613,16 @@ Status BackupEngine::BackupFile(BackupID backup_id, assert(src_fname.size() > 0 && src_fname[0] == '/'); std::string dst_relative = src_fname.substr(1); + std::string dst_relative_tmp; if (shared) { - dst_relative = GetSharedFileRel(dst_relative); + dst_relative_tmp = GetSharedFileRel(dst_relative, true); + dst_relative = GetSharedFileRel(dst_relative, false); } else { - dst_relative = GetPrivateFileRel(backup_id, dst_relative); + dst_relative_tmp = GetPrivateFileRel(backup_id, true, dst_relative); + dst_relative = GetPrivateFileRel(backup_id, false, dst_relative); } std::string dst_path = GetAbsolutePath(dst_relative); + std::string dst_path_tmp = GetAbsolutePath(dst_relative_tmp); Status s; uint64_t size; @@ -607,12 +634,15 @@ Status BackupEngine::BackupFile(BackupID backup_id, } else { Log(options_.info_log, "Copying %s", src_fname.c_str()); s = CopyFile(src_dir + src_fname, - dst_path, + dst_path_tmp, db_env_, backup_env_, options_.sync, &size, size_limit); + if (s.ok() && shared) { + s = backup_env_->RenameFile(dst_path_tmp, dst_path); + } } if (s.ok()) { backup->AddFile(dst_relative, size); @@ -671,14 +701,16 @@ void BackupEngine::GarbageCollection(bool full_scan) { &private_children); for (auto& child : private_children) { BackupID backup_id = 0; + bool tmp_dir = child.find(".tmp") != std::string::npos; sscanf(child.c_str(), "%u", &backup_id); - if (backup_id == 0 || backups_.find(backup_id) != backups_.end()) { + if (!tmp_dir && // if it's tmp_dir, delete it + (backup_id == 0 || backups_.find(backup_id) != backups_.end())) { // it's either not a number or it's still alive. continue continue; } // here we have to delete the dir and all its children std::string full_private_path = - GetAbsolutePath(GetPrivateFileRel(backup_id)); + GetAbsolutePath(GetPrivateFileRel(backup_id, tmp_dir)); std::vector subchildren; backup_env_->GetChildren(full_private_path, &subchildren); for (auto& subchild : subchildren) { @@ -813,7 +845,9 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) { BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options) : StackableDB(db), backup_engine_(new BackupEngine(db->GetEnv(), options)) { - backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber()); + if (options.share_table_files) { + backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber()); + } } BackupableDB::~BackupableDB() { @@ -836,6 +870,10 @@ Status BackupableDB::DeleteBackup(BackupID backup_id) { return backup_engine_->DeleteBackup(backup_id); } +void BackupableDB::StopBackup() { + backup_engine_->StopBackup(); +} + // --- RestoreBackupableDB methods ------ RestoreBackupableDB::RestoreBackupableDB(Env* db_env, diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 18922ad0a..1b9175ae2 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -307,7 +307,7 @@ class BackupableDBTest { CreateLoggerFromOptions(dbname_, backupdir_, env_, Options(), &logger_); backupable_options_.reset(new BackupableDBOptions( - backupdir_, test_backup_env_.get(), logger_.get(), true)); + backupdir_, test_backup_env_.get(), true, logger_.get(), true)); // delete old files in db DestroyDB(dbname_, Options()); @@ -319,7 +319,8 @@ class BackupableDBTest { return db; } - void OpenBackupableDB(bool destroy_old_data = false, bool dummy = false) { + void OpenBackupableDB(bool destroy_old_data = false, bool dummy = false, + bool share_table_files = true) { // reset all the defaults test_backup_env_->SetLimitWrittenFiles(1000000); test_db_env_->SetLimitWrittenFiles(1000000); @@ -333,6 +334,7 @@ class BackupableDBTest { ASSERT_OK(DB::Open(options_, dbname_, &db)); } backupable_options_->destroy_old_data = destroy_old_data; + backupable_options_->share_table_files = share_table_files; db_.reset(new BackupableDB(db, *backupable_options_)); } @@ -661,6 +663,38 @@ TEST(BackupableDBTest, DeleteNewerBackups) { CloseRestoreDB(); } +TEST(BackupableDBTest, NoShareTableFiles) { + const int keys_iteration = 5000; + OpenBackupableDB(true, false, false); + for (int i = 0; i < 5; ++i) { + FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1)); + ASSERT_OK(db_->CreateNewBackup(!!(i % 2))); + } + CloseBackupableDB(); + + for (int i = 0; i < 5; ++i) { + AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1), + keys_iteration * 6); + } +} + +TEST(BackupableDBTest, DeleteTmpFiles) { + OpenBackupableDB(); + CloseBackupableDB(); + std::string shared_tmp = backupdir_ + "/shared/00006.sst.tmp"; + std::string private_tmp_dir = backupdir_ + "/private/10.tmp"; + std::string private_tmp_file = private_tmp_dir + "/00003.sst"; + file_manager_->WriteToFile(shared_tmp, "tmp"); + file_manager_->CreateDir(private_tmp_dir); + file_manager_->WriteToFile(private_tmp_file, "tmp"); + ASSERT_EQ(true, file_manager_->FileExists(private_tmp_dir)); + OpenBackupableDB(); + CloseBackupableDB(); + ASSERT_EQ(false, file_manager_->FileExists(shared_tmp)); + ASSERT_EQ(false, file_manager_->FileExists(private_tmp_file)); + ASSERT_EQ(false, file_manager_->FileExists(private_tmp_dir)); +} + } // anon namespace } // namespace rocksdb