Ankit Gupta 10 years ago
commit 611e286b9d
  1. 1
      HISTORY.md
  2. 35
      Makefile
  3. 13
      db/column_family.cc
  4. 4
      db/column_family.h
  5. 2
      db/compaction_picker.cc
  6. 2
      db/db_bench.cc
  7. 50
      db/db_impl.cc
  8. 4
      db/version_set.h
  9. 5
      db/write_batch_test.cc
  10. 2
      include/rocksdb/immutable_options.h
  11. 16
      include/rocksdb/options.h
  12. 15
      include/rocksdb/utilities/write_batch_with_index.h
  13. 58
      java/org/rocksdb/NativeLibraryLoader.java
  14. 3
      java/org/rocksdb/Options.java
  15. 19
      java/org/rocksdb/RocksDB.java
  16. 5
      table/block_based_table_reader.cc
  17. 2
      table/cuckoo_table_reader_test.cc
  18. 3
      table/full_filter_block_test.cc
  19. 35
      util/env_test.cc
  20. 21
      util/log_buffer.cc
  21. 9
      util/log_buffer.h
  22. 2
      util/logging.cc
  23. 1
      util/logging.h
  24. 33
      util/options.cc
  25. 2
      util/options_helper.cc
  26. 2
      util/options_test.cc
  27. 79
      utilities/write_batch_with_index/write_batch_with_index.cc
  28. 114
      utilities/write_batch_with_index/write_batch_with_index_test.cc

@ -8,6 +8,7 @@
* We have refactored our system of stalling writes. Any stall-related statistics' meanings are changed. Instead of per-write stall counts, we now count stalls per-epoch, where epochs are periods between flushes and compactions. You'll find more information in our Tuning Perf Guide once we release RocksDB 3.6.
* When disableDataSync=true, we no longer sync the MANIFEST file.
* Add identity_as_first_hash property to CuckooTable. SST file needs to be rebuilt to be opened by reader properly.
* Change target_file_size_base type to uint64_t from int.
----- Past Releases -----

