diff --git a/HISTORY.md b/HISTORY.md index 0a69d12da..2a49d72ed 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,6 +12,7 @@ * Removed arena.h from public header files. * By default, checksums are verified on every read from database +* Added is_manual_compaction to CompactionFilter::Context ## 2.7.0 (01/28/2014) diff --git a/Makefile b/Makefile index 564f1c117..3b59a4ee5 100644 --- a/Makefile +++ b/Makefile @@ -145,7 +145,7 @@ endif # PLATFORM_SHARED_EXT all: $(LIBRARY) $(PROGRAMS) -dbg: $(PROGRAMS) +dbg: $(LIBRARY) $(PROGRAMS) # Will also generate shared libraries. release: diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index 8e83ae497..5a15aca33 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -19,7 +19,8 @@ # # -DLEVELDB_PLATFORM_POSIX if cstdatomic is present # -DLEVELDB_PLATFORM_NOATOMIC if it is not -# -DSNAPPY if the Snappy library is present +# -DSNAPPY if the Snappy library is present +# -DLZ4 if the LZ4 library is present # # Using gflags in rocksdb: # Our project depends on gflags, which requires users to take some extra steps @@ -38,7 +39,7 @@ if test -z "$OUTPUT"; then fi # we depend on C++11 -PLATFORM_CXXFLAGS="-std=gnu++11" +PLATFORM_CXXFLAGS="-std=c++11" # we currently depend on POSIX platform COMMON_FLAGS="-DROCKSDB_PLATFORM_POSIX" @@ -244,6 +245,17 @@ EOF PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lbz2" fi + # Test whether lz4 library is installed + $CXX $CFLAGS $COMMON_FLAGS -x c++ - -o /dev/null 2>/dev/null < + #include + int main() {} +EOF + if [ "$?" 
= 0 ]; then + COMMON_FLAGS="$COMMON_FLAGS -DLZ4" + PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -llz4" + fi + # Test whether tcmalloc is available $CXX $CFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null < to_delete; // should be called outside the mutex - SuperVersion(); + SuperVersion() = default; ~SuperVersion(); SuperVersion* Ref(); // Returns true if this was the last reference and caller should diff --git a/db/compaction.cc b/db/compaction.cc index 44e0dea84..79470268d 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -41,6 +41,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level, score_(0), bottommost_level_(false), is_full_compaction_(false), + is_manual_compaction_(false), level_ptrs_(std::vector(number_levels_)) { cfd_->Ref(); diff --git a/db/compaction.h b/db/compaction.h index f92dc1db0..8fd95f909 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -85,6 +85,9 @@ class Compaction { // Does this compaction include all sst files? bool IsFullCompaction() { return is_full_compaction_; } + // Was this compaction triggered manually by the client? + bool IsManualCompaction() { return is_manual_compaction_; } + private: friend class CompactionPicker; friend class UniversalCompactionPicker; @@ -125,6 +128,9 @@ class Compaction { // Does this compaction include all sst files? bool is_full_compaction_; + // Is this compaction requested by the client? + bool is_manual_compaction_; + // level_ptrs_ holds indices into input_version_->levels_: our state // is that we are positioned at one of the file ranges for each // higher level than the ones involved in this compaction (i.e. 
for diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 330a8b1c4..4a384ffd0 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -358,6 +358,9 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, // Is this compaction creating a file at the bottommost level c->SetupBottomMostLevel(true); + + c->is_manual_compaction_ = true; + return c; } diff --git a/db/db_bench.cc b/db/db_bench.cc index 19938e0c1..e40732f28 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -60,8 +60,8 @@ DEFINE_string(benchmarks, "randomwithverify," "fill100K," "crc32c," - "snappycomp," - "snappyuncomp," + "compress," + "uncompress," "acquireload," "fillfromstdin,", @@ -338,6 +338,10 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { return rocksdb::kZlibCompression; else if (!strcasecmp(ctype, "bzip2")) return rocksdb::kBZip2Compression; + else if (!strcasecmp(ctype, "lz4")) + return rocksdb::kLZ4Compression; + else if (!strcasecmp(ctype, "lz4hc")) + return rocksdb::kLZ4HCCompression; fprintf(stdout, "Cannot parse compression type '%s'\n", ctype); return rocksdb::kSnappyCompression; //default value @@ -479,7 +483,8 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) { } return true; } -DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipList"); +DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and " + "plain table"); enum RepFactory { kSkipList, @@ -501,6 +506,8 @@ enum RepFactory StringToRepFactory(const char* ctype) { } static enum RepFactory FLAGS_rep_factory; DEFINE_string(memtablerep, "skip_list", ""); +DEFINE_bool(use_plain_table, false, "if use plain table " + "instead of block-based table format"); DEFINE_string(merge_operator, "", "The merge operator to use with the database." 
"If a new merge operator is specified, be sure to use fresh" @@ -841,7 +848,13 @@ class Benchmark { case rocksdb::kBZip2Compression: fprintf(stdout, "Compression: bzip2\n"); break; - } + case rocksdb::kLZ4Compression: + fprintf(stdout, "Compression: lz4\n"); + break; + case rocksdb::kLZ4HCCompression: + fprintf(stdout, "Compression: lz4hc\n"); + break; + } switch (FLAGS_rep_factory) { case kPrefixHash: @@ -896,6 +909,16 @@ class Benchmark { strlen(text), &compressed); name = "BZip2"; break; + case kLZ4Compression: + result = port::LZ4_Compress(Options().compression_opts, text, + strlen(text), &compressed); + name = "LZ4"; + break; + case kLZ4HCCompression: + result = port::LZ4HC_Compress(Options().compression_opts, text, + strlen(text), &compressed); + name = "LZ4HC"; + break; case kNoCompression: assert(false); // cannot happen break; @@ -975,7 +998,8 @@ class Benchmark { filter_policy_(FLAGS_bloom_bits >= 0 ? NewBloomFilterPolicy(FLAGS_bloom_bits) : nullptr), - prefix_extractor_(NewFixedPrefixTransform(FLAGS_key_size-1)), + prefix_extractor_(NewFixedPrefixTransform(FLAGS_use_plain_table ? + FLAGS_prefix_size : FLAGS_key_size-1)), db_(nullptr), num_(FLAGS_num), value_size_(FLAGS_value_size), @@ -1146,10 +1170,10 @@ class Benchmark { method = &Benchmark::Crc32c; } else if (name == Slice("acquireload")) { method = &Benchmark::AcquireLoad; - } else if (name == Slice("snappycomp")) { - method = &Benchmark::SnappyCompress; - } else if (name == Slice("snappyuncomp")) { - method = &Benchmark::SnappyUncompress; + } else if (name == Slice("compress")) { + method = &Benchmark::Compress; + } else if (name == Slice("uncompress")) { + method = &Benchmark::Uncompress; } else if (name == Slice("heapprofile")) { HeapProfile(); } else if (name == Slice("stats")) { @@ -1302,23 +1326,47 @@ class Benchmark { if (ptr == nullptr) exit(1); // Disable unused variable warning. 
} - void SnappyCompress(ThreadState* thread) { + void Compress(ThreadState *thread) { RandomGenerator gen; Slice input = gen.Generate(Options().block_size); int64_t bytes = 0; int64_t produced = 0; bool ok = true; std::string compressed; - while (ok && bytes < 1024 * 1048576) { // Compress 1G - ok = port::Snappy_Compress(Options().compression_opts, input.data(), + + // Compress 1G + while (ok && bytes < int64_t(1) << 30) { + switch (FLAGS_compression_type_e) { + case rocksdb::kSnappyCompression: + ok = port::Snappy_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kZlibCompression: + ok = port::Zlib_Compress(Options().compression_opts, input.data(), input.size(), &compressed); + break; + case rocksdb::kBZip2Compression: + ok = port::BZip2_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kLZ4Compression: + ok = port::LZ4_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kLZ4HCCompression: + ok = port::LZ4HC_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + default: + ok = false; + } produced += compressed.size(); bytes += input.size(); thread->stats.FinishedSingleOp(nullptr); } if (!ok) { - thread->stats.AddMessage("(snappy failure)"); + thread->stats.AddMessage("(compression failure)"); } else { char buf[100]; snprintf(buf, sizeof(buf), "(output: %.1f%%)", @@ -1328,24 +1376,78 @@ class Benchmark { } } - void SnappyUncompress(ThreadState* thread) { + void Uncompress(ThreadState *thread) { RandomGenerator gen; Slice input = gen.Generate(Options().block_size); std::string compressed; - bool ok = port::Snappy_Compress(Options().compression_opts, input.data(), - input.size(), &compressed); + + bool ok; + switch (FLAGS_compression_type_e) { + case rocksdb::kSnappyCompression: + ok = port::Snappy_Compress(Options().compression_opts, input.data(), + input.size(), 
&compressed); + break; + case rocksdb::kZlibCompression: + ok = port::Zlib_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kBZip2Compression: + ok = port::BZip2_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kLZ4Compression: + ok = port::LZ4_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kLZ4HCCompression: + ok = port::LZ4HC_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + default: + ok = false; + } + int64_t bytes = 0; - char* uncompressed = new char[input.size()]; - while (ok && bytes < 1024 * 1048576) { // Compress 1G - ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), - uncompressed); + int decompress_size; + while (ok && bytes < 1024 * 1048576) { + char *uncompressed = nullptr; + switch (FLAGS_compression_type_e) { + case rocksdb::kSnappyCompression: + // allocate here to make comparison fair + uncompressed = new char[input.size()]; + ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), + uncompressed); + break; + case rocksdb::kZlibCompression: + uncompressed = port::Zlib_Uncompress( + compressed.data(), compressed.size(), &decompress_size); + ok = uncompressed != nullptr; + break; + case rocksdb::kBZip2Compression: + uncompressed = port::BZip2_Uncompress( + compressed.data(), compressed.size(), &decompress_size); + ok = uncompressed != nullptr; + break; + case rocksdb::kLZ4Compression: + uncompressed = port::LZ4_Uncompress( + compressed.data(), compressed.size(), &decompress_size); + ok = uncompressed != nullptr; + break; + case rocksdb::kLZ4HCCompression: + uncompressed = port::LZ4_Uncompress( + compressed.data(), compressed.size(), &decompress_size); + ok = uncompressed != nullptr; + break; + default: + ok = false; + } + delete[] uncompressed; bytes += input.size(); thread->stats.FinishedSingleOp(nullptr); } - 
delete[] uncompressed; if (!ok) { - thread->stats.AddMessage("(snappy failure)"); + thread->stats.AddMessage("(compression failure)"); } else { thread->stats.AddBytes(bytes); } @@ -1368,8 +1470,9 @@ class Benchmark { options.compaction_style = FLAGS_compaction_style_e; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; - options.prefix_extractor = FLAGS_use_prefix_blooms ? prefix_extractor_ - : nullptr; + options.prefix_extractor = + (FLAGS_use_plain_table || FLAGS_use_prefix_blooms) ? prefix_extractor_ + : nullptr; options.max_open_files = FLAGS_open_files; options.statistics = dbstats; options.env = FLAGS_env; @@ -1383,8 +1486,8 @@ class Benchmark { FLAGS_max_bytes_for_level_multiplier; options.filter_deletes = FLAGS_filter_deletes; if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) { - fprintf(stderr, - "prefix_size should be non-zero iff memtablerep == prefix_hash\n"); + fprintf(stderr, "prefix_size should be non-zero iff memtablerep " + "== prefix_hash\n"); exit(1); } switch (FLAGS_rep_factory) { @@ -1401,6 +1504,22 @@ class Benchmark { ); break; } + if (FLAGS_use_plain_table) { + if (FLAGS_rep_factory != kPrefixHash) { + fprintf(stderr, "Waring: plain table is used with skipList\n"); + } + if (!FLAGS_mmap_read && !FLAGS_mmap_write) { + fprintf(stderr, "plain table format requires mmap to operate\n"); + exit(1); + } + + int bloom_bits_per_key = FLAGS_bloom_bits; + if (bloom_bits_per_key < 0) { + bloom_bits_per_key = 0; + } + options.table_factory = std::shared_ptr( + NewPlainTableFactory(FLAGS_key_size, bloom_bits_per_key, 0.75)); + } if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) { if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() != (unsigned int)FLAGS_num_levels) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 1ce682424..20c4c1093 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -116,6 +116,7 @@ struct DBImpl::CompactionState { CompactionFilter::Context GetFilterContext() { 
CompactionFilter::Context context; context.is_full_compaction = compaction->IsFullCompaction(); + context.is_manual_compaction = compaction->IsManualCompaction(); return context; } }; @@ -1182,11 +1183,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, DeletionState& deletion_state) { mutex_.AssertHeld(); assert(cfd->imm()->size() != 0); - - if (!cfd->imm()->IsFlushPending()) { - Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); - return Status::IOError("FlushMemTableToOutputFile already in progress"); - } + assert(cfd->imm()->IsFlushPending()); // Save the contents of the earliest memtable as a new Table uint64_t file_number; @@ -1194,7 +1191,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, cfd->imm()->PickMemtablesToFlush(&mems); if (mems.empty()) { Log(options_.info_log, "Nothing in memstore to flush"); - return Status::IOError("Nothing in memstore to flush"); + return Status::OK(); } // record the logfile_number_ before we release the mutex @@ -1217,15 +1214,20 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, // This will release and re-acquire the mutex. 
Status s = WriteLevel0Table(cfd, mems, edit, &file_number); - if (s.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) { - s = Status::IOError("Column family closed during memtable flush"); + if (s.ok() && shutting_down_.Acquire_Load() && cfd->IsDropped()) { + s = Status::ShutdownInProgress( + "Column family closed during memtable flush"); + } + + if (!s.ok()) { + cfd->imm()->RollbackMemtableFlush(mems, file_number, &pending_outputs_); + return s; } // Replace immutable memtable with the generated Table s = cfd->imm()->InstallMemtableFlushResults( - cfd, mems, versions_.get(), s, &mutex_, options_.info_log.get(), - file_number, pending_outputs_, &deletion_state.memtables_to_free, - db_directory_.get()); + cfd, mems, versions_.get(), &mutex_, options_.info_log.get(), file_number, + pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get()); if (s.ok()) { InstallSuperVersion(cfd, deletion_state); @@ -1410,7 +1412,8 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS); if (seq > versions_->LastSequence()) { - return Status::IOError("Requested sequence not yet written in the db"); + return Status::NotFound( + "Requested sequence not yet written in the db"); } // Get all sorted Wal Files. // Do binary search and open files and find the seq number. @@ -1474,16 +1477,19 @@ Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number, if (type == kAliveLogFile) { std::string fname = LogFileName(options_.wal_dir, number); Status status = ReadFirstLine(fname, result); - if (!status.ok()) { - // check if the file got moved to archive. 
- std::string archived_file = - ArchivedLogFileName(options_.wal_dir, number); - Status s = ReadFirstLine(archived_file, result); - if (!s.ok()) { - return Status::IOError("Log File has been deleted: " + archived_file); - } + if (status.ok() || env_->FileExists(fname)) { + // return OK or any error that is not caused non-existing file + return status; } - return Status::OK(); + + // check if the file got moved to archive. + std::string archived_file = + ArchivedLogFileName(options_.wal_dir, number); + Status s = ReadFirstLine(archived_file, result); + if (s.ok() || env_->FileExists(archived_file)) { + return s; + } + return Status::NotFound("Log File has been deleted: " + archived_file); } else if (type == kArchivedLogFile) { std::string fname = ArchivedLogFileName(options_.wal_dir, number); Status status = ReadFirstLine(fname, result); @@ -1498,12 +1504,17 @@ Status DBImpl::ReadFirstLine(const std::string& fname, Env* env; Logger* info_log; const char* fname; - Status* status; // nullptr if options_.paranoid_checks==false + + Status* status; + bool ignore_error; // true if options_.paranoid_checks==false virtual void Corruption(size_t bytes, const Status& s) { Log(info_log, "%s%s: dropping %d bytes; %s", - (this->status == nullptr ? "(ignoring error) " : ""), + (this->ignore_error ? "(ignoring error) " : ""), fname, static_cast(bytes), s.ToString().c_str()); - if (this->status != nullptr && this->status->ok()) *this->status = s; + if (this->status->ok()) { + // only keep the first error + *this->status = s; + } } }; @@ -1519,23 +1530,30 @@ Status DBImpl::ReadFirstLine(const std::string& fname, reporter.env = env_; reporter.info_log = options_.info_log.get(); reporter.fname = fname.c_str(); - reporter.status = (options_.paranoid_checks ? 
&status : nullptr); + reporter.status = &status; + reporter.ignore_error = !options_.paranoid_checks; log::Reader reader(std::move(file), &reporter, true/*checksum*/, 0/*initial_offset*/); std::string scratch; Slice record; - if (reader.ReadRecord(&record, &scratch) && status.ok()) { + if (reader.ReadRecord(&record, &scratch) && + (status.ok() || !options_.paranoid_checks)) { if (record.size() < 12) { reporter.Corruption( record.size(), Status::Corruption("log record too small")); - return Status::IOError("Corruption noted"); // TODO read record's till the first no corrupt entry? + } else { + WriteBatchInternal::SetContents(batch, record); + return Status::OK(); } - WriteBatchInternal::SetContents(batch, record); - return Status::OK(); } - return Status::IOError("Error reading from file " + fname); + + // ReadRecord returns false on EOF, which is deemed as OK() by Reader + if (status.ok()) { + status = Status::Corruption("eof reached"); + } + return status; } struct CompareLogByPointer { @@ -2206,7 +2224,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { compact->compaction->level(), compact->compaction->num_input_files(1), compact->compaction->level() + 1); - return Status::IOError("Compaction input files inconsistent"); + return Status::Corruption("Compaction input files inconsistent"); } Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", @@ -2573,7 +2591,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } if (status.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) { - status = Status::IOError("Column family closing started during compaction"); + status = Status::ShutdownInProgress( + "Database shutdown started during compaction"); } if (status.ok() && compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input.get()); @@ -3687,6 +3706,21 @@ void DBImpl::GetLiveFilesMetaData(std::vector* metadata) { versions_->GetLiveFilesMetaData(metadata); } +void 
DBImpl::TEST_GetFilesMetaData( + std::vector>* metadata) { + MutexLock l(&mutex_); + metadata->resize(NumberLevels()); + for (int level = 0; level < NumberLevels(); level++) { + const std::vector& files = + default_cf_handle_->cfd()->current()->files_[level]; + + (*metadata)[level].clear(); + for (const auto& f : files) { + (*metadata)[level].push_back(*f); + } + } +} + Status DBImpl::GetDbIdentity(std::string& identity) { std::string idfilename = IdentityFileName(dbname_); unique_ptr idfile; diff --git a/db/db_impl.h b/db/db_impl.h index 3c971391c..f42f7bf3a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -174,6 +174,8 @@ class DBImpl : public DB { default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL; } + void TEST_GetFilesMetaData(std::vector>* metadata); + // needed for CleanupIteratorState struct DeletionState { inline bool HaveSomethingToDelete() const { diff --git a/db/db_test.cc b/db/db_test.cc index d525314f7..fbef0842e 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -56,7 +56,19 @@ static bool BZip2CompressionSupported(const CompressionOptions& options) { return port::BZip2_Compress(options, in.data(), in.size(), &out); } -static std::string RandomString(Random* rnd, int len) { +static bool LZ4CompressionSupported(const CompressionOptions &options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::LZ4_Compress(options, in.data(), in.size(), &out); +} + +static bool LZ4HCCompressionSupported(const CompressionOptions &options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::LZ4HC_Compress(options, in.data(), in.size(), &out); +} + +static std::string RandomString(Random *rnd, int len) { std::string r; test::RandomString(rnd, len, &r); return r; @@ -1649,6 +1661,42 @@ TEST(DBTest, Recover) { } while (ChangeOptions()); } +TEST(DBTest, RecoverWithTableHandle) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + 
options.write_buffer_size = 100; + options.disable_auto_compactions = true; + DestroyAndReopen(&options); + + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("bar", "v2")); + dbfull()->TEST_FlushMemTable(); + ASSERT_OK(Put("foo", "v3")); + ASSERT_OK(Put("bar", "v4")); + dbfull()->TEST_FlushMemTable(); + ASSERT_OK(Put("big", std::string(100, 'a'))); + Reopen(); + + std::vector> files; + dbfull()->TEST_GetFilesMetaData(&files); + int total_files = 0; + for (const auto& level : files) { + total_files += level.size(); + } + ASSERT_EQ(total_files, 3); + for (const auto& level : files) { + for (const auto& file : level) { + if (kInfiniteMaxOpenFiles == option_config_) { + ASSERT_TRUE(file.table_reader_handle != nullptr); + } else { + ASSERT_TRUE(file.table_reader_handle == nullptr); + } + } + } + } while (ChangeOptions()); +} + TEST(DBTest, IgnoreRecoveredLog) { std::string backup_logs = dbname_ + "/backup_logs"; @@ -2624,6 +2672,14 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits, CompressionOptions(wbits, lev, strategy))) { type = kBZip2Compression; fprintf(stderr, "using bzip2\n"); + } else if (LZ4CompressionSupported( + CompressionOptions(wbits, lev, strategy))) { + type = kLZ4Compression; + fprintf(stderr, "using lz4\n"); + } else if (LZ4HCCompressionSupported( + CompressionOptions(wbits, lev, strategy))) { + type = kLZ4HCCompression; + fprintf(stderr, "using lz4hc\n"); } else { fprintf(stderr, "skipping test, compression disabled\n"); return false; @@ -2917,7 +2973,11 @@ class DeleteFilterFactory : public CompactionFilterFactory { public: virtual std::unique_ptr CreateCompactionFilter(const CompactionFilter::Context& context) override { - return std::unique_ptr(new DeleteFilter()); + if (context.is_manual_compaction) { + return std::unique_ptr(new DeleteFilter()); + } else { + return std::unique_ptr(nullptr); + } } virtual const char* Name() const override { diff --git a/db/memtable.cc b/db/memtable.cc index 2f84a289e..5a718e70c 100644 --- 
a/db/memtable.cc +++ b/db/memtable.cc @@ -208,116 +208,147 @@ void MemTable::Add(SequenceNumber s, ValueType type, } } +// Callback from MemTable::Get() +namespace { + +struct Saver { + Status* status; + const LookupKey* key; + bool* found_final_value; // Is value set correctly? Used by KeyMayExist + bool* merge_in_progress; + std::string* value; + const MergeOperator* merge_operator; + // the merge operations encountered; + MergeContext* merge_context; + MemTable* mem; + Logger* logger; + Statistics* statistics; + bool inplace_update_support; +}; +} // namespace + +static bool SaveValue(void* arg, const char* entry) { + Saver* s = reinterpret_cast(arg); + MergeContext* merge_context = s->merge_context; + const MergeOperator* merge_operator = s->merge_operator; + + assert(s != nullptr && merge_context != nullptr); + + // entry format is: + // klength varint32 + // userkey char[klength-8] + // tag uint64 + // vlength varint32 + // value char[vlength] + // Check that it belongs to same user key. We do not check the + // sequence number since the Seek() call above should have skipped + // all entries with overly large sequence numbers. 
+ uint32_t key_length; + const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (s->mem->GetInternalKeyComparator().user_comparator()->Compare( + Slice(key_ptr, key_length - 8), s->key->user_key()) == 0) { + // Correct user key + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + switch (static_cast(tag & 0xff)) { + case kTypeValue: { + if (s->inplace_update_support) { + s->mem->GetLock(s->key->user_key())->ReadLock(); + } + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + *(s->status) = Status::OK(); + if (*(s->merge_in_progress)) { + assert(merge_operator); + if (!merge_operator->FullMerge(s->key->user_key(), &v, + merge_context->GetOperands(), s->value, + s->logger)) { + RecordTick(s->statistics, NUMBER_MERGE_FAILURES); + *(s->status) = + Status::Corruption("Error: Could not perform merge."); + } + } else { + s->value->assign(v.data(), v.size()); + } + if (s->inplace_update_support) { + s->mem->GetLock(s->key->user_key())->Unlock(); + } + *(s->found_final_value) = true; + return false; + } + case kTypeDeletion: { + if (*(s->merge_in_progress)) { + assert(merge_operator); + *(s->status) = Status::OK(); + if (!merge_operator->FullMerge(s->key->user_key(), nullptr, + merge_context->GetOperands(), s->value, + s->logger)) { + RecordTick(s->statistics, NUMBER_MERGE_FAILURES); + *(s->status) = + Status::Corruption("Error: Could not perform merge."); + } + } else { + *(s->status) = Status::NotFound(); + } + *(s->found_final_value) = true; + return false; + } + case kTypeMerge: { + std::string merge_result; // temporary area for merge results later + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + *(s->merge_in_progress) = true; + merge_context->PushOperand(v); + while (merge_context->GetNumOperands() >= 2) { + // Attempt to associative merge. 
(Returns true if successful) + if (merge_operator->PartialMerge( + s->key->user_key(), merge_context->GetOperand(0), + merge_context->GetOperand(1), &merge_result, s->logger)) { + merge_context->PushPartialMergeResult(merge_result); + } else { + // Stack them because user can't associative merge + break; + } + } + return true; + } + default: + assert(false); + return true; + } + } + + // s->state could be Corrupt, merge or notfound + return false; +} + bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, MergeContext& merge_context, const Options& options) { StopWatchNano memtable_get_timer(options.env, false); StartPerfTimer(&memtable_get_timer); - Slice mem_key = key.memtable_key(); Slice user_key = key.user_key(); + bool found_final_value = false; + bool merge_in_progress = s->IsMergeInProgress(); - std::unique_ptr iter; if (prefix_bloom_ && !prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) { // iter is null if prefix bloom says the key does not exist } else { - iter.reset(table_->GetIterator(user_key)); - iter->Seek(key.internal_key(), mem_key.data()); - } - - bool merge_in_progress = s->IsMergeInProgress(); - auto merge_operator = options.merge_operator.get(); - auto logger = options.info_log; - std::string merge_result; - - bool found_final_value = false; - for (; !found_final_value && iter && iter->Valid(); iter->Next()) { - // entry format is: - // klength varint32 - // userkey char[klength-8] - // tag uint64 - // vlength varint32 - // value char[vlength] - // Check that it belongs to same user key. We do not check the - // sequence number since the Seek() call above should have skipped - // all entries with overly large sequence numbers. 
- const char* entry = iter->key(); - uint32_t key_length = 0; - const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); - if (comparator_.comparator.user_comparator()->Compare( - Slice(key_ptr, key_length - 8), key.user_key()) == 0) { - // Correct user key - const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); - switch (static_cast(tag & 0xff)) { - case kTypeValue: { - if (options.inplace_update_support) { - GetLock(key.user_key())->ReadLock(); - } - Slice v = GetLengthPrefixedSlice(key_ptr + key_length); - *s = Status::OK(); - if (merge_in_progress) { - assert(merge_operator); - if (!merge_operator->FullMerge(key.user_key(), &v, - merge_context.GetOperands(), value, - logger.get())) { - RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES); - *s = Status::Corruption("Error: Could not perform merge."); - } - } else { - value->assign(v.data(), v.size()); - } - if (options.inplace_update_support) { - GetLock(key.user_key())->Unlock(); - } - found_final_value = true; - break; - } - case kTypeDeletion: { - if (merge_in_progress) { - assert(merge_operator); - *s = Status::OK(); - if (!merge_operator->FullMerge(key.user_key(), nullptr, - merge_context.GetOperands(), value, - logger.get())) { - RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES); - *s = Status::Corruption("Error: Could not perform merge."); - } - } else { - *s = Status::NotFound(); - } - found_final_value = true; - break; - } - case kTypeMerge: { - Slice v = GetLengthPrefixedSlice(key_ptr + key_length); - merge_in_progress = true; - merge_context.PushOperand(v); - while(merge_context.GetNumOperands() >= 2) { - // Attempt to associative merge. 
(Returns true if successful) - if (merge_operator->PartialMerge(key.user_key(), - merge_context.GetOperand(0), - merge_context.GetOperand(1), - &merge_result, logger.get())) { - merge_context.PushPartialMergeResult(merge_result); - } else { - // Stack them because user can't associative merge - break; - } - } - break; - } - default: - assert(false); - break; - } - } else { - // exit loop if user key does not match - break; - } + Saver saver; + saver.status = s; + saver.found_final_value = &found_final_value; + saver.merge_in_progress = &merge_in_progress; + saver.key = &key; + saver.value = value; + saver.status = s; + saver.mem = this; + saver.merge_context = &merge_context; + saver.merge_operator = options.merge_operator.get(); + saver.logger = options.info_log.get(); + saver.inplace_update_support = options.inplace_update_support; + saver.statistics = options.statistics.get(); + table_->Get(key, &saver, SaveValue); } // No change to value, since we have not yet found a Put/Delete - if (!found_final_value && merge_in_progress) { *s = Status::MergeInProgress(""); } @@ -489,4 +520,13 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { return num_successive_merges; } +void MemTableRep::Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, const char* entry)) { + auto iter = GetIterator(k.user_key()); + for (iter->Seek(k.internal_key(), k.memtable_key().data()); + iter->Valid() && callback_func(callback_args, iter->key()); + iter->Next()) { + } +} + } // namespace rocksdb diff --git a/db/memtable.h b/db/memtable.h index 61bebaee0..6f36ce4a1 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -154,6 +154,13 @@ class MemTable { // Notify the underlying storage that no more items will be added void MarkImmutable() { table_->MarkReadOnly(); } + // Get the lock associated for the key + port::RWMutex* GetLock(const Slice& key); + + const InternalKeyComparator& GetInternalKeyComparator() const { + return comparator_.comparator; + 
} + private: friend class MemTableIterator; friend class MemTableBackwardIterator; @@ -190,9 +197,6 @@ class MemTable { MemTable(const MemTable&); void operator=(const MemTable&); - // Get the lock associated for the key - port::RWMutex* GetLock(const Slice& key); - const SliceTransform* const prefix_extractor_; std::unique_ptr prefix_bloom_; }; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 240edde15..25971755d 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -120,31 +120,34 @@ void MemTableList::PickMemtablesToFlush(autovector* ret) { flush_requested_ = false; // start-flush request is complete } +void MemTableList::RollbackMemtableFlush(const autovector& mems, + uint64_t file_number, + std::set* pending_outputs) { + assert(!mems.empty()); + + // If the flush was not successful, then just reset state. + // Maybe a suceeding attempt to flush will be successful. + for (MemTable* m : mems) { + assert(m->flush_in_progress_); + assert(m->file_number_ == 0); + + m->flush_in_progress_ = false; + m->flush_completed_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + } + pending_outputs->erase(file_number); + imm_flush_needed.Release_Store(reinterpret_cast(1)); +} + // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( ColumnFamilyData* cfd, const autovector& mems, VersionSet* vset, - Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, + port::Mutex* mu, Logger* info_log, uint64_t file_number, std::set& pending_outputs, autovector* to_delete, Directory* db_directory) { mu->AssertHeld(); - // If the flush was not successful, then just reset state. - // Maybe a suceeding attempt to flush will be successful. 
- if (!flushStatus.ok()) { - for (MemTable* m : mems) { - assert(m->flush_in_progress_); - assert(m->file_number_ == 0); - - m->flush_in_progress_ = false; - m->flush_completed_ = false; - m->edit_.Clear(); - num_flush_not_started_++; - imm_flush_needed.Release_Store((void *)1); - pending_outputs.erase(file_number); - } - return flushStatus; - } - // flush was sucessful for (size_t i = 0; i < mems.size(); ++i) { // All the edits are associated with the first memtable of this batch. @@ -216,7 +219,6 @@ Status MemTableList::InstallMemtableFlushResults( pending_outputs.erase(m->file_number_); m->file_number_ = 0; imm_flush_needed.Release_Store((void *)1); - s = Status::IOError("Unable to commit flushed memtable"); } ++mem_id; } while (!current_->memlist_.empty() && (m = current_->memlist_.back()) && diff --git a/db/memtable_list.h b/db/memtable_list.h index 9ade48798..1013c8ff8 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -83,8 +83,8 @@ class MemTableList { MemTableListVersion* current() { return current_; } - // so that backgrund threads can detect non-nullptr pointer to - // determine whether this is anything more to start flushing. + // so that background threads can detect non-nullptr pointer to + // determine whether there is anything more to start flushing. port::AtomicPointer imm_flush_needed; // Returns the total number of memtables in the list @@ -98,12 +98,20 @@ class MemTableList { // memtables are guaranteed to be in the ascending order of created time. void PickMemtablesToFlush(autovector* mems); + // Reset status of the given memtable list back to pending state so that + // they can get picked up again on the next round of flush. 
+ void RollbackMemtableFlush(const autovector& mems, + uint64_t file_number, + std::set* pending_outputs); + // Commit a successful flush in the manifest file - Status InstallMemtableFlushResults( - ColumnFamilyData* cfd, const autovector& m, VersionSet* vset, - Status flushStatus, port::Mutex* mu, Logger* info_log, - uint64_t file_number, std::set& pending_outputs, - autovector* to_delete, Directory* db_directory); + Status InstallMemtableFlushResults(ColumnFamilyData* cfd, + const autovector& m, + VersionSet* vset, port::Mutex* mu, + Logger* info_log, uint64_t file_number, + std::set& pending_outputs, + autovector* to_delete, + Directory* db_directory); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/repair.cc b/db/repair.cc index ed11870b0..a3f311a25 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -127,7 +127,7 @@ class Repairer { return status; } if (filenames.empty()) { - return Status::IOError(dbname_, "repair found no files"); + return Status::Corruption(dbname_, "repair found no files"); } uint64_t number; diff --git a/db/simple_table_db_test.cc b/db/simple_table_db_test.cc index 3d1420c0c..a67114663 100644 --- a/db/simple_table_db_test.cc +++ b/db/simple_table_db_test.cc @@ -96,7 +96,7 @@ public: void SetupForCompaction() override; - TableProperties& GetTableProperties() override; + std::shared_ptr GetTableProperties() const override; ~SimpleTableReader(); @@ -172,7 +172,7 @@ struct SimpleTableReader::Rep { unique_ptr file; uint64_t index_start_offset; int num_entries; - TableProperties table_properties; + std::shared_ptr table_properties; const static int user_key_size = 16; const static int offset_length = 8; @@ -215,7 +215,8 @@ Status SimpleTableReader::Open(const Options& options, void SimpleTableReader::SetupForCompaction() { } -TableProperties& SimpleTableReader::GetTableProperties() { +std::shared_ptr SimpleTableReader::GetTableProperties() + const { 
return rep_->table_properties; } diff --git a/db/skiplist.h b/db/skiplist.h index e713fe42a..e4a253bcc 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -33,6 +33,7 @@ #pragma once #include #include +#include "util/arena.h" #include "port/port.h" #include "util/arena.h" #include "util/random.h" diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index 961a7302b..a9f770ca5 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -116,7 +116,7 @@ class RegularKeysStartWithA: public TablePropertiesCollector { } virtual UserCollectedProperties GetReadableProperties() const { - return {}; + return UserCollectedProperties{}; } @@ -157,7 +157,7 @@ void TestCustomizedTablePropertiesCollector( // -- Step 2: Read properties FakeRandomeAccessFile readable(writable->contents()); - TableProperties props; + TableProperties* props; Status s = ReadTableProperties( &readable, writable->contents().size(), @@ -166,9 +166,10 @@ void TestCustomizedTablePropertiesCollector( nullptr, &props ); + std::unique_ptr props_guard(props); ASSERT_OK(s); - auto user_collected = props.user_collected_properties; + auto user_collected = props->user_collected_properties; ASSERT_EQ("Rocksdb", user_collected.at("TablePropertiesTest")); @@ -256,7 +257,7 @@ void TestInternalKeyPropertiesCollector( ASSERT_OK(builder->Finish()); FakeRandomeAccessFile readable(writable->contents()); - TableProperties props; + TableProperties* props; Status s = ReadTableProperties( &readable, writable->contents().size(), @@ -267,7 +268,8 @@ void TestInternalKeyPropertiesCollector( ); ASSERT_OK(s); - auto user_collected = props.user_collected_properties; + std::unique_ptr props_guard(props); + auto user_collected = props->user_collected_properties; uint64_t deleted = GetDeletedKeys(user_collected); ASSERT_EQ(4u, deleted); diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 077cf048c..8a9b988ab 100644 --- 
a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -46,9 +46,6 @@ Status TransactionLogIteratorImpl::OpenLogFile( // Try the archive dir, as it could have moved in the meanwhile. fname = ArchivedLogFileName(dir_, logFile->LogNumber()); status = env->NewSequentialFile(fname, file, soptions_); - if (!status.ok()) { - return Status::IOError("Requested file not present in the dir"); - } } return status; } @@ -187,7 +184,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) { currentStatus_ = Status::OK(); } else { - currentStatus_ = Status::IOError("NO MORE DATA LEFT"); + currentStatus_ = Status::Corruption("NO MORE DATA LEFT"); } return; } diff --git a/db/version_set.cc b/db/version_set.cc index d9fbce255..972b887f5 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1855,8 +1855,18 @@ Status VersionSet::Recover( if (s.ok()) { for (auto cfd : *column_family_set_) { + auto builders_iter = builders.find(cfd->GetID()); + assert(builders_iter != builders.end()); + auto builder = builders_iter->second; + + if (options_->max_open_files == -1) { + // unlimited table cache. Pre-load table handle now. + // Need to do it out of the mutex. + builder->LoadTableHandlers(); + } + Version* v = new Version(cfd, this, current_version_number_++); - builders[cfd->GetID()]->SaveTo(v); + builder->SaveTo(v); // Install recovered version std::vector size_being_compacted(v->NumberLevels() - 1); diff --git a/db/version_set.h b/db/version_set.h index aa29640b2..49e47b2d3 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -46,6 +46,7 @@ class VersionSet; class MergeContext; struct ColumnFamilyData; class ColumnFamilySet; +class LookupKey; // Return the smallest index i such that files[i]->largest >= key. // Return files.size() if there is no such file. 
diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index bd22e191b..405b292da 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -238,8 +238,10 @@ extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*); enum { rocksdb_no_compression = 0, rocksdb_snappy_compression = 1, - rocksdb_zlib_compression = 1, - rocksdb_bz2_compression = 1 + rocksdb_zlib_compression = 2, + rocksdb_bz2_compression = 3, + rocksdb_lz4_compression = 4, + rocksdb_lz4hc_compression = 5 }; extern void rocksdb_options_set_compression(rocksdb_options_t*, int); diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index f24132a6f..dfd2f928b 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -25,6 +25,9 @@ class CompactionFilter { struct Context { // Does this compaction run include all data files bool is_full_compaction; + // Is this compaction requested by the client (true), + // or is it occurring as an automatic compaction process + bool is_manual_compaction; }; virtual ~CompactionFilter() {} diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 2f348d54f..0ba4a0ecf 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -52,6 +52,7 @@ struct ReadOptions; struct WriteOptions; struct FlushOptions; class WriteBatch; +class Env; // Metadata associated with each SST file. struct LiveFileMetaData { @@ -379,7 +380,7 @@ class DB { // GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup - // THIS METHOD IS DEPRECATED. Use the GetTableMetaData to get more + // THIS METHOD IS DEPRECATED. Use the GetLiveFilesMetaData to get more // detailed information on the live files. // Retrieve the list of all files in the database. The files are // relative to the dbname and are not absolute paths. 
The valid size of the diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index ab3af26c1..428f27d4e 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -36,10 +36,12 @@ #pragma once #include +#include namespace rocksdb { class Arena; +class LookupKey; class Slice; class SliceTransform; @@ -73,6 +75,20 @@ class MemTableRep { // nothing. virtual void MarkReadOnly() { } + // Look up key in the mem table: starting from the first entry whose + // user_key matches that of k, call callback_func() on each entry, with + // callback_args forwarded as the first parameter and the mem table + // key as the second parameter. If the callback returns false, Get() stops. + // Otherwise, it moves on to the next key. + // It's safe for Get() to terminate after having visited all the potential + // keys for k.user_key(), or to keep going past them. + // + // Default: + // The default implementation dynamically constructs an iterator, + // seeks to k and calls the callback function on each entry. + virtual void Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, const char* entry)); + // Report an approximation of how much memory has been used other than memory // that was allocated through the arena. virtual size_t ApproximateMemoryUsage() = 0; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 33acc318f..2caba3cc6 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -45,10 +45,8 @@ using std::shared_ptr; enum CompressionType : char { // NOTE: do not change the values of existing entries, as these are // part of the persistent format on disk.
- kNoCompression = 0x0, - kSnappyCompression = 0x1, - kZlibCompression = 0x2, - kBZip2Compression = 0x3 + kNoCompression = 0x0, kSnappyCompression = 0x1, kZlibCompression = 0x2, + kBZip2Compression = 0x3, kLZ4Compression = 0x4, kLZ4HCCompression = 0x5 }; enum CompactionStyle : char { diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index e2304fdb6..dbd41fc9b 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -61,6 +61,10 @@ class Status { static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIncomplete, msg, msg2); } + static Status ShutdownInProgress(const Slice& msg, + const Slice& msg2 = Slice()) { + return Status(kShutdownInProgress, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -86,6 +90,9 @@ class Status { // Returns true iff the status indicates Incomplete bool IsIncomplete() const { return code() == kIncomplete; } + // Returns true iff the status indicates ShutdownInProgress + bool IsShutdownInProgress() const { return code() == kShutdownInProgress; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; @@ -99,7 +106,8 @@ class Status { kInvalidArgument = 4, kIOError = 5, kMergeInProgress = 6, - kIncomplete = 7 + kIncomplete = 7, + kShutdownInProgress = 8 }; // A nullptr state_ (which is always the case for OK) means the message diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 1d4b9e344..55b83f441 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -4,7 +4,7 @@ #pragma once #include -#include +#include #include "rocksdb/status.h" namespace rocksdb { @@ -14,7 +14,16 @@ namespace rocksdb { // collected properties. // The value of the user-collected properties are encoded as raw bytes -- // users have to interprete these values by themselves.
-typedef std::unordered_map UserCollectedProperties; +// Note: To do prefix seek/scan in `UserCollectedProperties`, you can do +// something similar to: +// +// UserCollectedProperties props = ...; +// for (auto pos = props.lower_bound(prefix); +// pos != props.end() && pos->first.compare(0, prefix.size(), prefix) == 0; +// ++pos) { +// ... +// } +typedef std::map UserCollectedProperties; // TableProperties contains a bunch of read-only properties of its associated // table. diff --git a/port/port_posix.h b/port/port_posix.h index 839e89afe..aaea0b574 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -46,6 +46,11 @@ #include #endif +#if defined(LZ4) +#include +#include +#endif + #include #include #include @@ -353,8 +358,8 @@ inline bool BZip2_Compress(const CompressionOptions& opts, const char* input, return false; } -inline char* BZip2_Uncompress(const char* input_data, size_t input_length, - int* decompress_size) { +inline char* BZip2_Uncompress(const char* input_data, size_t input_length, + int* decompress_size) { #ifdef BZIP2 bz_stream _stream; memset(&_stream, 0, sizeof(bz_stream)); @@ -409,7 +414,64 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length, return nullptr; } -inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) { +inline bool LZ4_Compress(const CompressionOptions &opts, const char *input, + size_t length, ::std::string* output) { +#ifdef LZ4 + int compressBound = LZ4_compressBound(length); + output->resize(8 + compressBound); + char *p = const_cast(output->c_str()); + memcpy(p, &length, sizeof(length)); + size_t outlen; + outlen = LZ4_compress_limitedOutput(input, p + 8, length, compressBound); + if (outlen == 0) { + return false; + } + output->resize(8 + outlen); + return true; +#endif + return false; +} + +inline char* LZ4_Uncompress(const char* input_data, size_t input_length, + int* decompress_size) { +#ifdef LZ4 + if (input_length < 8) { + return nullptr; + } + int output_len; + 
memcpy(&output_len, input_data, sizeof(output_len)); + char *output = new char[output_len]; + *decompress_size = LZ4_decompress_safe_partial( + input_data + 8, output, input_length - 8, output_len, output_len); + if (*decompress_size < 0) { + delete[] output; + return nullptr; + } + return output; +#endif + return nullptr; +} + +inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input, + size_t length, ::std::string* output) { +#ifdef LZ4 + int compressBound = LZ4_compressBound(length); + output->resize(8 + compressBound); + char *p = const_cast(output->c_str()); + memcpy(p, &length, sizeof(length)); + size_t outlen; + outlen = LZ4_compressHC2_limitedOutput(input, p + 8, length, compressBound, + opts.level); + if (outlen == 0) { + return false; + } + output->resize(8 + outlen); + return true; +#endif + return false; +} + +inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) { return false; } diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index e5f3bd4d2..75f204670 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -233,6 +233,30 @@ void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, type = kNoCompression; } break; + case kLZ4Compression: + if (port::LZ4_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && + GoodCompressionRatio(compressed->size(), raw.size())) { + block_contents = *compressed; + } else { + // LZ4 not supported, or not good compression ratio, so just + // store uncompressed form + block_contents = raw; + type = kNoCompression; + } + break; + case kLZ4HCCompression: + if (port::LZ4HC_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && + GoodCompressionRatio(compressed->size(), raw.size())) { + block_contents = *compressed; + } else { + // LZ4 not supported, or not good compression ratio, so just + // store uncompressed form + block_contents = raw; + type = 
kNoCompression; + } + break; } WriteRawBlock(block_contents, type, handle); r->compressed_output.clear(); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index f4dd5b2ec..88ec65557 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -62,7 +62,7 @@ struct BlockBasedTable::Rep { unique_ptr index_block; unique_ptr filter; - TableProperties table_properties; + std::shared_ptr table_properties; }; BlockBasedTable::~BlockBasedTable() { @@ -255,9 +255,10 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, meta_iter->Seek(kPropertiesBlock); if (meta_iter->Valid() && meta_iter->key() == kPropertiesBlock) { s = meta_iter->status(); + TableProperties* table_properties = nullptr; if (s.ok()) { s = ReadProperties(meta_iter->value(), rep->file.get(), rep->options.env, - rep->options.info_log.get(), &rep->table_properties); + rep->options.info_log.get(), &table_properties); } if (!s.ok()) { @@ -265,6 +266,8 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, "[Warning] Encountered error while reading data from properties " "block " + s.ToString(); Log(rep->options.info_log, "%s", err_msg.c_str()); + } else { + rep->table_properties.reset(table_properties); } } @@ -339,7 +342,8 @@ void BlockBasedTable::SetupForCompaction() { compaction_optimized_ = true; } -const TableProperties& BlockBasedTable::GetTableProperties() { +std::shared_ptr BlockBasedTable::GetTableProperties() + const { return rep_->table_properties; } diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 58e5b0716..c711e7036 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -86,7 +86,7 @@ class BlockBasedTable : public TableReader { // posix_fadvise void SetupForCompaction() override; - const TableProperties& GetTableProperties() override; + std::shared_ptr GetTableProperties() const override; ~BlockBasedTable(); 
diff --git a/table/format.cc b/table/format.cc index 561d1689a..f1adf97da 100644 --- a/table/format.cc +++ b/table/format.cc @@ -10,6 +10,7 @@ #include "table/format.h" #include +#include #include "port/port.h" #include "rocksdb/env.h" @@ -64,7 +65,8 @@ Status Footer::DecodeFrom(Slice* input) { if (magic != table_magic_number()) { char buffer[80]; snprintf(buffer, sizeof(buffer) - 1, - "not an sstable (bad magic number --- %lx)", magic); + "not an sstable (bad magic number --- %lx)", + (long)magic); return Status::InvalidArgument(buffer); } } else { @@ -228,6 +230,28 @@ Status UncompressBlockContents(const char* data, size_t n, result->heap_allocated = true; result->cachable = true; break; + case kLZ4Compression: + ubuf = port::LZ4_Uncompress(data, n, &decompress_size); + static char lz4_corrupt_msg[] = + "LZ4 not supported or corrupted LZ4 compressed block contents"; + if (!ubuf) { + return Status::Corruption(lz4_corrupt_msg); + } + result->data = Slice(ubuf, decompress_size); + result->heap_allocated = true; + result->cachable = true; + break; + case kLZ4HCCompression: + ubuf = port::LZ4_Uncompress(data, n, &decompress_size); + static char lz4hc_corrupt_msg[] = + "LZ4HC not supported or corrupted LZ4HC compressed block contents"; + if (!ubuf) { + return Status::Corruption(lz4hc_corrupt_msg); + } + result->data = Slice(ubuf, decompress_size); + result->heap_allocated = true; + result->cachable = true; + break; default: return Status::Corruption("bad block type"); } diff --git a/table/format.h b/table/format.h index 64fa3fbe8..ed292347e 100644 --- a/table/format.h +++ b/table/format.h @@ -109,7 +109,7 @@ class Footer { kEncodedLength = 2 * BlockHandle::kMaxEncodedLength + 8 }; - const uint64_t kInvalidTableMagicNumber = 0; + static const uint64_t kInvalidTableMagicNumber = 0; private: // Set the table_magic_number only when it was not previously diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index fac84a01c..fa84b5a38 100644 --- a/table/meta_blocks.cc +++ 
b/table/meta_blocks.cc @@ -133,12 +133,9 @@ bool NotifyCollectTableCollectorsOnFinish( return all_succeeded; } -Status ReadProperties( - const Slice& handle_value, - RandomAccessFile* file, - Env* env, - Logger* logger, - TableProperties* table_properties) { +Status ReadProperties(const Slice& handle_value, RandomAccessFile* file, + Env* env, Logger* logger, + TableProperties** table_properties) { assert(table_properties); Slice v = handle_value; @@ -161,18 +158,22 @@ Status ReadProperties( std::unique_ptr iter( properties_block.NewIterator(BytewiseComparator())); + auto new_table_properties = new TableProperties(); // All pre-defined properties of type uint64_t std::unordered_map predefined_uint64_properties = { - {TablePropertiesNames::kDataSize, &table_properties->data_size}, - {TablePropertiesNames::kIndexSize, &table_properties->index_size}, - {TablePropertiesNames::kFilterSize, &table_properties->filter_size}, - {TablePropertiesNames::kRawKeySize, &table_properties->raw_key_size}, - {TablePropertiesNames::kRawValueSize, &table_properties->raw_value_size}, + {TablePropertiesNames::kDataSize, &new_table_properties->data_size}, + {TablePropertiesNames::kIndexSize, &new_table_properties->index_size}, + {TablePropertiesNames::kFilterSize, &new_table_properties->filter_size}, + {TablePropertiesNames::kRawKeySize, &new_table_properties->raw_key_size}, + {TablePropertiesNames::kRawValueSize, + &new_table_properties->raw_value_size}, {TablePropertiesNames::kNumDataBlocks, - &table_properties->num_data_blocks}, - {TablePropertiesNames::kNumEntries, &table_properties->num_entries}, - {TablePropertiesNames::kFormatVersion, &table_properties->format_version}, - {TablePropertiesNames::kFixedKeyLen, &table_properties->fixed_key_len}}; + &new_table_properties->num_data_blocks}, + {TablePropertiesNames::kNumEntries, &new_table_properties->num_entries}, + {TablePropertiesNames::kFormatVersion, + &new_table_properties->format_version}, + {TablePropertiesNames::kFixedKeyLen, + 
&new_table_properties->fixed_key_len}, }; std::string last_key; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -203,24 +204,25 @@ Status ReadProperties( } *(pos->second) = val; } else if (key == TablePropertiesNames::kFilterPolicy) { - table_properties->filter_policy_name = raw_val.ToString(); + new_table_properties->filter_policy_name = raw_val.ToString(); } else { // handle user-collected properties - table_properties->user_collected_properties.insert( + new_table_properties->user_collected_properties.insert( {key, raw_val.ToString()}); } } + if (s.ok()) { + *table_properties = new_table_properties; + } else { + delete new_table_properties; + } return s; } -Status ReadTableProperties( - RandomAccessFile* file, - uint64_t file_size, - uint64_t table_magic_number, - Env* env, - Logger* info_log, - TableProperties* properties) { +Status ReadTableProperties(RandomAccessFile* file, uint64_t file_size, + uint64_t table_magic_number, Env* env, + Logger* info_log, TableProperties** properties) { // -- Read metaindex block Footer footer(table_magic_number); auto s = ReadFooterFromFile(file, file_size, &footer); diff --git a/table/meta_blocks.h b/table/meta_blocks.h index 8994b01f3..f74e66592 100644 --- a/table/meta_blocks.h +++ b/table/meta_blocks.h @@ -103,21 +103,20 @@ bool NotifyCollectTableCollectorsOnFinish( PropertyBlockBuilder* builder); // Read the properties from the table. -Status ReadProperties( - const Slice& handle_value, - RandomAccessFile* file, - Env* env, - Logger* logger, - TableProperties* table_properties); +// @returns a status to indicate if the operation succeeded. On success, +// *table_properties will point to a heap-allocated TableProperties +// object, otherwise value of `table_properties` will not be modified. +Status ReadProperties(const Slice& handle_value, RandomAccessFile* file, + Env* env, Logger* logger, + TableProperties** table_properties); // Directly read the properties from the properties block of a plain table. 
-Status ReadTableProperties( - RandomAccessFile* file, - uint64_t file_size, - uint64_t table_magic_number, - Env* env, - Logger* info_log, - TableProperties* properties); +// @returns a status to indicate if the operation succeeded. On success, +// *table_properties will point to a heap-allocated TableProperties +// object, otherwise value of `table_properties` will not be modified. +Status ReadTableProperties(RandomAccessFile* file, uint64_t file_size, + uint64_t table_magic_number, Env* env, + Logger* info_log, TableProperties** properties); // Read the magic number of the specified file directly. The magic number // of a valid sst table the last 8-byte of the file. diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index b07862bad..cf1025097 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -87,15 +87,15 @@ PlainTableReader::PlainTableReader(const EnvOptions& storage_options, const InternalKeyComparator& icomparator, uint64_t file_size, int bloom_bits_per_key, double hash_table_ratio, - const TableProperties& table_properties) + const TableProperties* table_properties) : soptions_(storage_options), internal_comparator_(icomparator), file_size_(file_size), kHashTableRatio(hash_table_ratio), kBloomBitsPerKey(bloom_bits_per_key), table_properties_(table_properties), - data_end_offset_(table_properties_.data_size), - user_key_len_(table_properties.fixed_key_len) {} + data_end_offset_(table_properties_->data_size), + user_key_len_(table_properties->fixed_key_len) {} PlainTableReader::~PlainTableReader() { delete[] hash_table_; @@ -117,17 +117,16 @@ Status PlainTableReader::Open(const Options& options, return Status::NotSupported("File is too large for PlainTableReader!"); } - TableProperties table_properties; + TableProperties* props = nullptr; auto s = ReadTableProperties(file.get(), file_size, kPlainTableMagicNumber, - options.env, options.info_log.get(), - &table_properties); + options.env, options.info_log.get(), 
&props); if (!s.ok()) { return s; } - std::unique_ptr new_reader(new PlainTableReader( - soptions, internal_comparator, file_size, bloom_bits_per_key, - hash_table_ratio, table_properties)); + std::unique_ptr new_reader( + new PlainTableReader(soptions, internal_comparator, file_size, + bloom_bits_per_key, hash_table_ratio, props)); new_reader->file_ = std::move(file); new_reader->options_ = options; diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index 1abe4e35c..dd7b1e50f 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -64,13 +64,15 @@ class PlainTableReader: public TableReader { void SetupForCompaction(); - const TableProperties& GetTableProperties() { return table_properties_; } + std::shared_ptr GetTableProperties() const { + return table_properties_; + } PlainTableReader(const EnvOptions& storage_options, const InternalKeyComparator& internal_comparator, uint64_t file_size, int bloom_num_bits, double hash_table_ratio, - const TableProperties& table_properties); + const TableProperties* table_properties); ~PlainTableReader(); private: @@ -95,7 +97,7 @@ class PlainTableReader: public TableReader { const int kBloomBitsPerKey; DynamicBloom* bloom_ = nullptr; - TableProperties table_properties_; + std::shared_ptr table_properties_; const uint32_t data_start_offset_ = 0; const uint32_t data_end_offset_; const size_t user_key_len_; diff --git a/table/table_reader.h b/table/table_reader.h index 9acbb33d0..3d2738c9c 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include namespace rocksdb { @@ -47,7 +48,7 @@ class TableReader { // posix_fadvise virtual void SetupForCompaction() = 0; - virtual const TableProperties& GetTableProperties() = 0; + virtual std::shared_ptr GetTableProperties() const = 0; // Calls (*result_handler)(handle_context, ...) 
repeatedly, starting with // the entry found after a call to Seek(key), until result_handler returns diff --git a/table/table_test.cc b/table/table_test.cc index bac5a54ed..d31cb8396 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -488,30 +488,62 @@ class DBConstructor: public Constructor { }; static bool SnappyCompressionSupported() { +#ifdef SNAPPY std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; return port::Snappy_Compress(Options().compression_opts, in.data(), in.size(), &out); +#else + return false; +#endif } static bool ZlibCompressionSupported() { +#ifdef ZLIB std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; return port::Zlib_Compress(Options().compression_opts, in.data(), in.size(), &out); +#else + return false; +#endif } -#ifdef BZIP2 static bool BZip2CompressionSupported() { +#ifdef BZIP2 std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; return port::BZip2_Compress(Options().compression_opts, in.data(), in.size(), &out); +#else + return false; +#endif } + +static bool LZ4CompressionSupported() { +#ifdef LZ4 + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::LZ4_Compress(Options().compression_opts, in.data(), in.size(), + &out); +#else + return false; #endif +} + +static bool LZ4HCCompressionSupported() { +#ifdef LZ4 + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::LZ4HC_Compress(Options().compression_opts, in.data(), in.size(), + &out); +#else + return false; +#endif +} enum TestType { BLOCK_BASED_TABLE_TEST, @@ -539,24 +571,23 @@ static std::vector GenerateArgList() { std::vector restart_intervals = {16, 1, 1024}; // Only add compression if it is supported - std::vector compression_types = {kNoCompression}; -#ifdef SNAPPY + std::vector compression_types; + compression_types.push_back(kNoCompression); if (SnappyCompressionSupported()) { compression_types.push_back(kSnappyCompression); } -#endif - -#ifdef ZLIB if 
(ZlibCompressionSupported()) { compression_types.push_back(kZlibCompression); } -#endif - -#ifdef BZIP2 if (BZip2CompressionSupported()) { compression_types.push_back(kBZip2Compression); } -#endif + if (LZ4CompressionSupported()) { + compression_types.push_back(kLZ4Compression); + } + if (LZ4HCCompressionSupported()) { + compression_types.push_back(kLZ4HCCompression); + } for (auto test_type : test_types) { for (auto reverse_compare : reverse_compare_types) { @@ -908,6 +939,44 @@ class TableTest { class GeneralTableTest : public TableTest {}; class BlockBasedTableTest : public TableTest {}; class PlainTableTest : public TableTest {}; +class TablePropertyTest {}; + +// This test serves as the living tutorial for the prefix scan of user collected +// properties. +TEST(TablePropertyTest, PrefixScanTest) { + UserCollectedProperties props{{"num.111.1", "1"}, + {"num.111.2", "2"}, + {"num.111.3", "3"}, + {"num.333.1", "1"}, + {"num.333.2", "2"}, + {"num.333.3", "3"}, + {"num.555.1", "1"}, + {"num.555.2", "2"}, + {"num.555.3", "3"}, }; + + // prefixes that exist + for (const std::string& prefix : {"num.111", "num.333", "num.555"}) { + int num = 0; + for (auto pos = props.lower_bound(prefix); + pos != props.end() && + pos->first.compare(0, prefix.size(), prefix) == 0; + ++pos) { + ++num; + auto key = prefix + "." + std::to_string(num); + ASSERT_EQ(key, pos->first); + ASSERT_EQ(std::to_string(num), pos->second); + } + ASSERT_EQ(3, num); + } + + // prefixes that don't exist + for (const std::string& prefix : + {"num.000", "num.222", "num.444", "num.666"}) { + auto pos = props.lower_bound(prefix); + ASSERT_TRUE(pos == props.end() || + pos->first.compare(0, prefix.size(), prefix) != 0); + } +} // This test include all the basic checks except those for index size and block // size, which will be conducted in separated unit tests. 
@@ -933,7 +1002,7 @@ TEST(BlockBasedTableTest, BasicBlockBasedTableProperties) { c.Finish(options, GetPlainInternalComparator(options.comparator), &keys, &kvmap); - auto& props = c.table_reader()->GetTableProperties(); + auto& props = *c.table_reader()->GetTableProperties(); ASSERT_EQ(kvmap.size(), props.num_entries); auto raw_key_size = kvmap.size() * 2ul; @@ -964,7 +1033,7 @@ TEST(BlockBasedTableTest, FilterPolicyNameProperties) { c.Finish(options, GetPlainInternalComparator(options.comparator), &keys, &kvmap); - auto& props = c.table_reader()->GetTableProperties(); + auto& props = *c.table_reader()->GetTableProperties(); ASSERT_EQ("rocksdb.BuiltinBloomFilter", props.filter_policy_name); } @@ -1006,8 +1075,7 @@ TEST(BlockBasedTableTest, IndexSizeStat) { c.Finish(options, GetPlainInternalComparator(options.comparator), &ks, &kvmap); - auto index_size = - c.table_reader()->GetTableProperties().index_size; + auto index_size = c.table_reader()->GetTableProperties()->index_size; ASSERT_GT(index_size, last_index_size); last_index_size = index_size; } @@ -1032,7 +1100,7 @@ TEST(BlockBasedTableTest, NumBlockStat) { c.Finish(options, GetPlainInternalComparator(options.comparator), &ks, &kvmap); ASSERT_EQ(kvmap.size(), - c.table_reader()->GetTableProperties().num_data_blocks); + c.table_reader()->GetTableProperties()->num_data_blocks); } class BlockCacheProperties { @@ -1238,18 +1306,19 @@ TEST(PlainTableTest, BasicPlainTableProperties) { StringSource source(sink.contents(), 72242, true); - TableProperties props; + TableProperties* props = nullptr; auto s = ReadTableProperties(&source, sink.contents().size(), kPlainTableMagicNumber, Env::Default(), nullptr, &props); + std::unique_ptr props_guard(props); ASSERT_OK(s); - ASSERT_EQ(0ul, props.index_size); - ASSERT_EQ(0ul, props.filter_size); - ASSERT_EQ(16ul * 26, props.raw_key_size); - ASSERT_EQ(28ul * 26, props.raw_value_size); - ASSERT_EQ(26ul, props.num_entries); - ASSERT_EQ(1ul, props.num_data_blocks); + ASSERT_EQ(0ul, 
props->index_size); + ASSERT_EQ(0ul, props->filter_size); + ASSERT_EQ(16ul * 26, props->raw_key_size); + ASSERT_EQ(28ul * 26, props->raw_value_size); + ASSERT_EQ(26ul, props->num_entries); + ASSERT_EQ(1ul, props->num_data_blocks); } TEST(GeneralTableTest, ApproximateOffsetOfPlain) { @@ -1307,24 +1376,42 @@ static void DoCompressionTest(CompressionType comp) { } TEST(GeneralTableTest, ApproximateOffsetOfCompressed) { - CompressionType compression_state[2]; - int valid = 0; + std::vector compression_state; if (!SnappyCompressionSupported()) { fprintf(stderr, "skipping snappy compression tests\n"); } else { - compression_state[valid] = kSnappyCompression; - valid++; + compression_state.push_back(kSnappyCompression); } if (!ZlibCompressionSupported()) { fprintf(stderr, "skipping zlib compression tests\n"); } else { - compression_state[valid] = kZlibCompression; - valid++; + compression_state.push_back(kZlibCompression); + } + + // TODO(kailiu) DoCompressionTest() doesn't work with BZip2. + /* + if (!BZip2CompressionSupported()) { + fprintf(stderr, "skipping bzip2 compression tests\n"); + } else { + compression_state.push_back(kBZip2Compression); + } + */ + + if (!LZ4CompressionSupported()) { + fprintf(stderr, "skipping lz4 compression tests\n"); + } else { + compression_state.push_back(kLZ4Compression); + } + + if (!LZ4HCCompressionSupported()) { + fprintf(stderr, "skipping lz4hc compression tests\n"); + } else { + compression_state.push_back(kLZ4HCCompression); } - for (int i = 0; i < valid; i++) { - DoCompressionTest(compression_state[i]); + for (auto state : compression_state) { + DoCompressionTest(state); } } diff --git a/tools/db_stress.cc b/tools/db_stress.cc index bad2cf0d6..9bb581a5b 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -273,6 +273,10 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { return rocksdb::kZlibCompression; else if (!strcasecmp(ctype, "bzip2")) return rocksdb::kBZip2Compression; + else if 
(!strcasecmp(ctype, "lz4")) + return rocksdb::kLZ4Compression; + else if (!strcasecmp(ctype, "lz4hc")) + return rocksdb::kLZ4HCCompression; fprintf(stdout, "Cannot parse compression type '%s'\n", ctype); return rocksdb::kSnappyCompression; //default value @@ -1328,7 +1332,13 @@ class StressTest { case rocksdb::kBZip2Compression: compression = "bzip2"; break; - } + case rocksdb::kLZ4Compression: + compression = "lz4"; + break; + case rocksdb::kLZ4HCCompression: + compression = "lz4hc"; + break; + } fprintf(stdout, "Compression : %s\n", compression); diff --git a/tools/sst_dump.cc b/tools/sst_dump.cc index 3b82571bf..7abcb2e5a 100644 --- a/tools/sst_dump.cc +++ b/tools/sst_dump.cc @@ -7,6 +7,7 @@ #include #include #include +#include #include "db/dbformat.h" #include "db/memtable.h" @@ -43,7 +44,8 @@ class SstFileReader { bool has_to, const std::string& to_key); - Status ReadTableProperties(TableProperties* table_properties); + Status ReadTableProperties( + std::shared_ptr* table_properties); uint64_t GetReadNumber() { return read_num_; } private: @@ -112,10 +114,11 @@ Status SstFileReader::NewTableReader(const std::string& file_path) { Status SstFileReader::SetTableOptionsByMagicNumber(uint64_t table_magic_number, RandomAccessFile* file, uint64_t file_size) { - TableProperties table_properties; + TableProperties* table_properties = nullptr; Status s = rocksdb::ReadTableProperties(file, file_size, table_magic_number, options_.env, options_.info_log.get(), &table_properties); + std::unique_ptr props_guard(table_properties); if (!s.ok()) { return s; } @@ -126,13 +129,14 @@ Status SstFileReader::SetTableOptionsByMagicNumber(uint64_t table_magic_number, } else if (table_magic_number == kPlainTableMagicNumber) { options_.allow_mmap_reads = true; options_.table_factory = std::make_shared( - table_properties.fixed_key_len, 2, 0.8); + table_properties->fixed_key_len, 2, 0.8); options_.prefix_extractor = NewNoopTransform(); fprintf(stdout, "Sst file format: plain table\n"); } else { char 
error_msg_buffer[80]; snprintf(error_msg_buffer, sizeof(error_msg_buffer) - 1, - "Unsupported table magic number --- %lx)", table_magic_number); + "Unsupported table magic number --- %llx", + (unsigned long long)table_magic_number); return Status::InvalidArgument(error_msg_buffer); } @@ -192,7 +196,8 @@ Status SstFileReader::ReadSequential(bool print_kv, return ret; } -Status SstFileReader::ReadTableProperties(TableProperties* table_properties) { +Status SstFileReader::ReadTableProperties( + std::shared_ptr* table_properties) { if (!table_reader_) { return init_result_; } @@ -335,18 +340,19 @@ int main(int argc, char** argv) { } } if (show_properties) { - rocksdb::TableProperties table_properties; + std::shared_ptr table_properties; st = reader.ReadTableProperties(&table_properties); if (!st.ok()) { fprintf(stderr, "%s: %s\n", filename.c_str(), st.ToString().c_str()); } else { fprintf(stdout, - "Table Properties:\n" - "------------------------------\n" - " %s", table_properties.ToString("\n ", ": ").c_str()); + "Table Properties:\n" + "------------------------------\n" + " %s", + table_properties->ToString("\n ", ": ").c_str()); fprintf(stdout, "# deleted keys: %zd\n", rocksdb::GetDeletedKeys( - table_properties.user_collected_properties)); + table_properties->user_collected_properties)); } } } diff --git a/util/blob_store.cc b/util/blob_store.cc index 9f0671281..76230679f 100644 --- a/util/blob_store.cc +++ b/util/blob_store.cc @@ -161,7 +161,7 @@ Status BlobStore::Put(const Slice& value, Blob* blob) { if (size_left > 0) { Delete(*blob); - return Status::IOError("Tried to write more data than fits in the blob"); + return Status::Corruption("Tried to write more data than fits in the blob"); } return Status::OK(); @@ -187,9 +187,13 @@ Status BlobStore::Get(const Blob& blob, chunk.size * block_size_, &result, &value->at(offset)); - if (!s.ok()) { + value->clear(); + return s; + } + if (result.size() < chunk.size * 
block_size_) { value->clear(); - return Status::IOError("Could not read in from file"); + return Status::Corruption("Could not read in from file"); } offset += chunk.size * block_size_; } @@ -236,7 +240,7 @@ Status BlobStore::CreateNewBucket() { MutexLock l(&buckets_mutex_); if (buckets_size_ >= max_buckets_) { - return Status::IOError("Max size exceeded\n"); + return Status::NotSupported("Max size exceeded\n"); } int new_bucket_id = buckets_size_; diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc index 83f0f3d5a..4db624975 100644 --- a/util/hash_linklist_rep.cc +++ b/util/hash_linklist_rep.cc @@ -64,6 +64,10 @@ class HashLinkListRep : public MemTableRep { virtual size_t ApproximateMemoryUsage() override; + virtual void Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, + const char* entry)) override; + virtual ~HashLinkListRep(); virtual MemTableRep::Iterator* GetIterator() override; @@ -398,6 +402,19 @@ size_t HashLinkListRep::ApproximateMemoryUsage() { return 0; } +void HashLinkListRep::Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, const char* entry)) { + auto transformed = transform_->Transform(k.user_key()); + auto bucket = GetBucket(transformed); + if (bucket != nullptr) { + Iterator iter(this, bucket); + for (iter.Seek(k.internal_key(), nullptr); + iter.Valid() && callback_func(callback_args, iter.key()); + iter.Next()) { + } + } +} + MemTableRep::Iterator* HashLinkListRep::GetIterator() { auto list = new FullList(compare_, arena_); for (size_t i = 0; i < bucket_size_; ++i) { diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index aa070bc8b..61da5ae41 100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -31,6 +31,10 @@ class HashSkipListRep : public MemTableRep { virtual size_t ApproximateMemoryUsage() override; + virtual void Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, + const char* entry)) override; + 
virtual ~HashSkipListRep(); virtual MemTableRep::Iterator* GetIterator() override; @@ -271,6 +275,19 @@ size_t HashSkipListRep::ApproximateMemoryUsage() { return sizeof(buckets_); } +void HashSkipListRep::Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, const char* entry)) { + auto transformed = transform_->Transform(k.user_key()); + auto bucket = GetBucket(transformed); + if (bucket != nullptr) { + Bucket::Iterator iter(bucket); + for (iter.Seek(k.memtable_key().data()); + iter.Valid() && callback_func(callback_args, iter.key()); + iter.Next()) { + } + } +} + MemTableRep::Iterator* HashSkipListRep::GetIterator() { auto list = new Bucket(compare_, arena_); for (size_t i = 0; i < bucket_size_; ++i) { diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 870abd01e..77c750456 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -245,6 +245,10 @@ Options LDBCommand::PrepareOptionsForOpenDB() { opt.compression = kZlibCompression; } else if (comp == "bzip2") { opt.compression = kBZip2Compression; + } else if (comp == "lz4") { + opt.compression = kLZ4Compression; + } else if (comp == "lz4hc") { + opt.compression = kLZ4HCCompression; } else { // Unknown compression. 
exec_state_ = LDBCommandExecuteResult::FAILED( diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc index 6f1fb1a15..ab77e7f3a 100644 --- a/util/skiplistrep.cc +++ b/util/skiplistrep.cc @@ -32,6 +32,17 @@ public: return 0; } + virtual void Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, + const char* entry)) override { + SkipListRep::Iterator iter(&skip_list_); + Slice dummy_slice; + for (iter.Seek(dummy_slice, k.memtable_key().data()); + iter.Valid() && callback_func(callback_args, iter.key()); + iter.Next()) { + } + } + virtual ~SkipListRep() override { } // Iteration over the contents of a skip list diff --git a/util/status.cc b/util/status.cc index 69060a7cc..2a5f05a4b 100644 --- a/util/status.cc +++ b/util/status.cc @@ -60,7 +60,13 @@ std::string Status::ToString() const { type = "IO error: "; break; case kMergeInProgress: - type = "Merge In Progress: "; + type = "Merge in progress: "; + break; + case kIncomplete: + type = "Result incomplete: "; + break; + case kShutdownInProgress: + type = "Shutdown in progress: "; break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", diff --git a/util/vectorrep.cc b/util/vectorrep.cc index 4b8b3d552..e0f3d69b0 100644 --- a/util/vectorrep.cc +++ b/util/vectorrep.cc @@ -39,6 +39,10 @@ class VectorRep : public MemTableRep { virtual size_t ApproximateMemoryUsage() override; + virtual void Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, + const char* entry)) override; + virtual ~VectorRep() override { } class Iterator : public MemTableRep::Iterator { @@ -233,6 +237,25 @@ void VectorRep::Iterator::SeekToLast() { } } +void VectorRep::Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, const char* entry)) { + rwlock_.ReadLock(); + VectorRep* vector_rep; + std::shared_ptr bucket; + if (immutable_) { + vector_rep = this; + } else { + vector_rep = nullptr; + bucket.reset(new Bucket(*bucket_)); // make a copy + } + 
VectorRep::Iterator iter(vector_rep, immutable_ ? bucket_ : bucket, compare_); + rwlock_.Unlock(); + + for (iter.Seek(k.user_key(), k.memtable_key().data()); + iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) { + } +} + MemTableRep::Iterator* VectorRep::GetIterator() { ReadLock l(&rwlock_); // Do not sort here. The sorting would be done the first time diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index da225e22b..89051f25a 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -857,7 +857,6 @@ void BackupEngineImpl::BackupMeta::Delete() { // // // ... -// TODO: maybe add checksum? Status BackupEngineImpl::BackupMeta::LoadFromFile( const std::string& backup_dir) { assert(Empty()); @@ -873,7 +872,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( s = backup_meta_file->Read(max_backup_meta_file_size_, &data, buf.get()); if (!s.ok() || data.size() == max_backup_meta_file_size_) { - return s.ok() ? Status::IOError("File size too big") : s; + return s.ok() ? Status::Corruption("File size too big") : s; } buf[data.size()] = 0; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 5f8614bfe..75853e179 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -172,7 +172,7 @@ class TestEnv : public EnvWrapper { const EnvOptions& options) { written_files_.push_back(f); if (limit_written_files_ <= 0) { - return Status::IOError("Sorry, can't do this"); + return Status::NotSupported("Sorry, can't do this"); } limit_written_files_--; return EnvWrapper::NewWritableFile(f, r, options);