diff --git a/HISTORY.md b/HISTORY.md index f515e11a5..2ef40c4a9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -21,6 +21,7 @@ * CompactRange() will now skip bottommost level compaction for level based compaction if there is no compaction filter, bottommost_level_compaction is introduced in CompactRangeOptions to control when it's possbile to skip bottommost level compaction. This mean that if you want the compaction to produce a single file you need to set bottommost_level_compaction to BottommostLevelCompaction::kForce. * Add Cache.GetPinnedUsage() to get the size of memory occupied by entries that are in use by the system. * DB:Open() will fail if the compression specified in Options is not linked with the binary. If you see this failure, recompile RocksDB with compression libraries present on your system. Also, previously our default compression was snappy. This behavior is now changed. Now, the default compression is snappy only if it's available on the system. If it isn't we change the default to kNoCompression. +* We changed how we account for memory used in block cache. Previously, we only counted the sum of block sizes currently present in block cache. Now, we count the actual memory usage of the blocks. For example, a block of size 4.5KB will use 8KB memory with jemalloc. This might decrease your memory usage and possibly decrease performance. Increase block cache size if you see this happening after an upgrade. ## 3.11.0 (5/19/2015) ### New Features diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index d0769e338..6edc22a25 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -294,6 +294,18 @@ EOF JAVA_LDFLAGS="$JAVA_LDFLAGS -ltcmalloc" fi fi + + # Test whether malloc_usable_size is available + $CXX $CFLAGS -x c++ - -o /dev/null 2>/dev/null < + int main() { + size_t res = malloc_usable_size(0); + return 0; + } +EOF + if [ "$?" = 0 ]; then + COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_MALLOC_USABLE_SIZE" + fi fi # TODO(tec): Fix -Wshorten-64-to-32 errors on FreeBSD and enable the warning. diff --git a/build_tools/fb_compile_mongo.sh b/build_tools/fb_compile_mongo.sh new file mode 100755 index 000000000..c087f8161 --- /dev/null +++ b/build_tools/fb_compile_mongo.sh @@ -0,0 +1,55 @@ +#!/bin/sh + +# fail early +set -e + +if test -z $ROCKSDB_PATH; then + ROCKSDB_PATH=~/rocksdb +fi +source $ROCKSDB_PATH/build_tools/fbcode_config4.8.1.sh + +EXTRA_LDFLAGS="" + +if test -z $ALLOC; then + # default + ALLOC=tcmalloc +elif [[ $ALLOC == "jemalloc" ]]; then + ALLOC=system + EXTRA_LDFLAGS+=" -Wl,--whole-archive $JEMALLOC_LIB -Wl,--no-whole-archive" +fi + +# we need to force mongo to use static library, not shared +STATIC_LIB_DEP_DIR='build/static_library_dependencies' +test -d $STATIC_LIB_DEP_DIR || mkdir $STATIC_LIB_DEP_DIR +test -h $STATIC_LIB_DEP_DIR/`basename $SNAPPY_LIBS` || ln -s $SNAPPY_LIBS $STATIC_LIB_DEP_DIR +test -h $STATIC_LIB_DEP_DIR/`basename $LZ4_LIBS` || ln -s $LZ4_LIBS $STATIC_LIB_DEP_DIR + +EXTRA_LDFLAGS+=" -L $STATIC_LIB_DEP_DIR" + +set -x + +EXTRA_CMD="" +if ! test -e version.json; then + # this is Mongo 3.0 + EXTRA_CMD="--rocksdb \ + --variant-dir=linux2/norm + --cxx=${CXX} \ + --cc=${CC} \ + --use-system-zlib" # add this line back to normal code path + # when https://jira.mongodb.org/browse/SERVER-19123 is resolved +fi + +scons \ + LINKFLAGS="$EXTRA_LDFLAGS $EXEC_LDFLAGS $PLATFORM_LDFLAGS" \ + CCFLAGS="$CXXFLAGS -L $STATIC_LIB_DEP_DIR" \ + LIBS="lz4 gcc stdc++" \ + LIBPATH="$ROCKSDB_PATH" \ + CPPPATH="$ROCKSDB_PATH/include" \ + -j32 \ + --allocator=$ALLOC \ + --nostrip \ + --opt=on \ + --disable-minimum-compiler-version-enforcement \ + --use-system-snappy \ + --disable-warnings-as-errors \ + $EXTRA_CMD $* diff --git a/build_tools/fbcode_config.sh b/build_tools/fbcode_config.sh index 00c1c8a7e..5d7ff53eb 100644 --- a/build_tools/fbcode_config.sh +++ b/build_tools/fbcode_config.sh @@ -18,19 +18,6 @@ GLIBC_REV=7397bed99280af5d9543439cdb7d018af7542720 GLIBC_INCLUDE="/mnt/gvfs/third-party2/glibc/$GLIBC_REV/2.20/gcc-4.9-glibc-2.20/99df8fc/include" GLIBC_LIBS=" -L /mnt/gvfs/third-party2/glibc/$GLIBC_REV/2.20/gcc-4.9-glibc-2.20/99df8fc/lib" -# snappy and zlib depend are bundled with MongoDB so we wan't to pick up the bundled headers when -# building for it and disable block compressors supported by RocksDB but not used by MongoDB. - -if [[ -n $ROCKSDB_FOR_MONGO ]]; then - -MONGO_SRC="$ROCKSDB_FOR_MONGO/src/third_party" -SNAPPY_INCLUDE=" -I $MONGO_SRC/snappy-1.1.2" -CFLAGS+=" -DSNAPPY" -ZLIB_INCLUDE=" -I $MONGO_SRC/zlib-1.2.8" -CFLAGS+=" -DZLIB" - -else - SNAPPY_INCLUDE=" -I /mnt/gvfs/third-party2/snappy/b0f269b3ca47770121aa159b99e1d8d2ab260e1f/1.0.3/gcc-4.9-glibc-2.20/c32916f/include/" if test -z $PIC_BUILD; then @@ -56,7 +43,6 @@ if test -z $PIC_BUILD; then LZ4_LIBS=" /mnt/gvfs/third-party2/lz4/79d2943e2dd7208a3e0b06cf95e9f85f05fe9e1b/r124/gcc-4.9-glibc-2.20/4230243/lib/liblz4.a" CFLAGS+=" -DLZ4" fi -fi # location of gflags headers and libraries GFLAGS_INCLUDE=" -I /mnt/gvfs/third-party2/gflags/0fa60e2b88de3e469db6c482d6e6dac72f5d65f9/1.6/gcc-4.9-glibc-2.20/4230243/include/" @@ -125,7 +111,7 @@ else fi CFLAGS+=" $DEPS_INCLUDE" -CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT" +CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE" CXXFLAGS+=" $CFLAGS" EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $GFLAGS_LIBS $NUMA_LIB" diff --git a/build_tools/fbcode_config4.8.1.sh b/build_tools/fbcode_config4.8.1.sh index 46bfe1330..cfe500d40 100644 --- a/build_tools/fbcode_config4.8.1.sh +++ b/build_tools/fbcode_config4.8.1.sh @@ -87,13 +87,14 @@ else fi CFLAGS+=" $DEPS_INCLUDE" -CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT" +CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE" CFLAGS+=" -DSNAPPY -DGFLAGS=google -DZLIB -DBZIP2 -DLZ4 -DNUMA" CXXFLAGS+=" $CFLAGS" EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $GFLAGS_LIBS $NUMA_LIB" EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.8.1-glibc-2.17/lib/ld.so" EXEC_LDFLAGS+=" $LIBUNWIND" +EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.8.1-glibc-2.17/lib" PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++" diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 672d906e4..e70f916f3 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -98,10 +98,12 @@ class ColumnFamilyTest : public testing::Test { &db_); } +#ifndef ROCKSDB_LITE // ReadOnlyDB is not supported void AssertOpenReadOnly(std::vector cf, std::vector options = {}) { ASSERT_OK(OpenReadOnly(cf, options)); } +#endif // !ROCKSDB_LITE void Open(std::vector cf, @@ -186,10 +188,28 @@ class ColumnFamilyTest : public testing::Test { } void WaitForFlush(int cf) { +#ifndef ROCKSDB_LITE // TEST functions are not supported in lite ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf])); +#endif // !ROCKSDB_LITE } - void WaitForCompaction() { ASSERT_OK(dbfull()->TEST_WaitForCompact()); } + void WaitForCompaction() { +#ifndef ROCKSDB_LITE // TEST functions are not supported in lite + ASSERT_OK(dbfull()->TEST_WaitForCompact()); +#endif // !ROCKSDB_LITE + } + + uint64_t MaxTotalInMemoryState() { +#ifndef ROCKSDB_LITE + return dbfull()->TEST_MaxTotalInMemoryState(); +#else + return 0; +#endif // !ROCKSDB_LITE + } + + void AssertMaxTotalInMemoryState(uint64_t value) { + ASSERT_EQ(value, MaxTotalInMemoryState()); + } Status Put(int cf, const std::string& key, const std::string& value) { return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value)); @@ -229,6 +249,7 @@ class ColumnFamilyTest : public testing::Test { "rocksdb.num-files-at-level" + ToString(level)); } +#ifndef ROCKSDB_LITE // Return spread of files per level std::string FilesPerLevel(int cf) { std::string result; @@ -245,12 +266,27 @@ class ColumnFamilyTest : public testing::Test { result.resize(last_non_zero_offset); return result; } +#endif + void AssertFilesPerLevel(const std::string& value, int cf) { +#ifndef ROCKSDB_LITE + ASSERT_EQ(value, FilesPerLevel(cf)); +#endif + } + +#ifndef ROCKSDB_LITE // GetLiveFilesMetaData is not supported int CountLiveFiles() { std::vector metadata; db_->GetLiveFilesMetaData(&metadata); return static_cast(metadata.size()); } +#endif // !ROCKSDB_LITE + + void AssertCountLiveFiles(int expected_value) { +#ifndef ROCKSDB_LITE + ASSERT_EQ(expected_value, CountLiveFiles()); +#endif + } // Do n memtable flushes, each of which produces an sstable // covering the range [small,large]. @@ -263,6 +299,7 @@ class ColumnFamilyTest : public testing::Test { } } +#ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported int CountLiveLogFiles() { int micros_wait_for_log_deletion = 20000; env_->SleepForMicroseconds(micros_wait_for_log_deletion); @@ -289,15 +326,25 @@ class ColumnFamilyTest : public testing::Test { } } return ret; + return 0; + } +#endif // !ROCKSDB_LITE + + void AssertCountLiveLogFiles(int value) { +#ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported + ASSERT_EQ(value, CountLiveLogFiles()); +#endif // !ROCKSDB_LITE } void AssertNumberOfImmutableMemtables(std::vector num_per_cf) { assert(num_per_cf.size() == handles_.size()); +#ifndef ROCKSDB_LITE // GetProperty is not supported in lite for (size_t i = 0; i < num_per_cf.size(); ++i) { ASSERT_EQ(num_per_cf[i], GetProperty(static_cast(i), "rocksdb.num-immutable-mem-table")); } +#endif // !ROCKSDB_LITE } void CopyFile(const std::string& source, const std::string& destination, @@ -410,10 +457,10 @@ TEST_F(ColumnFamilyTest, DropTest) { } ASSERT_EQ("bar1", Get(1, "1")); - ASSERT_EQ(CountLiveFiles(), 1); + AssertCountLiveFiles(1); DropColumnFamilies({1}); // make sure that all files are deleted when we drop the column family - ASSERT_EQ(CountLiveFiles(), 0); + AssertCountLiveFiles(0); Destroy(); } } @@ -554,10 +601,9 @@ TEST_F(ColumnFamilyTest, FlushTest) { for (int i = 0; i < 3; ++i) { uint64_t max_total_in_memory_state = - dbfull()->TEST_MaxTotalInMemoryState(); + MaxTotalInMemoryState(); Flush(i); - ASSERT_EQ(dbfull()->TEST_MaxTotalInMemoryState(), - max_total_in_memory_state); + AssertMaxTotalInMemoryState(max_total_in_memory_state); } ASSERT_OK(Put(1, "foofoo", "bar")); ASSERT_OK(Put(0, "foofoo", "bar")); @@ -592,7 +638,7 @@ TEST_F(ColumnFamilyTest, LogDeletionTest) { // Each bracket is one log file. if number is in (), it means // we don't need it anymore (it's been flushed) // [] - ASSERT_EQ(CountLiveLogFiles(), 0); + AssertCountLiveLogFiles(0); PutRandomData(0, 1, 100); // [0] PutRandomData(1, 1, 100); @@ -600,53 +646,53 @@ TEST_F(ColumnFamilyTest, LogDeletionTest) { PutRandomData(1, 1000, 100); WaitForFlush(1); // [0, (1)] [1] - ASSERT_EQ(CountLiveLogFiles(), 2); + AssertCountLiveLogFiles(2); PutRandomData(0, 1, 100); // [0, (1)] [0, 1] - ASSERT_EQ(CountLiveLogFiles(), 2); + AssertCountLiveLogFiles(2); PutRandomData(2, 1, 100); // [0, (1)] [0, 1, 2] PutRandomData(2, 1000, 100); WaitForFlush(2); // [0, (1)] [0, 1, (2)] [2] - ASSERT_EQ(CountLiveLogFiles(), 3); + AssertCountLiveLogFiles(3); PutRandomData(2, 1000, 100); WaitForFlush(2); // [0, (1)] [0, 1, (2)] [(2)] [2] - ASSERT_EQ(CountLiveLogFiles(), 4); + AssertCountLiveLogFiles(4); PutRandomData(3, 1, 100); // [0, (1)] [0, 1, (2)] [(2)] [2, 3] PutRandomData(1, 1, 100); // [0, (1)] [0, 1, (2)] [(2)] [1, 2, 3] - ASSERT_EQ(CountLiveLogFiles(), 4); + AssertCountLiveLogFiles(4); PutRandomData(1, 1000, 100); WaitForFlush(1); // [0, (1)] [0, (1), (2)] [(2)] [(1), 2, 3] [1] - ASSERT_EQ(CountLiveLogFiles(), 5); + AssertCountLiveLogFiles(5); PutRandomData(0, 1000, 100); WaitForFlush(0); // [(0), (1)] [(0), (1), (2)] [(2)] [(1), 2, 3] [1, (0)] [0] // delete obsolete logs --> // [(1), 2, 3] [1, (0)] [0] - ASSERT_EQ(CountLiveLogFiles(), 3); + AssertCountLiveLogFiles(3); PutRandomData(0, 1000, 100); WaitForFlush(0); // [(1), 2, 3] [1, (0)], [(0)] [0] - ASSERT_EQ(CountLiveLogFiles(), 4); + AssertCountLiveLogFiles(4); PutRandomData(1, 1000, 100); WaitForFlush(1); // [(1), 2, 3] [(1), (0)] [(0)] [0, (1)] [1] - ASSERT_EQ(CountLiveLogFiles(), 5); + AssertCountLiveLogFiles(5); PutRandomData(2, 1000, 100); WaitForFlush(2); // [(1), (2), 3] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2] - ASSERT_EQ(CountLiveLogFiles(), 6); + AssertCountLiveLogFiles(6); PutRandomData(3, 1000, 100); WaitForFlush(3); // [(1), (2), (3)] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2, (3)] [3] // delete obsolete logs --> // [0, (1)] [1, (2)], [2, (3)] [3] - ASSERT_EQ(CountLiveLogFiles(), 4); + AssertCountLiveLogFiles(4); Close(); } @@ -681,72 +727,73 @@ TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) { PutRandomData(0, 100, 1000); WaitForFlush(0); AssertNumberOfImmutableMemtables({0, 0, 0, 0}); - ASSERT_EQ(CountLiveLogFiles(), 1); + AssertCountLiveLogFiles(1); PutRandomData(1, 200, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 0, 0}); - ASSERT_EQ(CountLiveLogFiles(), 2); + AssertCountLiveLogFiles(2); PutRandomData(2, 1000, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 1, 0}); - ASSERT_EQ(CountLiveLogFiles(), 3); + AssertCountLiveLogFiles(3); PutRandomData(2, 1000, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 2, 0}); - ASSERT_EQ(CountLiveLogFiles(), 4); + AssertCountLiveLogFiles(4); PutRandomData(3, 90, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 2, 1}); - ASSERT_EQ(CountLiveLogFiles(), 5); + AssertCountLiveLogFiles(5); PutRandomData(3, 90, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 2, 2}); - ASSERT_EQ(CountLiveLogFiles(), 6); + AssertCountLiveLogFiles(6); PutRandomData(3, 90, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 2, 3}); - ASSERT_EQ(CountLiveLogFiles(), 7); + AssertCountLiveLogFiles(7); PutRandomData(0, 100, 1000); WaitForFlush(0); AssertNumberOfImmutableMemtables({0, 1, 2, 3}); - ASSERT_EQ(CountLiveLogFiles(), 8); + AssertCountLiveLogFiles(8); PutRandomData(2, 100, 10000); WaitForFlush(2); AssertNumberOfImmutableMemtables({0, 1, 0, 3}); - ASSERT_EQ(CountLiveLogFiles(), 9); + AssertCountLiveLogFiles(9); PutRandomData(3, 90, 1000); WaitForFlush(3); AssertNumberOfImmutableMemtables({0, 1, 0, 0}); - ASSERT_EQ(CountLiveLogFiles(), 10); + AssertCountLiveLogFiles(10); PutRandomData(3, 90, 1000); env_->SleepForMicroseconds(micros_wait_for_flush); AssertNumberOfImmutableMemtables({0, 1, 0, 1}); - ASSERT_EQ(CountLiveLogFiles(), 11); + AssertCountLiveLogFiles(11); PutRandomData(1, 200, 1000); WaitForFlush(1); AssertNumberOfImmutableMemtables({0, 0, 0, 1}); - ASSERT_EQ(CountLiveLogFiles(), 5); + AssertCountLiveLogFiles(5); PutRandomData(3, 240, 1000); WaitForFlush(3); PutRandomData(3, 300, 1000); WaitForFlush(3); AssertNumberOfImmutableMemtables({0, 0, 0, 0}); - ASSERT_EQ(CountLiveLogFiles(), 12); + AssertCountLiveLogFiles(12); PutRandomData(0, 100, 1000); WaitForFlush(0); AssertNumberOfImmutableMemtables({0, 0, 0, 0}); - ASSERT_EQ(CountLiveLogFiles(), 12); + AssertCountLiveLogFiles(12); PutRandomData(2, 3*100, 10000); WaitForFlush(2); AssertNumberOfImmutableMemtables({0, 0, 0, 0}); - ASSERT_EQ(CountLiveLogFiles(), 12); + AssertCountLiveLogFiles(12); PutRandomData(1, 2*200, 1000); WaitForFlush(1); AssertNumberOfImmutableMemtables({0, 0, 0, 0}); - ASSERT_EQ(CountLiveLogFiles(), 7); + AssertCountLiveLogFiles(7); Close(); } +#ifndef ROCKSDB_LITE // Cuckoo is not supported in lite TEST_F(ColumnFamilyTest, MemtableNotSupportSnapshot) { Open(); auto* s1 = dbfull()->GetSnapshot(); @@ -767,6 +814,7 @@ TEST_F(ColumnFamilyTest, MemtableNotSupportSnapshot) { ASSERT_TRUE(s3 == nullptr); Close(); } +#endif // !ROCKSDB_LITE TEST_F(ColumnFamilyTest, DifferentMergeOperators) { Open(); @@ -815,6 +863,7 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) { default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleUniversal; + one.num_levels = 1; // trigger compaction if there are >= 4 files one.level0_file_num_compaction_trigger = 4; @@ -832,14 +881,14 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) { for (int i = 0; i < one.level0_file_num_compaction_trigger - 1; ++i) { PutRandomData(1, 11, 10000); WaitForFlush(1); - ASSERT_EQ(ToString(i + 1), FilesPerLevel(1)); + AssertFilesPerLevel(ToString(i + 1), 1); } // SETUP column family "two" -- level style with 4 levels for (int i = 0; i < two.level0_file_num_compaction_trigger - 1; ++i) { PutRandomData(2, 15, 10000); WaitForFlush(2); - ASSERT_EQ(ToString(i + 1), FilesPerLevel(2)); + AssertFilesPerLevel(ToString(i + 1), 2); } // TRIGGER compaction "one" @@ -852,16 +901,17 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) { WaitForCompaction(); // VERIFY compaction "one" - ASSERT_EQ("1", FilesPerLevel(1)); + AssertFilesPerLevel("1", 1); // VERIFY compaction "two" - ASSERT_EQ("0,1", FilesPerLevel(2)); + AssertFilesPerLevel("0,1", 2); CompactAll(2); - ASSERT_EQ("0,1", FilesPerLevel(2)); + AssertFilesPerLevel("0,1", 2); Close(); } +#ifndef ROCKSDB_LITE // Tailing interator not supported namespace { std::string IterStatus(Iterator* iter) { std::string result; @@ -918,7 +968,9 @@ TEST_F(ColumnFamilyTest, NewIteratorsTest) { Destroy(); } } +#endif // !ROCKSDB_LITE +#ifndef ROCKSDB_LITE // ReadOnlyDB is not supported TEST_F(ColumnFamilyTest, ReadOnlyDBTest) { Open(); CreateColumnFamiliesAndReopen({"one", "two", "three", "four"}); @@ -968,6 +1020,7 @@ TEST_F(ColumnFamilyTest, ReadOnlyDBTest) { s = OpenReadOnly({"one", "four"}); ASSERT_TRUE(!s.ok()); } +#endif // !ROCKSDB_LITE TEST_F(ColumnFamilyTest, DontRollEmptyLogs) { Open(); @@ -983,7 +1036,7 @@ TEST_F(ColumnFamilyTest, DontRollEmptyLogs) { } for (int i = 0; i < 4; ++i) { - dbfull()->TEST_WaitForFlushMemTable(handles_[i]); + WaitForFlush(i); } int total_new_writable_files = env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start; @@ -1007,7 +1060,8 @@ TEST_F(ColumnFamilyTest, FlushStaleColumnFamilies) { for (int i = 0; i < 2; ++i) { PutRandomData(0, 100, 1000); // flush WaitForFlush(0); - ASSERT_EQ(i + 1, CountLiveFiles()); + + AssertCountLiveFiles(i + 1); } // third flush. now, CF [two] should be detected as stale and flushed // column family 1 should not be flushed since it's empty @@ -1016,7 +1070,7 @@ TEST_F(ColumnFamilyTest, FlushStaleColumnFamilies) { WaitForFlush(2); // 3 files for default column families, 1 file for column family [two], zero // files for column family [one], because it's empty - ASSERT_EQ(4, CountLiveFiles()); + AssertCountLiveFiles(4); Close(); } diff --git a/db/db_impl.cc b/db/db_impl.cc index e5bc6a9bc..260b1baef 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -270,18 +270,18 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) LogFlush(db_options_.info_log); } -// Will only lock the mutex_ and wait for completion if wait is true +// Will lock the mutex_, will wait for completion if wait is true void DBImpl::CancelAllBackgroundWork(bool wait) { + InstrumentedMutexLock l(&mutex_); shutting_down_.store(true, std::memory_order_release); + bg_cv_.SignalAll(); if (!wait) { return; } // Wait for background work to finish - mutex_.Lock(); while (bg_compaction_scheduled_ || bg_flush_scheduled_) { bg_cv_.Wait(); } - mutex_.Unlock(); } DBImpl::~DBImpl() { @@ -299,12 +299,11 @@ DBImpl::~DBImpl() { } versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); } - // CancelAllBackgroundWork called with false means we just set the - // shutdown marker, while holding the mutex_ here. After which we - // do a variant of the waiting after we release the lock and unschedule work + mutex_.Unlock(); + // CancelAllBackgroundWork called with false means we just set the shutdown + // marker. After this we do a variant of the waiting and unschedule work // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) CancelAllBackgroundWork(false); - mutex_.Unlock(); int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW); int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH); mutex_.Lock(); @@ -2036,6 +2035,9 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) { + if (shutting_down_.load(std::memory_order_acquire)) { + return Status::ShutdownInProgress(); + } bg_cv_.Wait(); } if (!bg_error_.ok()) { diff --git a/db/db_iter.cc b/db/db_iter.cc index ce75f4386..7ed00365e 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -350,6 +350,9 @@ void DBIter::MergeValuesNewToOld() { void DBIter::Prev() { assert(valid_); if (direction_ == kForward) { + if (!iter_->Valid()) { + iter_->SeekToLast(); + } FindPrevUserKey(); direction_ = kReverse; } @@ -553,7 +556,7 @@ void DBIter::FindNextUserKey() { ParsedInternalKey ikey; FindParseableKey(&ikey, kForward); while (iter_->Valid() && - user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) { + user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) { iter_->Next(); FindParseableKey(&ikey, kForward); } @@ -568,7 +571,7 @@ void DBIter::FindPrevUserKey() { ParsedInternalKey ikey; FindParseableKey(&ikey, kReverse); while (iter_->Valid() && - user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) { + user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) >= 0) { if (num_skipped >= max_skip_) { num_skipped = 0; IterKey last_key; @@ -664,7 +667,28 @@ void DBIter::SeekToLast() { PERF_TIMER_GUARD(seek_internal_seek_time); iter_->SeekToLast(); } + // When the iterate_upper_bound is set to a value, + // it will seek to the last key before the + // ReadOptions.iterate_upper_bound + if (iter_->Valid() && iterate_upper_bound_ != nullptr) { + saved_key_.SetKey(*iterate_upper_bound_); + std::string last_key; + AppendInternalKey(&last_key, + ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber, + kValueTypeForSeek)); + + iter_->Seek(last_key); + if (!iter_->Valid()) { + iter_->SeekToLast(); + } else { + iter_->Prev(); + if (!iter_->Valid()) { + valid_ = false; + return; + } + } + } PrevInternal(); } diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 18b38ac5d..e5c58e4d9 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -11,6 +11,7 @@ #include "db/dbformat.h" #include "rocksdb/comparator.h" #include "rocksdb/options.h" +#include "rocksdb/perf_context.h" #include "rocksdb/slice.h" #include "rocksdb/statistics.h" #include "db/db_iter.h" @@ -184,6 +185,272 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { db_iter->Next(); ASSERT_TRUE(!db_iter->Valid()); } + // Test to check the SeekToLast() with iterate_upper_bound not set + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->Finish(); + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations)); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + } + + // Test to check the SeekToLast() with iterate_upper_bound set + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddPut("d", "val_d"); + internal_iter->AddPut("e", "val_e"); + internal_iter->AddPut("f", "val_f"); + internal_iter->Finish(); + + Slice prefix("d"); + + ReadOptions ro; + ro.iterate_upper_bound = &prefix; + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound)); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + + db_iter->Next(); + ASSERT_TRUE(!db_iter->Valid()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + } + // Test to check the SeekToLast() iterate_upper_bound set to a key that + // is not Put yet + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddPut("d", "val_d"); + internal_iter->Finish(); + + Slice prefix("z"); + + ReadOptions ro; + ro.iterate_upper_bound = &prefix; + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound)); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "d"); + + db_iter->Next(); + ASSERT_TRUE(!db_iter->Valid()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "d"); + + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + } + // Test to check the SeekToLast() with iterate_upper_bound set to the + // first key + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddPut("b", "val_b"); + internal_iter->Finish(); + + Slice prefix("a"); + + ReadOptions ro; + ro.iterate_upper_bound = &prefix; + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound)); + + db_iter->SeekToLast(); + ASSERT_TRUE(!db_iter->Valid()); + } + // Test case to check SeekToLast with iterate_upper_bound set + // (same key put may times - SeekToLast should start with the + // maximum sequence id of the upper bound) + + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddPut("c", "val_c"); + internal_iter->AddPut("c", "val_c"); + internal_iter->Finish(); + + Slice prefix("c"); + + ReadOptions ro; + ro.iterate_upper_bound = &prefix; + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 7, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound)); + + SetPerfLevel(kEnableCount); + ASSERT_TRUE(GetPerfLevel() == kEnableCount); + + perf_context.Reset(); + db_iter->SeekToLast(); + + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(static_cast(perf_context.internal_key_skipped_count), 1); + ASSERT_EQ(db_iter->key().ToString(), "b"); + + SetPerfLevel(kDisable); + } + // Test to check the SeekToLast() with the iterate_upper_bound set + // (Checking the value of the key which has sequence ids greater than + // and less that the iterator's sequence id) + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + + internal_iter->AddPut("a", "val_a1"); + internal_iter->AddPut("a", "val_a2"); + internal_iter->AddPut("b", "val_b1"); + internal_iter->AddPut("c", "val_c1"); + internal_iter->AddPut("c", "val_c2"); + internal_iter->AddPut("c", "val_c3"); + internal_iter->AddPut("b", "val_b2"); + internal_iter->AddPut("d", "val_d1"); + internal_iter->Finish(); + + Slice prefix("c"); + + ReadOptions ro; + ro.iterate_upper_bound = &prefix; + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 4, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound)); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "b"); + ASSERT_EQ(db_iter->value().ToString(), "val_b1"); + } + + // Test to check the SeekToLast() with the iterate_upper_bound set to the + // key that is deleted + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddDeletion("a"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->Finish(); + + Slice prefix("a"); + + ReadOptions ro; + ro.iterate_upper_bound = &prefix; + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound)); + + db_iter->SeekToLast(); + ASSERT_TRUE(!db_iter->Valid()); + } + // Test to check the SeekToLast() with the iterate_upper_bound set + // (Deletion cases) + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddDeletion("b"); + internal_iter->AddPut("c", "val_c"); + internal_iter->Finish(); + + Slice prefix("c"); + + ReadOptions ro; + ro.iterate_upper_bound = &prefix; + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound)); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + + db_iter->Next(); + ASSERT_TRUE(!db_iter->Valid()); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + } + // Test to check the SeekToLast() with iterate_upper_bound set + // (Deletion cases - Lot of internal keys after the upper_bound + // is deleted) + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "val_a"); + internal_iter->AddPut("b", "val_b"); + internal_iter->AddDeletion("c"); + internal_iter->AddDeletion("d"); + internal_iter->AddDeletion("e"); + internal_iter->AddDeletion("f"); + internal_iter->AddDeletion("g"); + internal_iter->AddDeletion("h"); + internal_iter->Finish(); + + Slice prefix("c"); + + ReadOptions ro; + ro.iterate_upper_bound = &prefix; + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 7, options.max_sequential_skip_in_iterations, ro.iterate_upper_bound)); + + SetPerfLevel(kEnableCount); + ASSERT_TRUE(GetPerfLevel() == kEnableCount); + + perf_context.Reset(); + db_iter->SeekToLast(); + + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(static_cast(perf_context.internal_delete_skipped_count), 0); + ASSERT_EQ(db_iter->key().ToString(), "b"); + + SetPerfLevel(kDisable); + } { TestIterator* internal_iter = new TestIterator(BytewiseComparator()); @@ -1401,6 +1668,81 @@ TEST_F(DBIteratorTest, DBIterator8) { ASSERT_EQ(db_iter->value().ToString(), "0"); } +TEST_F(DBIteratorTest, DBIterator9) { + Options options; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + { + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddMerge("a", "merge_1"); + internal_iter->AddMerge("a", "merge_2"); + internal_iter->AddMerge("b", "merge_3"); + internal_iter->AddMerge("b", "merge_4"); + internal_iter->AddMerge("d", "merge_5"); + internal_iter->AddMerge("d", "merge_6"); + internal_iter->Finish(); + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations)); + + db_iter->SeekToLast(); + ASSERT_TRUE(db_iter->Valid()); + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "b"); + ASSERT_EQ(db_iter->value().ToString(), "merge_3,merge_4"); + db_iter->Next(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "d"); + ASSERT_EQ(db_iter->value().ToString(), "merge_5,merge_6"); + + db_iter->Seek("b"); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "b"); + ASSERT_EQ(db_iter->value().ToString(), "merge_3,merge_4"); + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "a"); + ASSERT_EQ(db_iter->value().ToString(), "merge_1,merge_2"); + + db_iter->Seek("c"); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "d"); + ASSERT_EQ(db_iter->value().ToString(), "merge_5,merge_6"); + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "b"); + ASSERT_EQ(db_iter->value().ToString(), "merge_3,merge_4"); + } +} + +TEST_F(DBIteratorTest, DBIterator10) { + Options options; + + TestIterator* internal_iter = new TestIterator(BytewiseComparator()); + internal_iter->AddPut("a", "1"); + internal_iter->AddPut("b", "2"); + internal_iter->AddPut("c", "3"); + internal_iter->AddPut("d", "4"); + internal_iter->Finish(); + + std::unique_ptr db_iter(NewDBIterator( + env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, + 10, options.max_sequential_skip_in_iterations)); + + db_iter->Seek("c"); + ASSERT_TRUE(db_iter->Valid()); + db_iter->Prev(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "b"); + ASSERT_EQ(db_iter->value().ToString(), "2"); + + db_iter->Next(); + ASSERT_TRUE(db_iter->Valid()); + ASSERT_EQ(db_iter->key().ToString(), "c"); + ASSERT_EQ(db_iter->value().ToString(), "3"); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_test.cc b/db/db_test.cc index 787b4d659..2fb6226f2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -78,21 +78,42 @@ static std::string RandomString(Random* rnd, int len) { namespace anon { class AtomicCounter { private: + Env* env_; port::Mutex mu_; + port::CondVar cond_count_; int count_; public: - AtomicCounter() : count_(0) { } + AtomicCounter(Env* env = NULL) : env_(env), cond_count_(&mu_), count_(0) {} void Increment() { MutexLock l(&mu_); count_++; + cond_count_.SignalAll(); } int Read() { MutexLock l(&mu_); return count_; } + bool WaitFor(int count) { + MutexLock l(&mu_); + + uint64_t start = env_->NowMicros(); + while (count_ < count) { + uint64_t now = env_->NowMicros(); + cond_count_.TimedWait(now + /*1s*/ 1 * 000 * 000); + if (env_->NowMicros() - start > /*10s*/ 10 * 000 * 000) { + return false; + } + if (count_ < count) { + GTEST_LOG_(WARNING) << "WaitFor is taking more time than usual"; + } + } + + return true; + } void Reset() { MutexLock l(&mu_); count_ = 0; + cond_count_.SignalAll(); } }; @@ -165,7 +186,11 @@ class SpecialEnv : public EnvWrapper { bool no_sleep_; explicit SpecialEnv(Env* base) - : EnvWrapper(base), rnd_(301), addon_time_(0), no_sleep_(false) { + : EnvWrapper(base), + rnd_(301), + sleep_counter_(this), + addon_time_(0), + no_sleep_(false) { delay_sstable_sync_.store(false, std::memory_order_release); drop_writes_.store(false, std::memory_order_release); no_space_.store(false, std::memory_order_release); @@ -7537,6 +7562,7 @@ TEST_F(DBTest, DropWrites) { // Force out-of-space errors env_->drop_writes_.store(true, std::memory_order_release); env_->sleep_counter_.Reset(); + env_->no_sleep_ = true; for (int i = 0; i < 5; i++) { if (option_config_ != kUniversalCompactionMultiLevel) { for (int level = 0; level < dbfull()->NumberLevels(); level++) { @@ -7559,7 +7585,7 @@ TEST_F(DBTest, DropWrites) { ASSERT_LT(CountFiles(), num_files + 3); // Check that compaction attempts slept after errors - ASSERT_GE(env_->sleep_counter_.Read(), 5); + ASSERT_TRUE(env_->sleep_counter_.WaitFor(5)); } while (ChangeCompactOptions()); } @@ -10817,6 +10843,19 @@ TEST_F(DBTest, DBIteratorBoundTest) { // should stop here... ASSERT_TRUE(!iter->Valid()); } + // Testing SeekToLast with iterate_upper_bound set + { + ReadOptions ro; + + Slice prefix("foo"); + ro.iterate_upper_bound = &prefix; + + std::unique_ptr iter(db_->NewIterator(ro)); + + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("a")), 0); + } // prefix is the first letter of the key options.prefix_extractor.reset(NewFixedPrefixTransform(1)); @@ -11316,6 +11355,17 @@ TEST_F(DBTest, PreShutdownManualCompaction) { } } +TEST_F(DBTest, PreShutdownFlush) { + Options options = CurrentOptions(); + options.max_background_flushes = 0; + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_OK(Put(1, "key", "value")); + CancelAllBackgroundWork(db_); + Status s = + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); + ASSERT_TRUE(s.IsShutdownInProgress()); +} + TEST_F(DBTest, PreShutdownMultipleCompaction) { const int kTestKeySize = 16; const int kTestValueSize = 984; @@ -14078,6 +14128,29 @@ TEST_F(DBTest, RowCache) { ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1); } +TEST_F(DBTest, PrevAfterMerge) { + Options options; + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreatePutOperator(); + DestroyAndReopen(options); + + // write three entries with different keys using Merge() + WriteOptions wopts; + db_->Merge(wopts, "1", "data1"); + db_->Merge(wopts, "2", "data2"); + db_->Merge(wopts, "3", "data3"); + + std::unique_ptr it(db_->NewIterator(ReadOptions())); + + it->Seek("2"); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("2", it->key().ToString()); + + it->Prev(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("1", it->key().ToString()); +} + } // namespace rocksdb #endif diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 6b38b299e..a10d46c4b 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -81,7 +81,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, ParseInternalKey(keys_.back(), &orig_ikey); bool hit_the_next_user_key = false; - std::string merge_result; // Temporary value for merge results if (steps) { ++(*steps); } @@ -118,6 +117,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => change the entry type to kTypeValue for keys_.back() // We are done! Return a success if the merge passes. + std::string merge_result; Status s = TimedFullMerge(ikey.user_key, nullptr, operands_, user_merge_operator_, stats, env_, logger_, &merge_result); @@ -130,7 +130,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key[0], original_key.size(), orig_ikey.sequence, orig_ikey.type); - swap(operands_.back(), merge_result); + operands_.back() = std::move(merge_result); } // move iter to the next entry (before doing anything else) @@ -148,6 +148,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => change the entry type to kTypeValue for keys_.back() // We are done! Success! const Slice val = iter->value(); + std::string merge_result; Status s = TimedFullMerge(ikey.user_key, &val, operands_, user_merge_operator_, stats, env_, logger_, &merge_result); @@ -160,7 +161,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key[0], original_key.size(), orig_ikey.sequence, orig_ikey.type); - swap(operands_.back(), merge_result); + operands_.back() = std::move(merge_result); } // move iter to the next entry @@ -210,6 +211,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, assert(kTypeMerge == orig_ikey.type); assert(operands_.size() >= 1); assert(operands_.size() == keys_.size()); + std::string merge_result; { StopWatchNano timer(env_, stats != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); @@ -224,7 +226,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, UpdateInternalKey(&original_key[0], original_key.size(), orig_ikey.sequence, orig_ikey.type); - swap(operands_.back(),merge_result); + operands_.back() = std::move(merge_result); } else { RecordTick(stats, NUMBER_MERGE_FAILURES); // Do nothing if not success_. Leave keys() and operands() as they are. @@ -237,6 +239,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, if (operands_.size() >= 2 && operands_.size() >= min_partial_merge_operands_) { bool merge_success = false; + std::string merge_result; { StopWatchNano timer(env_, stats != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); @@ -251,7 +254,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // Merging of operands (associative merge) was successful. // Replace operands with the merge result operands_.clear(); - operands_.push_front(std::move(merge_result)); + operands_.emplace_front(std::move(merge_result)); keys_.erase(keys_.begin(), keys_.end() - 1); } } diff --git a/db/merge_operator.cc b/db/merge_operator.cc index a14df8a87..c6645a910 100644 --- a/db/merge_operator.cc +++ b/db/merge_operator.cc @@ -20,11 +20,11 @@ bool MergeOperator::PartialMergeMulti(const Slice& key, Logger* logger) const { assert(operand_list.size() >= 2); // Simply loop through the operands - std::string temp_value; Slice temp_slice(operand_list[0]); for (size_t i = 1; i < operand_list.size(); ++i) { auto& operand = operand_list[i]; + std::string temp_value; if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) { return false; } @@ -48,9 +48,9 @@ bool AssociativeMergeOperator::FullMerge( // Simply loop through the operands Slice temp_existing; - std::string temp_value; for (const auto& operand : operand_list) { Slice value(operand); + std::string temp_value; if (!Merge(key, existing_value, value, &temp_value, logger)) { return false; } diff --git a/db/merge_test.cc b/db/merge_test.cc index 80aed6095..7502b6718 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -7,6 +7,7 @@ #include #include +#include "port/stack_trace.h" #include "rocksdb/cache.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" @@ -41,6 +42,7 @@ class CountMergeOperator : public AssociativeMergeOperator { const Slice& value, std::string* new_value, Logger* logger) const override { + assert(new_value->empty()); ++num_merge_operator_calls; if (existing_value == nullptr) { new_value->assign(value.data(), value.size()); @@ -59,6 +61,7 @@ class CountMergeOperator : public AssociativeMergeOperator { const std::deque& operand_list, std::string* new_value, Logger* logger) const override { + assert(new_value->empty()); ++num_partial_merge_calls; return mergeOperator_->PartialMergeMulti(key, operand_list, new_value, logger); @@ -498,6 +501,7 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) { int main(int argc, char *argv[]) { //TODO: Make this test like a general rocksdb unit-test + rocksdb::port::InstallStackTraceHandler(); runTest(argc, test::TmpDir() + "/merge_testdb"); runTest(argc, test::TmpDir() + "/merge_testdbttl", true); // Run test on TTL database printf("Passed all tests!\n"); diff --git a/db/version_set.cc b/db/version_set.cc index 397e63fda..f7c2fc5cd 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -695,12 +695,14 @@ void Version::AddIterators(const ReadOptions& read_options, return; } + auto* arena = merge_iter_builder->GetArena(); + // Merge all level zero files together since they may overlap for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) { const auto& file = storage_info_.LevelFilesBrief(0).files[i]; merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, - false, merge_iter_builder->GetArena())); + false, arena)); } // For levels > 0, we can use a concatenating iterator that sequentially @@ -708,14 +710,16 @@ void Version::AddIterators(const ReadOptions& read_options, // lazily. for (int level = 1; level < storage_info_.num_non_empty_levels(); level++) { if (storage_info_.LevelFilesBrief(level).num_files != 0) { - merge_iter_builder->AddIterator(NewTwoLevelIterator( - new LevelFileIteratorState( - cfd_->table_cache(), read_options, soptions, - cfd_->internal_comparator(), false /* for_compaction */, - cfd_->ioptions()->prefix_extractor != nullptr), - new LevelFileNumIterator(cfd_->internal_comparator(), - &storage_info_.LevelFilesBrief(level)), - merge_iter_builder->GetArena())); + auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState)); + auto* state = new (mem) LevelFileIteratorState( + cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), false /* for_compaction */, + cfd_->ioptions()->prefix_extractor != nullptr); + mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); + auto* first_level_iter = new (mem) LevelFileNumIterator( + cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level)); + merge_iter_builder->AddIterator( + NewTwoLevelIterator(state, first_level_iter, arena, false)); } } } diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index 2ae64c1bc..c17dd8e71 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -54,7 +54,8 @@ class MergeOperator { // merge operation semantics // existing: (IN) null indicates that the key does not exist before this op // operand_list:(IN) the sequence of merge operations to apply, front() first. - // new_value:(OUT) Client is responsible for filling the merge result here + // new_value:(OUT) Client is responsible for filling the merge result here. + // The string that new_value is pointing to will be empty. // logger: (IN) Client could use this to log errors during merge. // // Return true on success. @@ -80,6 +81,8 @@ class MergeOperator { // DB::Merge(key, *new_value) would yield the same result as a call // to DB::Merge(key, left_op) followed by DB::Merge(key, right_op). // + // The string that new_value is pointing to will be empty. + // // The default implementation of PartialMergeMulti will use this function // as a helper, for backward compatibility. Any successor class of // MergeOperator should either implement PartialMerge or PartialMergeMulti, @@ -116,6 +119,8 @@ class MergeOperator { // the same result as subquential individual calls to DB::Merge(key, operand) // for each operand in operand_list from front() to back(). // + // The string that new_value is pointing to will be empty. + // // The PartialMergeMulti function will be called only when the list of // operands are long enough. The minimum amount of operands that will be // passed to the function are specified by the "min_partial_merge_operands" @@ -147,7 +152,8 @@ class AssociativeMergeOperator : public MergeOperator { // key: (IN) The key that's associated with this merge operation. // existing_value:(IN) null indicates the key does not exist before this op // value: (IN) the value to update/merge the existing_value with - // new_value: (OUT) Client is responsible for filling the merge result here + // new_value: (OUT) Client is responsible for filling the merge result + // here. The string that new_value is pointing to will be empty. // logger: (IN) Client could use this to log errors during merge. // // Return true on success. diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 56d01aaf3..82405276d 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -74,8 +74,13 @@ enum Tickers : uint32_t { NUMBER_KEYS_READ, // Number keys updated, if inplace update is enabled NUMBER_KEYS_UPDATED, - // Bytes written / read + // The number of uncompressed bytes issued by DB::Put(), DB::Delete(), + // DB::Merge(), and DB::Write(). BYTES_WRITTEN, + // The number of uncompressed bytes read from DB::Get(). It could be + // either from memtables, cache, or table files. + // For the number of logical bytes read from DB::MultiGet(), + // please use NUMBER_MULTIGET_BYTES_READ. BYTES_READ, NO_FILE_CLOSES, NO_FILE_OPENS, diff --git a/java/rocksjni/write_batch_with_index.cc b/java/rocksjni/write_batch_with_index.cc index 92f2ec068..7c57a0e06 100644 --- a/java/rocksjni/write_batch_with_index.cc +++ b/java/rocksjni/write_batch_with_index.cc @@ -368,11 +368,19 @@ void Java_org_rocksdb_WBWIRocksIterator_entry1( const rocksdb::WriteEntry& we = it->Entry(); jobject jwe = rocksdb::WBWIRocksIteratorJni::getWriteEntry(env, jobj); rocksdb::WriteEntryJni::setWriteType(env, jwe, we.type); - rocksdb::WriteEntryJni::setKey(env, jwe, &we.key); + + char* buf = new char[we.key.size()]; + memcpy(buf, we.key.data(), we.key.size()); + auto* key_slice = new rocksdb::Slice(buf, we.key.size()); + rocksdb::WriteEntryJni::setKey(env, jwe, key_slice); + if (we.type == rocksdb::kDeleteRecord || we.type == rocksdb::kLogDataRecord) { // set native handle of value slice to null if no value available - rocksdb::WriteEntryJni::setValue(env, jwe, NULL); + rocksdb::WriteEntryJni::setValue(env, jwe, nullptr); } else { - rocksdb::WriteEntryJni::setValue(env, jwe, &we.value); + char* value_buf = new char[we.value.size()]; + memcpy(value_buf, we.value.data(), we.value.size()); + auto* value_slice = new rocksdb::Slice(value_buf, we.value.size()); + rocksdb::WriteEntryJni::setValue(env, jwe, value_slice); } } diff --git a/java/src/test/java/org/rocksdb/RocksDBTest.java b/java/src/test/java/org/rocksdb/RocksDBTest.java index ae83683d6..31d2c5238 100644 --- a/java/src/test/java/org/rocksdb/RocksDBTest.java +++ b/java/src/test/java/org/rocksdb/RocksDBTest.java @@ -601,48 +601,73 @@ public class RocksDBTest { RocksDB db = null; Options opt = null; try { + final int NUM_KEYS_PER_L0_FILE = 100; + final int KEY_SIZE = 20; + final int VALUE_SIZE = 300; + final int L0_FILE_SIZE = + NUM_KEYS_PER_L0_FILE * (KEY_SIZE + VALUE_SIZE); + final int NUM_L0_FILES = 10; + final int TEST_SCALE = 5; + final int KEY_INTERVAL = 100; opt = new Options(). setCreateIfMissing(true). setCompactionStyle(CompactionStyle.LEVEL). - setNumLevels(4). - setWriteBufferSize(100<<10). - setLevelZeroFileNumCompactionTrigger(3). - setTargetFileSizeBase(200 << 10). - setTargetFileSizeMultiplier(1). - setMaxBytesForLevelBase(500 << 10). - setMaxBytesForLevelMultiplier(1). - setDisableAutoCompactions(false); - // open database - db = RocksDB.open(opt, - dbFolder.getRoot().getAbsolutePath()); - // fill database with key/value pairs - byte[] b = new byte[10000]; - for (int i = 0; i < 200; i++) { - rand.nextBytes(b); - db.put((String.valueOf(i)).getBytes(), b); - } - db.flush(new FlushOptions().setWaitForFlush(true)); - db.close(); - opt.setTargetFileSizeBase(Long.MAX_VALUE). + setNumLevels(5). + // a slightly bigger write buffer than L0 file + // so that we can ensure manual flush always + // go before background flush happens. + setWriteBufferSize(L0_FILE_SIZE * 2). + // Disable auto L0 -> L1 compaction + setLevelZeroFileNumCompactionTrigger(20). + setTargetFileSizeBase(L0_FILE_SIZE * 100). setTargetFileSizeMultiplier(1). - setMaxBytesForLevelBase(Long.MAX_VALUE). - setMaxBytesForLevelMultiplier(1). + // To disable auto compaction + setMaxBytesForLevelBase(NUM_L0_FILES * L0_FILE_SIZE * 100). + setMaxBytesForLevelMultiplier(2). setDisableAutoCompactions(true); - db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath()); + // fill database with key/value pairs + byte[] value = new byte[VALUE_SIZE]; + int int_key = 0; + for (int round = 0; round < 5; ++round) { + int initial_key = int_key; + for (int f = 1; f <= NUM_L0_FILES; ++f) { + for (int i = 0; i < NUM_KEYS_PER_L0_FILE; ++i) { + int_key += KEY_INTERVAL; + rand.nextBytes(value); - db.compactRange(true, 0, 0); - for (int i = 0; i < 4; i++) { - if (i == 0) { - assertThat( - db.getProperty("rocksdb.num-files-at-level" + i)). - isEqualTo("1"); - } else { + db.put(String.format("%020d", int_key).getBytes(), + value); + } + db.flush(new FlushOptions().setWaitForFlush(true)); + // Make sure we do create one more L0 files. assertThat( - db.getProperty("rocksdb.num-files-at-level" + i)). - isEqualTo("0"); + db.getProperty("rocksdb.num-files-at-level0")). + isEqualTo("" + f); } + + // Compact all L0 files we just created + db.compactRange( + String.format("%020d", initial_key).getBytes(), + String.format("%020d", int_key - 1).getBytes()); + // Making sure there isn't any L0 files. + assertThat( + db.getProperty("rocksdb.num-files-at-level0")). + isEqualTo("0"); + // Making sure there are some L1 files. + // Here we only use != 0 instead of a specific number + // as we don't want the test make any assumption on + // how compaction works. + assertThat( + db.getProperty("rocksdb.num-files-at-level1")). + isNotEqualTo("0"); + // Because we only compacted those keys we issued + // in this round, there shouldn't be any L1 -> L2 + // compaction. So we expect zero L2 files here. + assertThat( + db.getProperty("rocksdb.num-files-at-level2")). + isEqualTo("0"); } } finally { if (db != null) { @@ -662,6 +687,14 @@ public class RocksDBTest { List columnFamilyHandles = new ArrayList<>(); try { + final int NUM_KEYS_PER_L0_FILE = 100; + final int KEY_SIZE = 20; + final int VALUE_SIZE = 300; + final int L0_FILE_SIZE = + NUM_KEYS_PER_L0_FILE * (KEY_SIZE + VALUE_SIZE); + final int NUM_L0_FILES = 10; + final int TEST_SCALE = 5; + final int KEY_INTERVAL = 100; opt = new DBOptions(). setCreateIfMissing(true). setCreateMissingColumnFamilies(true); @@ -672,62 +705,73 @@ public class RocksDBTest { columnFamilyDescriptors.add(new ColumnFamilyDescriptor( "new_cf".getBytes(), new ColumnFamilyOptions(). - setDisableAutoCompactions(true). setCompactionStyle(CompactionStyle.LEVEL). - setNumLevels(4). - setWriteBufferSize(100 << 10). - setLevelZeroFileNumCompactionTrigger(3). - setTargetFileSizeBase(200 << 10). + setNumLevels(5). + // a slightly bigger write buffer than L0 file + // so that we can ensure manual flush always + // go before background flush happens. + setWriteBufferSize(L0_FILE_SIZE * 2). + // Disable auto L0 -> L1 compaction + setLevelZeroFileNumCompactionTrigger(20). + setTargetFileSizeBase(L0_FILE_SIZE * 100). setTargetFileSizeMultiplier(1). - setMaxBytesForLevelBase(500 << 10). - setMaxBytesForLevelMultiplier(1). - setDisableAutoCompactions(false))); + // To disable auto compaction + setMaxBytesForLevelBase(NUM_L0_FILES * L0_FILE_SIZE * 100). + setMaxBytesForLevelMultiplier(2). + setDisableAutoCompactions(true))); // open database db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles); // fill database with key/value pairs - byte[] b = new byte[10000]; - for (int i = 0; i < 200; i++) { - rand.nextBytes(b); - db.put(columnFamilyHandles.get(1), - String.valueOf(i).getBytes(), b); - } - db.flush(new FlushOptions().setWaitForFlush(true), - columnFamilyHandles.get(1)); - // free column families - for (ColumnFamilyHandle handle : columnFamilyHandles) { - handle.dispose(); - } - // clear column family handles for reopen - columnFamilyHandles.clear(); - db.close(); - columnFamilyDescriptors.get(1). - columnFamilyOptions(). - setTargetFileSizeBase(Long.MAX_VALUE). - setTargetFileSizeMultiplier(1). - setMaxBytesForLevelBase(Long.MAX_VALUE). - setMaxBytesForLevelMultiplier(1). - setDisableAutoCompactions(true); - // reopen database - db = RocksDB.open(opt, - dbFolder.getRoot().getAbsolutePath(), - columnFamilyDescriptors, - columnFamilyHandles); - // compact new column family - db.compactRange(columnFamilyHandles.get(1), true, 0, 0); - // check if new column family is compacted to level zero - for (int i = 0; i < 4; i++) { - if (i == 0) { - assertThat(db.getProperty(columnFamilyHandles.get(1), - "rocksdb.num-files-at-level" + i)). - isEqualTo("1"); - } else { - assertThat(db.getProperty(columnFamilyHandles.get(1), - "rocksdb.num-files-at-level" + i)). - isEqualTo("0"); + byte[] value = new byte[VALUE_SIZE]; + int int_key = 0; + for (int round = 0; round < 5; ++round) { + int initial_key = int_key; + for (int f = 1; f <= NUM_L0_FILES; ++f) { + for (int i = 0; i < NUM_KEYS_PER_L0_FILE; ++i) { + int_key += KEY_INTERVAL; + rand.nextBytes(value); + + db.put(columnFamilyHandles.get(1), + String.format("%020d", int_key).getBytes(), + value); + } + db.flush(new FlushOptions().setWaitForFlush(true), + columnFamilyHandles.get(1)); + // Make sure we do create one more L0 files. + assertThat( + db.getProperty(columnFamilyHandles.get(1), + "rocksdb.num-files-at-level0")). + isEqualTo("" + f); } + + // Compact all L0 files we just created + db.compactRange( + columnFamilyHandles.get(1), + String.format("%020d", initial_key).getBytes(), + String.format("%020d", int_key - 1).getBytes()); + // Making sure there isn't any L0 files. + assertThat( + db.getProperty(columnFamilyHandles.get(1), + "rocksdb.num-files-at-level0")). + isEqualTo("0"); + // Making sure there are some L1 files. + // Here we only use != 0 instead of a specific number + // as we don't want the test make any assumption on + // how compaction works. + assertThat( + db.getProperty(columnFamilyHandles.get(1), + "rocksdb.num-files-at-level1")). + isNotEqualTo("0"); + // Because we only compacted those keys we issued + // in this round, there shouldn't be any L1 -> L2 + // compaction. So we expect zero L2 files here. + assertThat( + db.getProperty(columnFamilyHandles.get(1), + "rocksdb.num-files-at-level2")). + isEqualTo("0"); } } finally { for (ColumnFamilyHandle handle : columnFamilyHandles) { diff --git a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java index f7eed556a..ac68c9f07 100644 --- a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java +++ b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java @@ -231,6 +231,34 @@ public class WriteBatchWithIndexTest { } } + @Test + public void zeroByteTests() { + final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true); + byte[] zeroByteValue = new byte[] { 0, 0 }; + + //add zero byte value + wbwi.put(zeroByteValue, zeroByteValue); + + ByteBuffer buffer = ByteBuffer.allocateDirect(zeroByteValue.length); + buffer.put(zeroByteValue); + + WBWIRocksIterator.WriteEntry[] expected = { + new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.PUT, + new DirectSlice(buffer, zeroByteValue.length), + new DirectSlice(buffer, zeroByteValue.length)) + }; + WBWIRocksIterator it = null; + try { + it = wbwi.newIterator(); + it.seekToFirst(); + assertThat(it.entry().equals(expected[0])).isTrue(); + } finally { + if(it != null) { + it.dispose(); + } + } + } + private byte[] toArray(final ByteBuffer buf) { final byte[] ary = new byte[buf.remaining()]; buf.get(ary); diff --git a/table/block.cc b/table/block.cc index 6a5ede600..ebae8560c 100644 --- a/table/block.cc +++ b/table/block.cc @@ -359,7 +359,7 @@ void Block::SetBlockPrefixIndex(BlockPrefixIndex* prefix_index) { } size_t Block::ApproximateMemoryUsage() const { - size_t usage = size(); + size_t usage = usable_size(); if (hash_index_) { usage += hash_index_->ApproximateMemoryUsage(); } diff --git a/table/block.h b/table/block.h index 0187489bb..2ce48d3fd 100644 --- a/table/block.h +++ b/table/block.h @@ -10,6 +10,9 @@ #pragma once #include #include +#ifdef ROCKSDB_MALLOC_USABLE_SIZE +#include +#endif #include "rocksdb/iterator.h" #include "rocksdb/options.h" @@ -37,6 +40,14 @@ class Block { size_t size() const { return size_; } const char* data() const { return data_; } bool cachable() const { return contents_.cachable; } + size_t usable_size() const { +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + if (contents_.allocation.get() != nullptr) { + return malloc_usable_size(contents_.allocation.get()); + } +#endif // ROCKSDB_MALLOC_USABLE_SIZE + return size_; + } uint32_t NumRestarts() const; CompressionType compression_type() const { return contents_.compression_type; diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 805a78378..006be6fde 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -704,8 +704,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, (end - r->compressed_cache_key_prefix)); // Insert into compressed block cache. - cache_handle = block_cache_compressed->Insert(key, block, block->size(), - &DeleteCachedBlock); + cache_handle = block_cache_compressed->Insert( + key, block, block->usable_size(), &DeleteCachedBlock); block_cache_compressed->Release(cache_handle); // Invalidate OS cache. diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index ed7fb0ba5..47e9a6a30 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -147,6 +147,8 @@ class BlockBasedTable::IndexReader { // The size of the index. virtual size_t size() const = 0; + // Memory usage of the index block + virtual size_t usable_size() const = 0; // Report an approximation of how much memory has been used other than memory // that was allocated in block cache. @@ -187,6 +189,9 @@ class BinarySearchIndexReader : public IndexReader { } virtual size_t size() const override { return index_block_->size(); } + virtual size_t usable_size() const override { + return index_block_->usable_size(); + } virtual size_t ApproximateMemoryUsage() const override { assert(index_block_); @@ -295,6 +300,9 @@ class HashIndexReader : public IndexReader { } virtual size_t size() const override { return index_block_->size(); } + virtual size_t usable_size() const override { + return index_block_->usable_size(); + } virtual size_t ApproximateMemoryUsage() const override { assert(index_block_); @@ -702,9 +710,9 @@ Status BlockBasedTable::GetDataBlockFromCache( assert(block->value->compression_type() == kNoCompression); if (block_cache != nullptr && block->value->cachable() && read_options.fill_cache) { - block->cache_handle = - block_cache->Insert(block_cache_key, block->value, - block->value->size(), &DeleteCachedEntry); + block->cache_handle = block_cache->Insert(block_cache_key, block->value, + block->value->usable_size(), + &DeleteCachedEntry); assert(reinterpret_cast( block_cache->Value(block->cache_handle)) == block->value); } @@ -747,7 +755,7 @@ Status BlockBasedTable::PutDataBlockToCache( if (block_cache_compressed != nullptr && raw_block != nullptr && raw_block->cachable()) { auto cache_handle = block_cache_compressed->Insert( - compressed_block_cache_key, raw_block, raw_block->size(), + compressed_block_cache_key, raw_block, raw_block->usable_size(), &DeleteCachedEntry); block_cache_compressed->Release(cache_handle); RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); @@ -759,9 +767,9 @@ Status BlockBasedTable::PutDataBlockToCache( // insert into uncompressed block cache assert((block->value->compression_type() == kNoCompression)); if (block_cache != nullptr && block->value->cachable()) { - block->cache_handle = - block_cache->Insert(block_cache_key, block->value, block->value->size(), - &DeleteCachedEntry); + block->cache_handle = block_cache->Insert(block_cache_key, block->value, + block->value->usable_size(), + &DeleteCachedEntry); RecordTick(statistics, BLOCK_CACHE_ADD); assert(reinterpret_cast(block_cache->Value(block->cache_handle)) == block->value); @@ -913,8 +921,9 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options, } } - cache_handle = block_cache->Insert(key, index_reader, index_reader->size(), - &DeleteCachedEntry); + cache_handle = + block_cache->Insert(key, index_reader, index_reader->usable_size(), + &DeleteCachedEntry); RecordTick(statistics, BLOCK_CACHE_ADD); } diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 5d3e372dd..f540d3b16 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -22,11 +22,17 @@ namespace { class TwoLevelIterator: public Iterator { public: explicit TwoLevelIterator(TwoLevelIteratorState* state, - Iterator* first_level_iter); + Iterator* first_level_iter, + bool need_free_iter_and_state); virtual ~TwoLevelIterator() { - first_level_iter_.DeleteIter(false); + first_level_iter_.DeleteIter(!need_free_iter_and_state_); second_level_iter_.DeleteIter(false); + if (need_free_iter_and_state_) { + delete state_; + } else { + state_->~TwoLevelIteratorState(); + } } virtual void Seek(const Slice& target) override; @@ -65,9 +71,10 @@ class TwoLevelIterator: public Iterator { void SetSecondLevelIterator(Iterator* iter); void InitDataBlock(); - std::unique_ptr state_; + TwoLevelIteratorState* state_; IteratorWrapper first_level_iter_; IteratorWrapper second_level_iter_; // May be nullptr + bool need_free_iter_and_state_; Status status_; // If second_level_iter is non-nullptr, then "data_block_handle_" holds the // "index_value" passed to block_function_ to create the second_level_iter. @@ -75,8 +82,11 @@ class TwoLevelIterator: public Iterator { }; TwoLevelIterator::TwoLevelIterator(TwoLevelIteratorState* state, - Iterator* first_level_iter) - : state_(state), first_level_iter_(first_level_iter) {} + Iterator* first_level_iter, + bool need_free_iter_and_state) + : state_(state), + first_level_iter_(first_level_iter), + need_free_iter_and_state_(need_free_iter_and_state) {} void TwoLevelIterator::Seek(const Slice& target) { if (state_->check_prefix_may_match && @@ -186,12 +196,15 @@ void TwoLevelIterator::InitDataBlock() { } // namespace Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state, - Iterator* first_level_iter, Arena* arena) { + Iterator* first_level_iter, Arena* arena, + bool need_free_iter_and_state) { if (arena == nullptr) { - return new TwoLevelIterator(state, first_level_iter); + return new TwoLevelIterator(state, first_level_iter, + need_free_iter_and_state); } else { auto mem = arena->AllocateAligned(sizeof(TwoLevelIterator)); - return new (mem) TwoLevelIterator(state, first_level_iter); + return new (mem) + TwoLevelIterator(state, first_level_iter, need_free_iter_and_state); } } diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index 030193597..4c6b48c2c 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -43,8 +43,11 @@ struct TwoLevelIteratorState { // arena: If not null, the arena is used to allocate the Iterator. // When destroying the iterator, the destructor will destroy // all the states but those allocated in arena. +// need_free_iter_and_state: free `state` and `first_level_iter` if +// true. Otherwise, just call destructor. extern Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state, Iterator* first_level_iter, - Arena* arena = nullptr); + Arena* arena = nullptr, + bool need_free_iter_and_state = true); } // namespace rocksdb diff --git a/tools/dump/rocksdb_dump.cc b/tools/dump/rocksdb_dump.cc index e429db327..95b275b94 100644 --- a/tools/dump/rocksdb_dump.cc +++ b/tools/dump/rocksdb_dump.cc @@ -11,6 +11,11 @@ int main() { } #else +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include #include #include @@ -87,7 +92,7 @@ int main(int argc, char** argv) { status = env->GetAbsolutePath(argv[1], &abspath); snprintf(json, sizeof(json), "{ \"database-path\": \"%s\", \"hostname\": \"%s\", " - "\"creation-time\": %ld }", + "\"creation-time\": %" PRIi64 " }", abspath.c_str(), hostname, timesec); } diff --git a/utilities/merge_operators/string_append/stringappend_test.cc b/utilities/merge_operators/string_append/stringappend_test.cc index d48b9df2a..f0d3976d8 100644 --- a/utilities/merge_operators/string_append/stringappend_test.cc +++ b/utilities/merge_operators/string_append/stringappend_test.cc @@ -36,6 +36,7 @@ std::shared_ptr OpenNormalDb(char delim_char) { return std::shared_ptr(db); } +#ifndef ROCKSDB_LITE // TtlDb is not supported in Lite // Open a TtlDB with a non-associative StringAppendTESTOperator std::shared_ptr OpenTtlDb(char delim_char) { DBWithTTL* db; @@ -45,6 +46,7 @@ std::shared_ptr OpenTtlDb(char delim_char) { EXPECT_OK(DBWithTTL::Open(options, kDbName, &db, 123456)); return std::shared_ptr(db); } +#endif // !ROCKSDB_LITE } // namespace /// StringLists represents a set of string-lists, each with a key-index. @@ -585,12 +587,14 @@ int main(int argc, char** argv) { result = RUN_ALL_TESTS(); } +#ifndef ROCKSDB_LITE // TtlDb is not supported in Lite // Run with TTL { fprintf(stderr, "Running tests with ttl db and generic operator.\n"); StringAppendOperatorTest::SetOpenDbFunction(&OpenTtlDb); result |= RUN_ALL_TESTS(); } +#endif // !ROCKSDB_LITE return result; }