@ -198,7 +198,7 @@ endif # PLATFORM_SHARED_EXT
.PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests \
release tags valgrind_check whitebox_crash_test format static_lib shared_lib all \
dbg install uninstall
dbg rocksdbjavastatic rocksdbjava install uninstall
all: $(LIBRARY) $(PROGRAMS) $(TESTS)
@ -268,7 +268,7 @@ unity: unity.cc unity.o
clean:
-rm -f $(PROGRAMS) $(TESTS) $(LIBRARY) $(SHARED) $(MEMENVLIBRARY) build_config.mk unity.cc
-rm -rf ios-x86/* ios-arm/*
-find . -name "*.[od]" -exec rm {} \;
-find . -name "*.[oda]" -exec rm {} \;
-find . -type f -regex ".*\.\(\(gcda\)\|\(gcno\)\)" -exec rm {} \;
tags:
ctags * -R
@ -518,6 +518,37 @@ ROCKSDBJNILIB = librocksdbjni.jnilib
JAVA_INCLUDE = -I/System/Library/Frameworks/JavaVM.framework/Headers/
endif
libz.a:
-rm -rf zlib-1.2.8
curl -O http://zlib.net/zlib-1.2.8.tar.gz
tar xvzf zlib-1.2.8.tar.gz
cd zlib-1.2.8 && CFLAGS='-fPIC' ./configure --static && make
cp zlib-1.2.8/libz.a .
libbz2.a:
-rm -rf bzip2-1.0.6
curl -O http://www.bzip.org/1.0.6/bzip2-1.0.6.tar.gz
tar xvzf bzip2-1.0.6.tar.gz
cd bzip2-1.0.6 && make CFLAGS='-fPIC -Wall -Winline -O2 -g -D_FILE_OFFSET_BITS=64'
cp bzip2-1.0.6/libbz2.a .
libsnappy.a:
-rm -rf snappy-1.1.1
curl -O https://snappy.googlecode.com/files/snappy-1.1.1.tar.gz
tar xvzf snappy-1.1.1.tar.gz
cd snappy-1.1.1 && ./configure --with-pic --enable-static
cd snappy-1.1.1 && make
cp snappy-1.1.1/.libs/libsnappy.a .
rocksdbjavastatic: libz.a libbz2.a libsnappy.a
OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j
cd java;$(MAKE) java;
rm -f ./java/$(ROCKSDBJNILIB)
$(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o ./java/$(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(LIBOBJECTS) $(COVERAGEFLAGS) libz.a libbz2.a libsnappy.a
cd java;jar -cf $(ROCKSDB_JAR) org/rocksdb/*.class org/rocksdb/util/*.class HISTORY*.md $(ROCKSDBJNILIB)
rocksdbjava:
OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32
cd java;$(MAKE) java;

@ -86,6 +86,10 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
return cfd()->user_comparator();
}
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src) {
ColumnFamilyOptions result = src;
@ -726,4 +730,13 @@ uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
return column_family_id;
}
const Comparator* GetColumnFamilyUserComparator(
ColumnFamilyHandle* column_family) {
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return cfh->user_comparator();
}
return nullptr;
}
} // namespace rocksdb

@ -49,6 +49,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
virtual const Comparator* user_comparator() const;
virtual uint32_t GetID() const;
@ -448,4 +449,7 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
extern uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family);
extern const Comparator* GetColumnFamilyUserComparator(
ColumnFamilyHandle* column_family);
} // namespace rocksdb

@ -575,7 +575,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
return nullptr;
}
Version::FileSummaryStorage tmp;
LogToBuffer(log_buffer, "[%s] Universal: candidate files(%zu): %s\n",
LogToBuffer(log_buffer, 3072, "[%s] Universal: candidate files(%zu): %s\n",
version->cfd_->GetName().c_str(), version->files_[level].size(),
version->LevelFileSummary(&tmp, 0));

@ -307,7 +307,7 @@ DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");
DEFINE_int32(num_levels, 7, "The total number of levels");
DEFINE_int32(target_file_size_base, 2 * 1048576, "Target file size at level-1");
DEFINE_int64(target_file_size_base, 2 * 1048576, "Target file size at level-1");
DEFINE_int32(target_file_size_multiplier, 1,
"A multiplier to compute target level-N file size (N >= 2)");

@ -3117,9 +3117,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
const uint64_t start_micros = env_->NowMicros();
unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
input->SeekToFirst();
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact->compaction));
backup_input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
@ -3132,14 +3129,30 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
auto compaction_filter_v2 =
compaction_filter_from_factory_v2.get();
// temp_backup_input always point to the start of the current buffer
// temp_backup_input = backup_input;
// iterate through input,
// 1) buffer ineligible keys and value keys into 2 separate buffers;
// 2) send value_buffer to compaction filter and alternate the values;
// 3) merge value_buffer with ineligible_value_buffer;
// 4) run the modified "compaction" using the old for loop.
if (compaction_filter_v2) {
if (!compaction_filter_v2) {
status = ProcessKeyValueCompaction(
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
false,
log_buffer);
} else {
// temp_backup_input always point to the start of the current buffer
// temp_backup_input = backup_input;
// iterate through input,
// 1) buffer ineligible keys and value keys into 2 separate buffers;
// 2) send value_buffer to compaction filter and alternate the values;
// 3) merge value_buffer with ineligible_value_buffer;
// 4) run the modified "compaction" using the old for loop.
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact->compaction));
backup_input->SeekToFirst();
while (backup_input->Valid() && !shutting_down_.Acquire_Load() &&
!cfd->IsDropped()) {
// FLUSH preempts compaction
@ -3267,21 +3280,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
log_buffer);
} // checking for compaction filter v2
if (!compaction_filter_v2) {
status = ProcessKeyValueCompaction(
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
false,
log_buffer);
}
if (status.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) {
status = Status::ShutdownInProgress(
"Database shutdown or Column family drop during compaction");

@ -183,10 +183,10 @@ class Version {
// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
struct LevelSummaryStorage {
char buffer[100];
char buffer[1000];
};
struct FileSummaryStorage {
char buffer[1000];
char buffer[3000];
};
const char* LevelSummary(LevelSummaryStorage* scratch) const;
// Return a human-readable short (single-line) summary of files

@ -289,6 +289,9 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
explicit ColumnFamilyHandleImplDummy(int id)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
uint32_t GetID() const override { return id_; }
const Comparator* user_comparator() const override {
return BytewiseComparator();
}
private:
uint32_t id_;
@ -320,7 +323,7 @@ TEST(WriteBatchTest, ColumnFamiliesBatchTest) {
}
TEST(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
WriteBatchWithIndex batch(BytewiseComparator(), 20);
WriteBatchWithIndex batch;
ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
batch.Put(&zero, Slice("foo"), Slice("bar"));
batch.Put(&two, Slice("twofoo"), Slice("bar2"));

@ -77,6 +77,8 @@ struct ImmutableCFOptions {
std::vector<CompressionType> compression_per_level;
CompressionOptions compression_opts;
Options::AccessHint access_hint_on_compaction_start;
};
} // namespace rocksdb

@ -58,6 +58,7 @@ enum CompactionStyle : char {
kCompactionStyleFIFO = 0x2, // FIFO compaction style
};
struct CompactionOptionsFIFO {
// once the total sum of table files reaches this, we will delete the oldest
// table file
@ -287,7 +288,7 @@ struct ColumnFamilyOptions {
// and each file on level-3 will be 200MB.
// by default target_file_size_base is 2MB.
int target_file_size_base;
uint64_t target_file_size_base;
// by default target_file_size_multiplier is 1, which means
// by default files in different levels will have similar size.
int target_file_size_multiplier;
@ -783,12 +784,13 @@ struct DBOptions {
// Specify the file access pattern once a compaction is started.
// It will be applied to all input files of a compaction.
// Default: NORMAL
enum {
NONE,
NORMAL,
SEQUENTIAL,
WILLNEED
} access_hint_on_compaction_start;
enum AccessHint {
NONE,
NORMAL,
SEQUENTIAL,
WILLNEED
};
AccessHint access_hint_on_compaction_start;
// Use adaptive mutex, which spins in the user space before resorting
// to kernel. This could reduce context switch when the mutex is not

@ -11,8 +11,9 @@
#pragma once
#include "rocksdb/status.h"
#include "rocksdb/comparator.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"
namespace rocksdb {
@ -56,12 +57,14 @@ class WBWIIterator {
// A user can call NewIterator() to create an iterator.
class WriteBatchWithIndex {
public:
// index_comparator indicates the order when iterating data in the write
// batch. Technically, it doesn't have to be the same as the one used in
// the DB.
// backup_index_comparator: the backup comparator used to compare keys
// within the same column family, if column family is not given in the
// interface, or we can't find a column family from the column family handle
// passed in, backup_index_comparator will be used for the column family.
// reserved_bytes: reserved bytes in underlying WriteBatch
explicit WriteBatchWithIndex(const Comparator* index_comparator,
size_t reserved_bytes = 0);
explicit WriteBatchWithIndex(
const Comparator* backup_index_comparator = BytewiseComparator(),
size_t reserved_bytes = 0);
virtual ~WriteBatchWithIndex();
WriteBatch* GetWriteBatch();

@ -0,0 +1,58 @@
package org.rocksdb;
import java.io.*;
/**
* This class is used to load the RocksDB shared library from within the jar.
* The shared library is extracted to a temp folder and loaded from there.
*/
public class NativeLibraryLoader {
private static String sharedLibraryName = "librocksdbjni.so";
private static String tempFilePrefix = "librocksdbjni";
private static String tempFileSuffix = ".so";
public static void loadLibraryFromJar(String tmpDir)
throws IOException {
File temp;
if(tmpDir == null || tmpDir.equals(""))
temp = File.createTempFile(tempFilePrefix, tempFileSuffix);
else
temp = new File(tmpDir + "/" + sharedLibraryName);
temp.deleteOnExit();
if (!temp.exists()) {
throw new RuntimeException("File " + temp.getAbsolutePath() + " does not exist.");
}
byte[] buffer = new byte[102400];
int readBytes;
InputStream is = ClassLoader.getSystemClassLoader().getResourceAsStream(sharedLibraryName);
if (is == null) {
throw new RuntimeException(sharedLibraryName + " was not found inside JAR.");
}
OutputStream os = null;
try {
os = new FileOutputStream(temp);
while ((readBytes = is.read(buffer)) != -1) {
os.write(buffer, 0, readBytes);
}
} finally {
if(os != null)
os.close();
if(is != null)
is.close();
}
System.load(temp.getAbsolutePath());
}
/**
* Private constructor to disallow instantiation
*/
private NativeLibraryLoader() {
}
}

@ -13,6 +13,9 @@ package org.rocksdb;
* native resources will be released as part of the process.
*/
public class Options extends RocksObject {
static {
RocksDB.loadLibrary();
}
static final long DEFAULT_CACHE_SIZE = 8 << 20;
static final int DEFAULT_NUM_SHARD_BITS = -1;
/**

@ -11,6 +11,7 @@ import java.util.HashMap;
import java.io.Closeable;
import java.io.IOException;
import org.rocksdb.util.Environment;
import org.rocksdb.NativeLibraryLoader;
/**
* A RocksDB is a persistent ordered map from keys to values. It is safe for
@ -23,11 +24,19 @@ public class RocksDB extends RocksObject {
private static final String[] compressionLibs_ = {
"snappy", "z", "bzip2", "lz4", "lz4hc"};
static {
RocksDB.loadLibrary();
}
/**
* Loads the necessary library files.
* Calling this method twice will have no effect.
* By default the method extracts the shared library for loading at
* java.io.tmpdir, however, you can override this temporary location by
* setting the environment variable ROCKSDB_SHAREDLIB_DIR.
*/
public static synchronized void loadLibrary() {
String tmpDir = System.getenv("ROCKSDB_SHAREDLIB_DIR");
// loading possibly necessary libraries.
for (String lib : compressionLibs_) {
try {
@ -36,8 +45,14 @@ public class RocksDB extends RocksObject {
// since it may be optional, we ignore its loading failure here.
}
}
// However, if any of them is required. We will see error here.
System.loadLibrary("rocksdbjni");
try
{
NativeLibraryLoader.loadLibraryFromJar(tmpDir);
}
catch (IOException e)
{
throw new RuntimeException("Unable to load the RocksDB shared library" + e);
}
}
/**

@ -498,7 +498,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// pre-load these blocks, which will kept in member variables in Rep
// and with a same life-time as this table object.
IndexReader* index_reader = nullptr;
// TODO: we never really verify check sum for index block
s = new_table->CreateIndexReader(&index_reader, meta_iter.get());
if (s.ok()) {
@ -533,8 +532,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
}
void BlockBasedTable::SetupForCompaction() {
/*
switch (.access_hint_on_compaction_start) {
switch (rep_->ioptions.access_hint_on_compaction_start) {
case Options::NONE:
break;
case Options::NORMAL:
@ -550,7 +548,6 @@ void BlockBasedTable::SetupForCompaction() {
assert(false);
}
compaction_optimized_ = true;
*/
}
std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()

@ -522,7 +522,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
float time_per_op = (env->NowMicros() - start_time) * 1.0 / num;
fprintf(stderr,
"Time taken per op is %.3fus (%.1f Mqps) with batch size of %u, "
"# of found keys %ld\n",
"# of found keys %" PRId64 "\n",
time_per_op, 1.0 / time_per_op, batch_size, found_count);
}
} // namespace.

@ -30,7 +30,8 @@ class TestFilterBitsBuilder : public FilterBitsBuilder {
for (size_t i = 0; i < hash_entries_.size(); i++) {
EncodeFixed32(data + i * 4, hash_entries_[i]);
}
buf->reset(data);
const char* const_data = data;
buf->reset(const_data);
return Slice(data, len);
}

@ -768,6 +768,41 @@ TEST(EnvPosixTest, LogBufferTest) {
ASSERT_EQ(10, test_logger.char_x_count);
}
class TestLogger2 : public Logger {
public:
explicit TestLogger2(size_t max_log_size) : max_log_size_(max_log_size) {}
virtual void Logv(const char* format, va_list ap) override {
char new_format[2000];
std::fill_n(new_format, sizeof(new_format), '2');
{
va_list backup_ap;
va_copy(backup_ap, ap);
int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap);
// 48 bytes for extra information + bytes allocated
ASSERT_TRUE(
n <= 48 + static_cast<int>(max_log_size_ - sizeof(struct timeval)));
ASSERT_TRUE(n > static_cast<int>(max_log_size_ - sizeof(struct timeval)));
va_end(backup_ap);
}
}
size_t max_log_size_;
};
TEST(EnvPosixTest, LogBufferMaxSizeTest) {
char bytes9000[9000];
std::fill_n(bytes9000, sizeof(bytes9000), '1');
bytes9000[sizeof(bytes9000) - 1] = '\0';
for (size_t max_log_size = 256; max_log_size <= 1024;
max_log_size += 1024 - 256) {
TestLogger2 test_logger(max_log_size);
test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL);
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger);
LogToBuffer(&log_buffer, max_log_size, "%s", bytes9000);
log_buffer.FlushBufferToLog();
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -13,17 +13,17 @@ LogBuffer::LogBuffer(const InfoLogLevel log_level,
Logger*info_log)
: log_level_(log_level), info_log_(info_log) {}
void LogBuffer::AddLogToBuffer(const char* format, va_list ap) {
void LogBuffer::AddLogToBuffer(size_t max_log_size, const char* format,
va_list ap) {
if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
// Skip the level because of its level.
return;
}
const size_t kLogSizeLimit = 512;
char* alloc_mem = arena_.AllocateAligned(kLogSizeLimit);
char* alloc_mem = arena_.AllocateAligned(max_log_size);
BufferedLog* buffered_log = new (alloc_mem) BufferedLog();
char* p = buffered_log->message;
char* limit = alloc_mem + kLogSizeLimit - 1;
char* limit = alloc_mem + max_log_size - 1;
// store the time
gettimeofday(&(buffered_log->now_tv), nullptr);
@ -61,11 +61,22 @@ void LogBuffer::FlushBufferToLog() {
logs_.clear();
}
void LogToBuffer(LogBuffer* log_buffer, size_t max_log_size, const char* format,
...) {
if (log_buffer != nullptr) {
va_list ap;
va_start(ap, format);
log_buffer->AddLogToBuffer(max_log_size, format, ap);
va_end(ap);
}
}
void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
const size_t kDefaultMaxLogSize = 512;
if (log_buffer != nullptr) {
va_list ap;
va_start(ap, format);
log_buffer->AddLogToBuffer(format, ap);
log_buffer->AddLogToBuffer(kDefaultMaxLogSize, format, ap);
va_end(ap);
}
}

@ -21,8 +21,9 @@ class LogBuffer {
// info_log: logger to write the logs to
LogBuffer(const InfoLogLevel log_level, Logger* info_log);
// Add a log entry to the buffer.
void AddLogToBuffer(const char* format, va_list ap);
// Add a log entry to the buffer. Use default max_log_size.
// max_log_size indicates maximize log size, including some metadata.
void AddLogToBuffer(size_t max_log_size, const char* format, va_list ap);
size_t IsEmpty() const { return logs_.empty(); }
@ -44,6 +45,10 @@ class LogBuffer {
// Add log to the LogBuffer for a delayed info logging. It can be used when
// we want to add some logs inside a mutex.
// max_log_size indicates maximize log size, including some metadata.
extern void LogToBuffer(LogBuffer* log_buffer, size_t max_log_size,
const char* format, ...);
// Same as previous function, but with default max log size.
extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...);
} // namespace rocksdb

@ -45,7 +45,7 @@ int AppendHumanBytes(uint64_t bytes, char* output, int len) {
void AppendNumberTo(std::string* str, uint64_t num) {
char buf[30];
snprintf(buf, sizeof(buf), "%llu", (unsigned long long) num);
snprintf(buf, sizeof(buf), "%" PRIu64, num);
str->append(buf);
}

@ -19,7 +19,6 @@
namespace rocksdb {
class Slice;
class WritableFile;
// Append a human-readable size in bytes
int AppendHumanBytes(uint64_t bytes, char* output, int len);

@ -59,7 +59,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
use_fsync(options.use_fsync),
compression(options.compression),
compression_per_level(options.compression_per_level),
compression_opts(options.compression_opts) {}
compression_opts(options.compression_opts),
access_hint_on_compaction_start(options.access_hint_on_compaction_start) {}
ColumnFamilyOptions::ColumnFamilyOptions()
: comparator(BytewiseComparator()),
@ -273,8 +274,8 @@ void DBOptions::Dump(Logger* log) const {
Log(log, " Options.disableDataSync: %d", disableDataSync);
Log(log, " Options.use_fsync: %d", use_fsync);
Log(log, " Options.max_log_file_size: %zu", max_log_file_size);
Log(log, "Options.max_manifest_file_size: %lu",
(unsigned long)max_manifest_file_size);
Log(log, "Options.max_manifest_file_size: %" PRIu64,
max_manifest_file_size);
Log(log, " Options.log_file_time_to_roll: %zu", log_file_time_to_roll);
Log(log, " Options.keep_log_file_num: %zu", keep_log_file_num);
Log(log, " Options.allow_os_buffer: %d", allow_os_buffer);
@ -290,16 +291,16 @@ void DBOptions::Dump(Logger* log) const {
table_cache_numshardbits);
Log(log, " Options.table_cache_remove_scan_count_limit: %d",
table_cache_remove_scan_count_limit);
Log(log, " Options.delete_obsolete_files_period_micros: %lu",
(unsigned long)delete_obsolete_files_period_micros);
Log(log, " Options.delete_obsolete_files_period_micros: %" PRIu64,
delete_obsolete_files_period_micros);
Log(log, " Options.max_background_compactions: %d",
max_background_compactions);
Log(log, " Options.max_background_flushes: %d",
max_background_flushes);
Log(log, " Options.WAL_ttl_seconds: %lu",
(unsigned long)WAL_ttl_seconds);
Log(log, " Options.WAL_size_limit_MB: %lu",
(unsigned long)WAL_size_limit_MB);
Log(log, " Options.WAL_ttl_seconds: %" PRIu64,
WAL_ttl_seconds);
Log(log, " Options.WAL_size_limit_MB: %" PRIu64,
WAL_size_limit_MB);
Log(log, " Options.manifest_preallocation_size: %zu",
manifest_preallocation_size);
Log(log, " Options.allow_os_buffer: %d",
@ -322,8 +323,8 @@ void DBOptions::Dump(Logger* log) const {
use_adaptive_mutex);
Log(log, " Options.rate_limiter: %p",
rate_limiter.get());
Log(log, " Options.bytes_per_sync: %lu",
(unsigned long)bytes_per_sync);
Log(log, " Options.bytes_per_sync: %" PRIu64,
bytes_per_sync);
} // DBOptions::Dump
void ColumnFamilyOptions::Dump(Logger* log) const {
@ -371,20 +372,20 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
level0_stop_writes_trigger);
Log(log," Options.max_mem_compaction_level: %d",
max_mem_compaction_level);
Log(log," Options.target_file_size_base: %d",
Log(log," Options.target_file_size_base: %" PRIu64,
target_file_size_base);
Log(log," Options.target_file_size_multiplier: %d",
target_file_size_multiplier);
Log(log," Options.max_bytes_for_level_base: %lu",
(unsigned long)max_bytes_for_level_base);
Log(log," Options.max_bytes_for_level_base: %" PRIu64,
max_bytes_for_level_base);
Log(log," Options.max_bytes_for_level_multiplier: %d",
max_bytes_for_level_multiplier);
for (int i = 0; i < num_levels; i++) {
Log(log,"Options.max_bytes_for_level_multiplier_addtl[%d]: %d",
i, max_bytes_for_level_multiplier_additional[i]);
}
Log(log," Options.max_sequential_skip_in_iterations: %lu",
(unsigned long)max_sequential_skip_in_iterations);
Log(log," Options.max_sequential_skip_in_iterations: %" PRIu64,
max_sequential_skip_in_iterations);
Log(log," Options.expanded_compaction_factor: %d",
expanded_compaction_factor);
Log(log," Options.source_compaction_factor: %d",

@ -177,7 +177,7 @@ bool GetOptionsFromStrings(
} else if (o.first == "max_mem_compaction_level") {
new_options->max_mem_compaction_level = ParseInt(o.second);
} else if (o.first == "target_file_size_base") {
new_options->target_file_size_base = ParseInt(o.second);
new_options->target_file_size_base = ParseUint64(o.second);
} else if (o.first == "target_file_size_multiplier") {
new_options->target_file_size_multiplier = ParseInt(o.second);
} else if (o.first == "max_bytes_for_level_base") {

@ -177,7 +177,7 @@ TEST(OptionsTest, GetOptionsFromStringsTest) {
ASSERT_EQ(new_opt.level0_slowdown_writes_trigger, 9);
ASSERT_EQ(new_opt.level0_stop_writes_trigger, 10);
ASSERT_EQ(new_opt.max_mem_compaction_level, 11);
ASSERT_EQ(new_opt.target_file_size_base, 12);
ASSERT_EQ(new_opt.target_file_size_base, static_cast<uint64_t>(12));
ASSERT_EQ(new_opt.target_file_size_multiplier, 13);
ASSERT_EQ(new_opt.max_bytes_for_level_base, 14U);
ASSERT_EQ(new_opt.max_bytes_for_level_multiplier, 15);

@ -20,7 +20,6 @@ class ReadableWriteBatch : public WriteBatch {
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
Slice* value, Slice* blob) const;
};
} // namespace
// Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
struct WriteBatchIndexEntry {
@ -38,44 +37,28 @@ struct WriteBatchIndexEntry {
class WriteBatchEntryComparator {
public:
WriteBatchEntryComparator(const Comparator* comparator,
WriteBatchEntryComparator(const Comparator* default_comparator,
const ReadableWriteBatch* write_batch)
: comparator_(comparator), write_batch_(write_batch) {}
: default_comparator_(default_comparator), write_batch_(write_batch) {}
// Compare a and b. Return a negative value if a is less than b, 0 if they
// are equal, and a positive value if a is greater than b
int operator()(const WriteBatchIndexEntry* entry1,
const WriteBatchIndexEntry* entry2) const;
void SetComparatorForCF(uint32_t column_family_id,
const Comparator* comparator) {
cf_comparator_map_[column_family_id] = comparator;
}
private:
const Comparator* comparator_;
const Comparator* default_comparator_;
std::unordered_map<uint32_t, const Comparator*> cf_comparator_map_;
const ReadableWriteBatch* write_batch_;
};
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
WriteBatchEntrySkipList;
struct WriteBatchWithIndex::Rep {
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0)
: write_batch(reserved_bytes),
comparator(index_comparator, &write_batch),
skip_list(comparator, &arena) {}
ReadableWriteBatch write_batch;
WriteBatchEntryComparator comparator;
Arena arena;
WriteBatchEntrySkipList skip_list;
WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) {
return GetEntryWithCfId(GetColumnFamilyID(column_family));
}
WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) {
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
auto* index_entry = new (mem)
WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id);
return index_entry;
}
};
class WBWIIteratorImpl : public WBWIIterator {
public:
WBWIIteratorImpl(uint32_t column_family_id,
@ -138,6 +121,35 @@ class WBWIIteratorImpl : public WBWIIterator {
}
}
};
} // namespace
struct WriteBatchWithIndex::Rep {
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0)
: write_batch(reserved_bytes),
comparator(index_comparator, &write_batch),
skip_list(comparator, &arena) {}
ReadableWriteBatch write_batch;
WriteBatchEntryComparator comparator;
Arena arena;
WriteBatchEntrySkipList skip_list;
WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) {
uint32_t cf_id = GetColumnFamilyID(column_family);
const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
if (cf_cmp != nullptr) {
comparator.SetComparatorForCF(cf_id, cf_cmp);
}
return GetEntryWithCfId(cf_id);
}
WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) {
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
auto* index_entry = new (mem)
WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id);
return index_entry;
}
};
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
WriteType* type, Slice* Key,
@ -179,9 +191,9 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
return Status::OK();
}
WriteBatchWithIndex::WriteBatchWithIndex(const Comparator* index_comparator,
size_t reserved_bytes)
: rep(new Rep(index_comparator, reserved_bytes)) {}
WriteBatchWithIndex::WriteBatchWithIndex(
const Comparator* default_index_comparator, size_t reserved_bytes)
: rep(new Rep(default_index_comparator, reserved_bytes)) {}
WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }
@ -287,7 +299,14 @@ int WriteBatchEntryComparator::operator()(
key2 = *(entry2->search_key);
}
int cmp = comparator_->Compare(key1, key2);
int cmp;
auto comparator_for_cf = cf_comparator_map_.find(entry1->column_family);
if (comparator_for_cf != cf_comparator_map_.end()) {
cmp = comparator_for_cf->second->Compare(key1, key2);
} else {
cmp = default_comparator_->Compare(key1, key2);
}
if (cmp != 0) {
return cmp;
} else if (entry1->offset > entry2->offset) {

@ -19,12 +19,16 @@ namespace rocksdb {
namespace {
class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
public:
explicit ColumnFamilyHandleImplDummy(int id)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
explicit ColumnFamilyHandleImplDummy(int id, const Comparator* comparator)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr),
id_(id),
comparator_(comparator) {}
uint32_t GetID() const override { return id_; }
const Comparator* user_comparator() const override { return comparator_; }
private:
uint32_t id_;
const Comparator* comparator_;
};
struct Entry {
@ -90,8 +94,9 @@ TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
index_map[e.value].push_back(&e);
}
WriteBatchWithIndex batch(BytewiseComparator(), 20);
ColumnFamilyHandleImplDummy data(6), index(8);
WriteBatchWithIndex batch(nullptr, 20);
ColumnFamilyHandleImplDummy data(6, BytewiseComparator());
ColumnFamilyHandleImplDummy index(8, BytewiseComparator());
for (auto& e : entries) {
if (e.type == kPutRecord) {
batch.Put(&data, e.key, e.value);
@ -230,6 +235,107 @@ TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
}
}
class ReverseComparator : public Comparator {
public:
ReverseComparator() {}
virtual const char* Name() const override {
return "rocksdb.ReverseComparator";
}
virtual int Compare(const Slice& a, const Slice& b) const override {
return 0 - BytewiseComparator()->Compare(a, b);
}
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const {}
virtual void FindShortSuccessor(std::string* key) const {}
};
TEST(WriteBatchWithIndexTest, TestComparatorForCF) {
ReverseComparator reverse_cmp;
ColumnFamilyHandleImplDummy cf1(6, nullptr);
ColumnFamilyHandleImplDummy reverse_cf(66, &reverse_cmp);
ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator());
WriteBatchWithIndex batch(BytewiseComparator(), 20);
batch.Put(&cf1, "ddd", "");
batch.Put(&cf2, "aaa", "");
batch.Put(&cf2, "eee", "");
batch.Put(&cf1, "ccc", "");
batch.Put(&reverse_cf, "a11", "");
batch.Put(&cf1, "bbb", "");
batch.Put(&reverse_cf, "a33", "");
batch.Put(&reverse_cf, "a22", "");
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1));
iter->Seek("");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bbb", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("ccc", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("ddd", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf2));
iter->Seek("");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("aaa", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("eee", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&reverse_cf));
iter->Seek("");
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("z");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a33", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a22", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a11", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("a22");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a22", iter->Entry().key.ToString());
iter->Seek("a13");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a11", iter->Entry().key.ToString());
}
}
} // namespace
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }

Loading…
Cancel
Save