Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	db/db_test.cc
	db/internal_stats.cc
	db/internal_stats.h
	db/version_edit.cc
	db/version_edit.h
	db/version_set.cc
	include/rocksdb/options.h
	util/options.cc
main
Igor Canadi 11 years ago
commit ddbd1ece88
  1. 3
      HISTORY.md
  2. 35
      Makefile
  3. 2
      db/column_family.cc
  4. 1
      db/column_family_test.cc
  5. 2
      db/corruption_test.cc
  6. 13
      db/db_bench.cc
  7. 3
      db/db_impl.cc
  8. 1
      db/db_impl.h
  9. 29
      db/db_test.cc
  10. 8
      db/internal_stats.cc
  11. 2
      db/internal_stats.h
  12. 23
      db/memtable.cc
  13. 4
      db/merge_helper.cc
  14. 2
      db/merge_operator.cc
  15. 26
      db/table_cache.cc
  16. 2
      db/table_cache.h
  17. 15
      db/version_edit.cc
  18. 10
      db/version_edit.h
  19. 169
      db/version_set.cc
  20. 6
      include/rocksdb/compaction_filter.h
  21. 19
      include/rocksdb/options.h
  22. 103
      include/utilities/geo_db.h
  23. 17
      java/Makefile
  24. 79
      java/RocksDBSample.java
  25. 103
      java/org/rocksdb/RocksDB.java
  26. 24
      java/org/rocksdb/RocksDBException.java
  27. 81
      java/rocksjni/portal.h
  28. 185
      java/rocksjni/rocksjni.cc
  29. 27
      port/port_posix.cc
  30. 9
      port/port_posix.h
  31. 4
      table/plain_table_reader.cc
  32. 38
      util/dynamic_bloom.cc
  33. 61
      util/dynamic_bloom.h
  34. 187
      util/dynamic_bloom_test.cc
  35. 5
      util/env_posix.cc
  36. 4
      util/env_test.cc
  37. 14
      util/options.cc
  38. 4
      utilities/backupable/backupable_db_test.cc
  39. 427
      utilities/geodb/geodb_impl.cc
  40. 187
      utilities/geodb/geodb_impl.h
  41. 123
      utilities/geodb/geodb_test.cc

@ -16,7 +16,7 @@
* Added "virtual void WaitForJoin()" in class Env. Default operation is no-op. * Added "virtual void WaitForJoin()" in class Env. Default operation is no-op.
* Removed BackupEngine::DeleteBackupsNewerThan() function * Removed BackupEngine::DeleteBackupsNewerThan() function
* Added new option -- verify_checksums_in_compaction * Added new option -- verify_checksums_in_compaction
* Chagned Options.prefix_extractor from raw pointer to shared_ptr (take ownership) * Changed Options.prefix_extractor from raw pointer to shared_ptr (take ownership)
Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly) Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly)
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools * Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
* Added a command "checkconsistency" in ldb tool, which checks * Added a command "checkconsistency" in ldb tool, which checks
@ -28,6 +28,7 @@
we will ignore it. We assume that writers of these records were interrupted we will ignore it. We assume that writers of these records were interrupted
and that we can safely ignore it. and that we can safely ignore it.
* Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, process them in batches, and return the batched results back to DB. * Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, process them in batches, and return the batched results back to DB.
* Geo-spatial support for locations and radial-search.
## 2.7.0 (01/28/2014) ## 2.7.0 (01/28/2014)

@ -94,7 +94,8 @@ TESTS = \
write_batch_test\ write_batch_test\
deletefile_test \ deletefile_test \
table_test \ table_test \
thread_local_test thread_local_test \
geodb_test
TOOLS = \ TOOLS = \
sst_dump \ sst_dump \
@ -370,6 +371,9 @@ merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS)
deletefile_test: db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS) deletefile_test: db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(CXX) db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
geodb_test: utilities/geodb/geodb_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) utilities/geodb/geodb_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
$(MEMENVLIBRARY) : $(MEMENVOBJECTS) $(MEMENVLIBRARY) : $(MEMENVOBJECTS)
rm -f $@ rm -f $@
$(AR) -rs $@ $(MEMENVOBJECTS) $(AR) -rs $@ $(MEMENVOBJECTS)
@ -398,6 +402,31 @@ sst_dump: tools/sst_dump.o $(LIBOBJECTS)
ldb: tools/ldb.o $(LIBOBJECTS) ldb: tools/ldb.o $(LIBOBJECTS)
$(CXX) tools/ldb.o $(LIBOBJECTS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) $(CXX) tools/ldb.o $(LIBOBJECTS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
# ---------------------------------------------------------------------------
# Jni stuff
# ---------------------------------------------------------------------------
JNI_NATIVE_SOURCES = ./java/rocksjni/rocksjni.cc
JAVA_INCLUDE = -I/usr/lib/jvm/java-openjdk/include/ -I/usr/lib/jvm/java-openjdk/include/linux
ROCKSDBJNILIB = ./java/librocksdbjni.so
ifeq ($(PLATFORM), OS_MACOSX)
ROCKSDBJNILIB = ./java/librocksdbjni.jnilib
JAVA_INCLUDE = -I/System/Library/Frameworks/JavaVM.framework/Headers/
endif
jni: clean
OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32
cd java;$(MAKE) java;
$(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o $(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(LIBOBJECTS) $(LDFLAGS) $(COVERAGEFLAGS)
jclean:
cd java;$(MAKE) clean;
rm -f $(ROCKSDBJNILIB)
jtest:
cd java;$(MAKE) sample;
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Platform-specific compilation # Platform-specific compilation
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -461,6 +490,10 @@ depend: $(DEPFILES)
# working solution. # working solution.
ifneq ($(MAKECMDGOALS),clean) ifneq ($(MAKECMDGOALS),clean)
ifneq ($(MAKECMDGOALS),format) ifneq ($(MAKECMDGOALS),format)
ifneq ($(MAKECMDGOALS),jclean)
ifneq ($(MAKECMDGOALS),jtest)
-include $(DEPFILES) -include $(DEPFILES)
endif endif
endif endif
endif
endif

@ -300,12 +300,12 @@ Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
SuperVersion* ColumnFamilyData::InstallSuperVersion( SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, port::Mutex* db_mutex) { SuperVersion* new_superversion, port::Mutex* db_mutex) {
new_superversion->db_mutex = db_mutex;
new_superversion->Init(mem_, imm_.current(), current_); new_superversion->Init(mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_; SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion; super_version_ = new_superversion;
++super_version_number_; ++super_version_number_;
super_version_->version_number = super_version_number_; super_version_->version_number = super_version_number_;
super_version_->db_mutex = db_mutex;
if (old_superversion != nullptr && old_superversion->Unref()) { if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup(); old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex return old_superversion; // will let caller delete outside of mutex

@ -704,6 +704,7 @@ TEST(ColumnFamilyTest, DifferentCompactionStyles) {
default_cf.filter_policy = nullptr; default_cf.filter_policy = nullptr;
default_cf.no_block_cache = true; default_cf.no_block_cache = true;
default_cf.source_compaction_factor = 100; default_cf.source_compaction_factor = 100;
default_cf.disable_seek_compaction = false;
one.compaction_style = kCompactionStyleUniversal; one.compaction_style = kCompactionStyleUniversal;
// trigger compaction if there are >= 4 files // trigger compaction if there are >= 4 files

@ -387,7 +387,7 @@ TEST(CorruptionTest, FileSystemStateCorrupted) {
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
std::vector<LiveFileMetaData> metadata; std::vector<LiveFileMetaData> metadata;
dbi->GetLiveFilesMetaData(&metadata); dbi->GetLiveFilesMetaData(&metadata);
ASSERT_GT(metadata.size(), 0); ASSERT_GT(metadata.size(), size_t(0));
std::string filename = dbname_ + metadata[0].name; std::string filename = dbname_ + metadata[0].name;
delete db_; delete db_;

@ -134,6 +134,8 @@ DEFINE_int64(read_range, 1, "When ==1 reads use ::Get, when >1 reads use"
DEFINE_bool(use_prefix_blooms, false, "Whether to place prefixes in blooms"); DEFINE_bool(use_prefix_blooms, false, "Whether to place prefixes in blooms");
DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality");
DEFINE_bool(use_prefix_api, false, "Whether to set ReadOptions.prefix for" DEFINE_bool(use_prefix_api, false, "Whether to set ReadOptions.prefix for"
" prefixscanrandom. If true, use_prefix_blooms must also be true."); " prefixscanrandom. If true, use_prefix_blooms must also be true.");
@ -1543,6 +1545,7 @@ class Benchmark {
NewFixedPrefixTransform(FLAGS_prefix_size)); NewFixedPrefixTransform(FLAGS_prefix_size));
} }
options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits; options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits;
options.bloom_locality = FLAGS_bloom_locality;
options.max_open_files = FLAGS_open_files; options.max_open_files = FLAGS_open_files;
options.statistics = dbstats; options.statistics = dbstats;
options.env = FLAGS_env; options.env = FLAGS_env;
@ -1916,7 +1919,7 @@ class Benchmark {
Duration duration(FLAGS_duration, reads_); Duration duration(FLAGS_duration, reads_);
int64_t found = 0; int64_t found = 0;
int64_t read = 0;
if (FLAGS_use_multiget) { // MultiGet if (FLAGS_use_multiget) { // MultiGet
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
long keys_left = reads_; long keys_left = reads_;
@ -1924,6 +1927,7 @@ class Benchmark {
// Recalculate number of keys per group, and call MultiGet until done // Recalculate number of keys per group, and call MultiGet until done
long num_keys; long num_keys;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
read += num_keys;
found += found +=
MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, ""); MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, "");
thread->stats.FinishedSingleOp(db_); thread->stats.FinishedSingleOp(db_);
@ -1937,8 +1941,9 @@ class Benchmark {
std::string key = GenerateKeyFromInt(k, FLAGS_num); std::string key = GenerateKeyFromInt(k, FLAGS_num);
iter->Seek(key); iter->Seek(key);
read++;
if (iter->Valid() && iter->key().compare(Slice(key)) == 0) { if (iter->Valid() && iter->key().compare(Slice(key)) == 0) {
++found; found++;
} }
thread->stats.FinishedSingleOp(db_); thread->stats.FinishedSingleOp(db_);
@ -1957,6 +1962,7 @@ class Benchmark {
} }
if (FLAGS_read_range < 2) { if (FLAGS_read_range < 2) {
read++;
if (db_->Get(options, key, &value).ok()) { if (db_->Get(options, key, &value).ok()) {
found++; found++;
} }
@ -1972,6 +1978,7 @@ class Benchmark {
db_->GetApproximateSizes(&range, 1, &sizes); db_->GetApproximateSizes(&range, 1, &sizes);
} }
read += FLAGS_read_range;
for (iter->Seek(key); for (iter->Seek(key);
iter->Valid() && count <= FLAGS_read_range; iter->Valid() && count <= FLAGS_read_range;
++count, iter->Next()) { ++count, iter->Next()) {
@ -1992,7 +1999,7 @@ class Benchmark {
char msg[100]; char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
found, reads_); found, read);
thread->stats.AddMessage(msg); thread->stats.AddMessage(msg);

@ -70,7 +70,6 @@
namespace rocksdb { namespace rocksdb {
const std::string default_column_family_name("default"); const std::string default_column_family_name("default");
const std::string kNullString = "NULL";
void DumpLeveldbBuildVersion(Logger * log); void DumpLeveldbBuildVersion(Logger * log);
@ -466,7 +465,6 @@ uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
Status DBImpl::NewDB() { Status DBImpl::NewDB() {
VersionEdit new_db; VersionEdit new_db;
new_db.SetVersionNumber();
new_db.SetLogNumber(0); new_db.SetLogNumber(0);
new_db.SetNextFile(2); new_db.SetNextFile(2);
new_db.SetLastSequence(0); new_db.SetLastSequence(0);
@ -2826,7 +2824,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
assert(compact); assert(compact);
compact->CleanupBatchBuffer(); compact->CleanupBatchBuffer();
compact->CleanupMergedBuffer(); compact->CleanupMergedBuffer();
compact->cur_prefix_ = kNullString;
bool prefix_initialized = false; bool prefix_initialized = false;
int64_t imm_micros = 0; // Micros spent doing imm_ compactions int64_t imm_micros = 0; // Micros spent doing imm_ compactions

@ -269,6 +269,7 @@ class DBImpl : public DB {
private: private:
friend class DB; friend class DB;
friend class InternalStats;
friend class TailingIterator; friend class TailingIterator;
friend struct SuperVersion; friend struct SuperVersion;
struct CompactionState; struct CompactionState;

@ -432,6 +432,7 @@ class DBTest {
options.compaction_style = kCompactionStyleUniversal; options.compaction_style = kCompactionStyleUniversal;
break; break;
case kCompressedBlockCache: case kCompressedBlockCache:
options.allow_mmap_writes = true;
options.block_cache_compressed = NewLRUCache(8*1024*1024); options.block_cache_compressed = NewLRUCache(8*1024*1024);
break; break;
case kInfiniteMaxOpenFiles: case kInfiniteMaxOpenFiles:
@ -2185,6 +2186,8 @@ TEST(DBTest, NumImmutableMemTable) {
ASSERT_EQ(1, (int) perf_context.get_from_memtable_count); ASSERT_EQ(1, (int) perf_context.get_from_memtable_count);
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "k3", big_value)); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "k3", big_value));
ASSERT_TRUE(dbfull()->GetProperty(
handles_[1], "rocksdb.cur-size-active-mem-table", &num));
ASSERT_TRUE(dbfull()->GetProperty(handles_[1], ASSERT_TRUE(dbfull()->GetProperty(handles_[1],
"rocksdb.num-immutable-mem-table", &num)); "rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "2"); ASSERT_EQ(num, "2");
@ -2202,6 +2205,11 @@ TEST(DBTest, NumImmutableMemTable) {
ASSERT_TRUE(dbfull()->GetProperty(handles_[1], ASSERT_TRUE(dbfull()->GetProperty(handles_[1],
"rocksdb.num-immutable-mem-table", &num)); "rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "0"); ASSERT_EQ(num, "0");
ASSERT_TRUE(dbfull()->GetProperty(
handles_[1], "rocksdb.cur-size-active-mem-table", &num));
// "208" is the size of the metadata of an empty skiplist, this would
// break if we change the default skiplist implementation
ASSERT_EQ(num, "208");
SetPerfLevel(kDisable); SetPerfLevel(kDisable);
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
@ -3481,6 +3489,7 @@ TEST(DBTest, InPlaceUpdateCallbackNoAction) {
TEST(DBTest, CompactionFilter) { TEST(DBTest, CompactionFilter) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_open_files = -1;
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0; options.max_mem_compaction_level = 0;
options.compaction_filter_factory = std::make_shared<KeepFilterFactory>(); options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
@ -3848,9 +3857,11 @@ TEST(DBTest, CompactionFilterV2) {
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0; options.max_mem_compaction_level = 0;
// extract prefix // extract prefix
auto prefix_extractor = NewFixedPrefixTransform(8); std::unique_ptr<const SliceTransform> prefix_extractor;
prefix_extractor.reset(NewFixedPrefixTransform(8));
options.compaction_filter_factory_v2 options.compaction_filter_factory_v2
= std::make_shared<KeepFilterFactoryV2>(prefix_extractor); = std::make_shared<KeepFilterFactoryV2>(prefix_extractor.get());
// In a testing environment, we can only flush the application // In a testing environment, we can only flush the application
// compaction filter buffer using universal compaction // compaction filter buffer using universal compaction
option_config_ = kUniversalCompaction; option_config_ = kUniversalCompaction;
@ -3898,7 +3909,7 @@ TEST(DBTest, CompactionFilterV2) {
// create a new database with the compaction // create a new database with the compaction
// filter in such a way that it deletes all keys // filter in such a way that it deletes all keys
options.compaction_filter_factory_v2 = options.compaction_filter_factory_v2 =
std::make_shared<DeleteFilterFactoryV2>(prefix_extractor); std::make_shared<DeleteFilterFactoryV2>(prefix_extractor.get());
options.create_if_missing = true; options.create_if_missing = true;
DestroyAndReopen(&options); DestroyAndReopen(&options);
@ -3933,9 +3944,10 @@ TEST(DBTest, CompactionFilterV2WithValueChange) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0; options.max_mem_compaction_level = 0;
auto prefix_extractor = NewFixedPrefixTransform(8); std::unique_ptr<const SliceTransform> prefix_extractor;
prefix_extractor.reset(NewFixedPrefixTransform(8));
options.compaction_filter_factory_v2 = options.compaction_filter_factory_v2 =
std::make_shared<ChangeFilterFactoryV2>(prefix_extractor); std::make_shared<ChangeFilterFactoryV2>(prefix_extractor.get());
// In a testing environment, we can only flush the application // In a testing environment, we can only flush the application
// compaction filter buffer using universal compaction // compaction filter buffer using universal compaction
option_config_ = kUniversalCompaction; option_config_ = kUniversalCompaction;
@ -3973,9 +3985,10 @@ TEST(DBTest, CompactionFilterV2NULLPrefix) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0; options.max_mem_compaction_level = 0;
auto prefix_extractor = NewFixedPrefixTransform(8); std::unique_ptr<const SliceTransform> prefix_extractor;
prefix_extractor.reset(NewFixedPrefixTransform(8));
options.compaction_filter_factory_v2 = options.compaction_filter_factory_v2 =
std::make_shared<ChangeFilterFactoryV2>(prefix_extractor); std::make_shared<ChangeFilterFactoryV2>(prefix_extractor.get());
// In a testing environment, we can only flush the application // In a testing environment, we can only flush the application
// compaction filter buffer using universal compaction // compaction filter buffer using universal compaction
option_config_ = kUniversalCompaction; option_config_ = kUniversalCompaction;
@ -4713,6 +4726,7 @@ TEST(DBTest, NoSpace) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = env_; options.env = env_;
options.paranoid_checks = false;
Reopen(&options); Reopen(&options);
ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("foo", "v1"));
@ -5506,6 +5520,7 @@ TEST(DBTest, ReadCompaction) {
options.filter_policy = nullptr; options.filter_policy = nullptr;
options.block_size = 4096; options.block_size = 4096;
options.no_block_cache = true; options.no_block_cache = true;
options.disable_seek_compaction = false;
CreateAndReopenWithCF({"pikachu"}, &options); CreateAndReopenWithCF({"pikachu"}, &options);

@ -35,6 +35,8 @@ DBPropertyType GetPropertyType(const Slice& property) {
return kCompactionPending; return kCompactionPending;
} else if (in == "background-errors") { } else if (in == "background-errors") {
return kBackgroundErrors; return kBackgroundErrors;
} else if (in == "cur-size-active-mem-table") {
return kCurSizeActiveMemTable;
} }
return kUnknown; return kUnknown;
} }
@ -339,12 +341,14 @@ bool InternalStats::GetProperty(DBPropertyType property_type,
// 0 otherwise, // 0 otherwise,
*value = std::to_string(current->NeedsCompaction() ? 1 : 0); *value = std::to_string(current->NeedsCompaction() ? 1 : 0);
return true; return true;
/////////////
case kBackgroundErrors: case kBackgroundErrors:
// Accumulated number of errors in background flushes or compactions. // Accumulated number of errors in background flushes or compactions.
*value = std::to_string(GetBackgroundErrorCount()); *value = std::to_string(GetBackgroundErrorCount());
return true; return true;
///////// case kCurSizeActiveMemTable:
// Current size of the active memtable
*value = std::to_string(cfd->mem()->ApproximateMemoryUsage());
return true;
default: default:
return false; return false;
} }

@ -21,6 +21,7 @@ class ColumnFamilyData;
namespace rocksdb { namespace rocksdb {
class MemTableList; class MemTableList;
class DBImpl;
enum DBPropertyType { enum DBPropertyType {
kNumFilesAtLevel, // Number of files at a specific level kNumFilesAtLevel, // Number of files at a specific level
@ -33,6 +34,7 @@ enum DBPropertyType {
// 0. // 0.
kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0. kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0.
kBackgroundErrors, // Return accumulated background errors encountered. kBackgroundErrors, // Return accumulated background errors encountered.
kCurSizeActiveMemTable, // Return current size of the active memtable
kUnknown, kUnknown,
}; };

@ -52,6 +52,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
assert(!should_flush_); assert(!should_flush_);
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) { if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits, prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
options.bloom_locality,
options.memtable_prefix_bloom_probes)); options.memtable_prefix_bloom_probes));
} }
} }
@ -154,22 +155,24 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator { class MemTableIterator: public Iterator {
public: public:
MemTableIterator(const MemTable& mem, const ReadOptions& options) MemTableIterator(const MemTable& mem, const ReadOptions& options)
: mem_(mem), iter_(), dynamic_prefix_seek_(false), valid_(false) { : bloom_(nullptr),
prefix_extractor_(mem.prefix_extractor_),
iter_(),
valid_(false) {
if (options.prefix) { if (options.prefix) {
iter_.reset(mem_.table_->GetPrefixIterator(*options.prefix)); iter_.reset(mem.table_->GetPrefixIterator(*options.prefix));
} else if (options.prefix_seek) { } else if (options.prefix_seek) {
dynamic_prefix_seek_ = true; bloom_ = mem.prefix_bloom_.get();
iter_.reset(mem_.table_->GetDynamicPrefixIterator()); iter_.reset(mem.table_->GetDynamicPrefixIterator());
} else { } else {
iter_.reset(mem_.table_->GetIterator()); iter_.reset(mem.table_->GetIterator());
} }
} }
virtual bool Valid() const { return valid_; } virtual bool Valid() const { return valid_; }
virtual void Seek(const Slice& k) { virtual void Seek(const Slice& k) {
if (dynamic_prefix_seek_ && mem_.prefix_bloom_ && if (bloom_ != nullptr &&
!mem_.prefix_bloom_->MayContain( !bloom_->MayContain(prefix_extractor_->Transform(ExtractUserKey(k)))) {
mem_.prefix_extractor_->Transform(ExtractUserKey(k)))) {
valid_ = false; valid_ = false;
return; return;
} }
@ -207,9 +210,9 @@ class MemTableIterator: public Iterator {
virtual Status status() const { return Status::OK(); } virtual Status status() const { return Status::OK(); }
private: private:
const MemTable& mem_; DynamicBloom* bloom_;
const SliceTransform* const prefix_extractor_;
std::shared_ptr<MemTableRep::Iterator> iter_; std::shared_ptr<MemTableRep::Iterator> iter_;
bool dynamic_prefix_seek_;
bool valid_; bool valid_;
// No copying allowed // No copying allowed

@ -40,12 +40,12 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
ParseInternalKey(keys_.back(), &orig_ikey); ParseInternalKey(keys_.back(), &orig_ikey);
bool hit_the_next_user_key = false; bool hit_the_next_user_key = false;
ParsedInternalKey ikey;
std::string merge_result; // Temporary value for merge results std::string merge_result; // Temporary value for merge results
if (steps) { if (steps) {
++(*steps); ++(*steps);
} }
for (iter->Next(); iter->Valid(); iter->Next()) { for (iter->Next(); iter->Valid(); iter->Next()) {
ParsedInternalKey ikey;
assert(operands_.size() >= 1); // Should be invariants! assert(operands_.size() >= 1); // Should be invariants!
assert(keys_.size() == operands_.size()); assert(keys_.size() == operands_.size());
@ -194,7 +194,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
if (operands_.size() >= 2 && if (operands_.size() >= 2 &&
operands_.size() >= min_partial_merge_operands_ && operands_.size() >= min_partial_merge_operands_ &&
user_merge_operator_->PartialMergeMulti( user_merge_operator_->PartialMergeMulti(
ikey.user_key, orig_ikey.user_key,
std::deque<Slice>(operands_.begin(), operands_.end()), std::deque<Slice>(operands_.begin(), operands_.end()),
&merge_result, logger_)) { &merge_result, logger_)) {
// Merging of operands (associative merge) was successful. // Merging of operands (associative merge) was successful.

@ -23,7 +23,7 @@ bool MergeOperator::PartialMergeMulti(const Slice& key,
std::string temp_value; std::string temp_value;
Slice temp_slice(operand_list[0]); Slice temp_slice(operand_list[0]);
for (int i = 1; i < operand_list.size(); ++i) { for (size_t i = 1; i < operand_list.size(); ++i) {
auto& operand = operand_list[i]; auto& operand = operand_list[i];
if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) { if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) {
return false; return false;

@ -187,17 +187,27 @@ Status TableCache::GetTableProperties(
bool TableCache::PrefixMayMatch(const ReadOptions& options, bool TableCache::PrefixMayMatch(const ReadOptions& options,
const InternalKeyComparator& icomparator, const InternalKeyComparator& icomparator,
uint64_t file_number, uint64_t file_size, const FileMetaData& file_meta,
const Slice& internal_prefix, bool* table_io) { const Slice& internal_prefix, bool* table_io) {
Cache::Handle* handle = nullptr;
Status s = FindTable(storage_options_, icomparator, file_number, file_size,
&handle, table_io);
bool may_match = true; bool may_match = true;
if (s.ok()) { auto table_handle = file_meta.table_reader_handle;
TableReader* t = GetTableReaderFromHandle(handle); if (table_handle == nullptr) {
may_match = t->PrefixMayMatch(internal_prefix); // Need to get table handle from file number
ReleaseHandle(handle); Status s = FindTable(storage_options_, icomparator, file_meta.number,
file_meta.file_size, &table_handle, table_io);
if (!s.ok()) {
return may_match;
}
} }
auto table = GetTableReaderFromHandle(table_handle);
may_match = table->PrefixMayMatch(internal_prefix);
if (file_meta.table_reader_handle == nullptr) {
// Need to release handle if it is generated from here.
ReleaseHandle(table_handle);
}
return may_match; return may_match;
} }

@ -58,7 +58,7 @@ class TableCache {
// the table index or blooms are not in memory, this may cause an I/O // the table index or blooms are not in memory, this may cause an I/O
bool PrefixMayMatch(const ReadOptions& options, bool PrefixMayMatch(const ReadOptions& options,
const InternalKeyComparator& internal_comparator, const InternalKeyComparator& internal_comparator,
uint64_t file_number, uint64_t file_size, const FileMetaData& file_meta,
const Slice& internal_prefix, bool* table_io); const Slice& internal_prefix, bool* table_io);
// Evict any entry for the specified file number // Evict any entry for the specified file number

@ -30,7 +30,6 @@ enum Tag {
// these are new formats divergent from open source leveldb // these are new formats divergent from open source leveldb
kNewFile2 = 100, // store smallest & largest seqno kNewFile2 = 100, // store smallest & largest seqno
kVersionNumber = 101, // manifest version number, available after 2.8
kColumnFamily = 200, // specify column family for version edit kColumnFamily = 200, // specify column family for version edit
kColumnFamilyAdd = 201, kColumnFamilyAdd = 201,
@ -39,7 +38,6 @@ enum Tag {
}; };
void VersionEdit::Clear() { void VersionEdit::Clear() {
version_number_ = 0;
comparator_.clear(); comparator_.clear();
max_level_ = 0; max_level_ = 0;
log_number_ = 0; log_number_ = 0;
@ -47,7 +45,6 @@ void VersionEdit::Clear() {
last_sequence_ = 0; last_sequence_ = 0;
next_file_number_ = 0; next_file_number_ = 0;
max_column_family_ = 0; max_column_family_ = 0;
has_version_number_ = false;
has_comparator_ = false; has_comparator_ = false;
has_log_number_ = false; has_log_number_ = false;
has_prev_log_number_ = false; has_prev_log_number_ = false;
@ -63,10 +60,6 @@ void VersionEdit::Clear() {
} }
void VersionEdit::EncodeTo(std::string* dst) const { void VersionEdit::EncodeTo(std::string* dst) const {
if (has_version_number_) {
PutVarint32(dst, kVersionNumber);
PutVarint32(dst, version_number_);
}
if (has_comparator_) { if (has_comparator_) {
PutVarint32(dst, kComparator); PutVarint32(dst, kComparator);
PutLengthPrefixedSlice(dst, comparator_); PutLengthPrefixedSlice(dst, comparator_);
@ -164,14 +157,6 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
while (msg == nullptr && GetVarint32(&input, &tag)) { while (msg == nullptr && GetVarint32(&input, &tag)) {
switch (tag) { switch (tag) {
case kVersionNumber:
if (GetVarint32(&input, &version_number_)) {
has_version_number_ = true;
} else {
msg = "version number";
}
break;
case kComparator: case kComparator:
if (GetLengthPrefixedSlice(&input, &str)) { if (GetLengthPrefixedSlice(&input, &str)) {
comparator_ = str.ToString(); comparator_ = str.ToString();

@ -50,10 +50,6 @@ class VersionEdit {
void Clear(); void Clear();
void SetVersionNumber() {
has_version_number_ = true;
version_number_ = kManifestVersion;
}
void SetComparatorName(const Slice& name) { void SetComparatorName(const Slice& name) {
has_comparator_ = true; has_comparator_ = true;
comparator_ = name.ToString(); comparator_ = name.ToString();
@ -147,14 +143,12 @@ class VersionEdit {
bool GetLevel(Slice* input, int* level, const char** msg); bool GetLevel(Slice* input, int* level, const char** msg);
int max_level_; int max_level_;
uint32_t version_number_;
std::string comparator_; std::string comparator_;
uint64_t log_number_; uint64_t log_number_;
uint64_t prev_log_number_; uint64_t prev_log_number_;
uint64_t next_file_number_; uint64_t next_file_number_;
uint32_t max_column_family_; uint32_t max_column_family_;
SequenceNumber last_sequence_; SequenceNumber last_sequence_;
bool has_version_number_;
bool has_comparator_; bool has_comparator_;
bool has_log_number_; bool has_log_number_;
bool has_prev_log_number_; bool has_prev_log_number_;
@ -174,10 +168,6 @@ class VersionEdit {
bool is_column_family_drop_; bool is_column_family_drop_;
bool is_column_family_add_; bool is_column_family_add_;
std::string column_family_name_; std::string column_family_name_;
enum {
kManifestVersion = 1
};
}; };
} // namespace rocksdb } // namespace rocksdb

@ -184,18 +184,14 @@ class Version::LevelFileNumIterator : public Iterator {
} }
Slice value() const { Slice value() const {
assert(Valid()); assert(Valid());
EncodeFixed64(value_buf_, (*flist_)[index_]->number); return Slice(reinterpret_cast<const char*>((*flist_)[index_]),
EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size); sizeof(FileMetaData));
return Slice(value_buf_, sizeof(value_buf_));
} }
virtual Status status() const { return Status::OK(); } virtual Status status() const { return Status::OK(); }
private: private:
const InternalKeyComparator icmp_; const InternalKeyComparator icmp_;
const std::vector<FileMetaData*>* const flist_; const std::vector<FileMetaData*>* const flist_;
uint32_t index_; uint32_t index_;
// Backing store for value(). Holds the file number and size.
mutable char value_buf_[16];
}; };
static Iterator* GetFileIterator(void* arg, const ReadOptions& options, static Iterator* GetFileIterator(void* arg, const ReadOptions& options,
@ -203,7 +199,7 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options,
const InternalKeyComparator& icomparator, const InternalKeyComparator& icomparator,
const Slice& file_value, bool for_compaction) { const Slice& file_value, bool for_compaction) {
TableCache* cache = reinterpret_cast<TableCache*>(arg); TableCache* cache = reinterpret_cast<TableCache*>(arg);
if (file_value.size() != 16) { if (file_value.size() != sizeof(FileMetaData)) {
return NewErrorIterator( return NewErrorIterator(
Status::Corruption("FileReader invoked with unexpected value")); Status::Corruption("FileReader invoked with unexpected value"));
} else { } else {
@ -214,11 +210,12 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options,
options_copy = options; options_copy = options;
options_copy.prefix = nullptr; options_copy.prefix = nullptr;
} }
FileMetaData meta(DecodeFixed64(file_value.data()),
DecodeFixed64(file_value.data() + 8)); const FileMetaData* meta_file =
reinterpret_cast<const FileMetaData*>(file_value.data());
return cache->NewIterator( return cache->NewIterator(
options.prefix ? options_copy : options, soptions, icomparator, meta, options.prefix ? options_copy : options, soptions, icomparator,
nullptr /* don't need reference to table*/, for_compaction); *meta_file, nullptr /* don't need reference to table*/, for_compaction);
} }
} }
@ -237,10 +234,11 @@ bool Version::PrefixMayMatch(const ReadOptions& options,
// key() will always be the biggest value for this SST? // key() will always be the biggest value for this SST?
may_match = true; may_match = true;
} else { } else {
const FileMetaData* meta_file =
reinterpret_cast<const FileMetaData*>(level_iter->value().data());
may_match = cfd_->table_cache()->PrefixMayMatch( may_match = cfd_->table_cache()->PrefixMayMatch(
options, cfd_->internal_comparator(), options, cfd_->internal_comparator(), *meta_file, internal_prefix,
DecodeFixed64(level_iter->value().data()),
DecodeFixed64(level_iter->value().data() + 8), internal_prefix,
nullptr); nullptr);
} }
return may_match; return may_match;
@ -437,17 +435,30 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
return false; return false;
} }
static bool NewestFirst(FileMetaData* a, FileMetaData* b) { namespace {
bool NewestFirst(FileMetaData* a, FileMetaData* b) {
return a->number > b->number; return a->number > b->number;
} }
static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
if (a->smallest_seqno > b->smallest_seqno) { if (a->smallest_seqno != b->smallest_seqno) {
assert(a->largest_seqno > b->largest_seqno); return a->smallest_seqno > b->smallest_seqno;
return true;
} }
assert(a->largest_seqno <= b->largest_seqno); if (a->largest_seqno != b->largest_seqno) {
return false; return a->largest_seqno > b->largest_seqno;
}
// Break ties by file number
return NewestFirst(a, b);
}
bool BySmallestKey(FileMetaData* a, FileMetaData* b,
const InternalKeyComparator* cmp) {
int r = cmp->Compare(a->smallest, b->smallest);
if (r != 0) {
return (r < 0);
}
// Break ties by file number
return (a->number < b->number);
} }
} // anonymous namespace
Version::Version(ColumnFamilyData* cfd, VersionSet* vset, Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
uint64_t version_number) uint64_t version_number)
@ -1186,22 +1197,33 @@ struct VersionSet::ManifestWriter {
// Versions that contain full copies of the intermediate state. // Versions that contain full copies of the intermediate state.
class VersionSet::Builder { class VersionSet::Builder {
private: private:
// Helper to sort by v->files_[file_number].smallest // Helper to sort v->files_
struct BySmallestKey { // kLevel0LevelCompaction -- NewestFirst
// kLevel0UniversalCompaction -- NewestFirstBySeqNo
// kLevelNon0 -- BySmallestKey
struct FileComparator {
enum SortMethod {
kLevel0LevelCompaction = 0,
kLevel0UniversalCompaction = 1,
kLevelNon0 = 2,
} sort_method;
const InternalKeyComparator* internal_comparator; const InternalKeyComparator* internal_comparator;
bool operator()(FileMetaData* f1, FileMetaData* f2) const { bool operator()(FileMetaData* f1, FileMetaData* f2) const {
int r = internal_comparator->Compare(f1->smallest, f2->smallest); switch (sort_method) {
if (r != 0) { case kLevel0LevelCompaction:
return (r < 0); return NewestFirst(f1, f2);
} else { case kLevel0UniversalCompaction:
// Break ties by file number return NewestFirstBySeqNo(f1, f2);
return (f1->number < f2->number); case kLevelNon0:
return BySmallestKey(f1, f2, internal_comparator);
} }
assert(false);
return false;
} }
}; };
typedef std::set<FileMetaData*, BySmallestKey> FileSet; typedef std::set<FileMetaData*, FileComparator> FileSet;
struct LevelState { struct LevelState {
std::set<uint64_t> deleted_files; std::set<uint64_t> deleted_files;
FileSet* added_files; FileSet* added_files;
@ -1210,15 +1232,23 @@ class VersionSet::Builder {
ColumnFamilyData* cfd_; ColumnFamilyData* cfd_;
Version* base_; Version* base_;
LevelState* levels_; LevelState* levels_;
FileComparator level_zero_cmp_;
FileComparator level_nonzero_cmp_;
public: public:
Builder(ColumnFamilyData* cfd) : cfd_(cfd), base_(cfd->current()) { Builder(ColumnFamilyData* cfd) : cfd_(cfd), base_(cfd->current()) {
base_->Ref(); base_->Ref();
levels_ = new LevelState[base_->NumberLevels()]; levels_ = new LevelState[base_->NumberLevels()];
BySmallestKey cmp; level_zero_cmp_.sort_method =
cmp.internal_comparator = &cfd_->internal_comparator(); (cfd_->options()->compaction_style == kCompactionStyleUniversal)
for (int level = 0; level < base_->NumberLevels(); level++) { ? FileComparator::kLevel0UniversalCompaction
levels_[level].added_files = new FileSet(cmp); : FileComparator::kLevel0LevelCompaction;
level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
level_nonzero_cmp_.internal_comparator = &cfd->internal_comparator();
levels_[0].added_files = new FileSet(level_zero_cmp_);
for (int level = 1; level < base_->NumberLevels(); level++) {
levels_[level].added_files = new FileSet(level_nonzero_cmp_);
} }
} }
@ -1251,16 +1281,25 @@ class VersionSet::Builder {
void CheckConsistency(Version* v) { void CheckConsistency(Version* v) {
#ifndef NDEBUG #ifndef NDEBUG
// make sure the files are sorted correctly
for (int level = 0; level < v->NumberLevels(); level++) { for (int level = 0; level < v->NumberLevels(); level++) {
// Make sure there is no overlap in levels > 0 for (size_t i = 1; i < v->files_[level].size(); i++) {
if (level > 0) { auto f1 = v->files_[level][i - 1];
for (uint32_t i = 1; i < v->files_[level].size(); i++) { auto f2 = v->files_[level][i];
const InternalKey& prev_end = v->files_[level][i-1]->largest; if (level == 0) {
const InternalKey& this_begin = v->files_[level][i]->smallest; assert(level_zero_cmp_(f1, f2));
if (cfd_->internal_comparator().Compare(prev_end, this_begin) >= 0) { if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
assert(f1->largest_seqno > f2->largest_seqno);
}
} else {
assert(level_nonzero_cmp_(f1, f2));
// Make sure there is no overlap in levels > 0
if (cfd_->internal_comparator().Compare(f1->largest, f2->smallest) >=
0) {
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(), (f1->largest).DebugString().c_str(),
this_begin.DebugString().c_str()); (f2->smallest).DebugString().c_str());
abort(); abort();
} }
} }
@ -1359,9 +1398,9 @@ class VersionSet::Builder {
void SaveTo(Version* v) { void SaveTo(Version* v) {
CheckConsistency(base_); CheckConsistency(base_);
CheckConsistency(v); CheckConsistency(v);
BySmallestKey cmp;
cmp.internal_comparator = &cfd_->internal_comparator();
for (int level = 0; level < base_->NumberLevels(); level++) { for (int level = 0; level < base_->NumberLevels(); level++) {
const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
// Merge the set of added files with the set of pre-existing files. // Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v. // Drop any deleted files. Store the result in *v.
const auto& base_files = base_->files_[level]; const auto& base_files = base_->files_[level];
@ -1387,13 +1426,6 @@ class VersionSet::Builder {
} }
} }
// TODO(icanadi) do it in the loop above, which already sorts the files
// Pre-sort level0 for Get()
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirstBySeqNo);
} else {
std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirst);
}
CheckConsistency(v); CheckConsistency(v);
} }
@ -1585,6 +1617,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
DescriptorFileName(dbname_, pending_manifest_file_number_), DescriptorFileName(dbname_, pending_manifest_file_number_),
&descriptor_file, env_->OptimizeForManifestWrite(storage_options_)); &descriptor_file, env_->OptimizeForManifestWrite(storage_options_));
if (s.ok()) { if (s.ok()) {
descriptor_file->SetPreallocationBlockSize(
options_->manifest_preallocation_size);
descriptor_log_.reset(new log::Writer(std::move(descriptor_file))); descriptor_log_.reset(new log::Writer(std::move(descriptor_file)));
s = WriteSnapshot(descriptor_log_.get()); s = WriteSnapshot(descriptor_log_.get());
} }
@ -1806,8 +1840,6 @@ Status VersionSet::Recover(
return s; return s;
} }
bool have_version_number = false;
bool log_number_decrease = false;
bool have_log_number = false; bool have_log_number = false;
bool have_prev_log_number = false; bool have_prev_log_number = false;
bool have_next_file = false; bool have_next_file = false;
@ -1924,7 +1956,9 @@ Status VersionSet::Recover(
if (cfd != nullptr) { if (cfd != nullptr) {
if (edit.has_log_number_) { if (edit.has_log_number_) {
if (cfd->GetLogNumber() > edit.log_number_) { if (cfd->GetLogNumber() > edit.log_number_) {
log_number_decrease = true; Log(options_->info_log,
"MANIFEST corruption detected, but ignored - Log numbers in "
"records NOT monotonically increasing");
} else { } else {
cfd->SetLogNumber(edit.log_number_); cfd->SetLogNumber(edit.log_number_);
have_log_number = true; have_log_number = true;
@ -1939,10 +1973,6 @@ Status VersionSet::Recover(
} }
} }
if (edit.has_version_number_) {
have_version_number = true;
}
if (edit.has_prev_log_number_) { if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_; prev_log_number = edit.prev_log_number_;
have_prev_log_number = true; have_prev_log_number = true;
@ -1962,23 +1992,6 @@ Status VersionSet::Recover(
have_last_sequence = true; have_last_sequence = true;
} }
} }
if (s.ok() && log_number_decrease) {
// Since release 2.8, version number is added into MANIFEST file.
// Prior release 2.8, a bug in LogAndApply() can cause log_number
// to be smaller than the one from previous edit. To ensure backward
// compatibility, only fail for MANIFEST genearated by release 2.8
// and after.
if (have_version_number) {
s = Status::Corruption(
"MANIFEST corruption - Log numbers in records NOT "
"monotonically increasing");
} else {
Log(options_->info_log,
"MANIFEST corruption detected, but ignored - Log numbers in "
"records NOT monotonically increasing");
}
}
} }
if (s.ok()) { if (s.ok()) {
@ -2402,8 +2415,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// WARNING: This method doesn't hold a mutex!! // WARNING: This method doesn't hold a mutex!!
bool first_record = false;
// This is done without DB mutex lock held, but only within single-threaded // This is done without DB mutex lock held, but only within single-threaded
// LogAndApply. Column family manipulations can only happen within LogAndApply // LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate. // (the same single thread), so we're safe to iterate.
@ -2411,10 +2422,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
{ {
// Store column family info // Store column family info
VersionEdit edit; VersionEdit edit;
if (first_record) {
edit.SetVersionNumber();
first_record = false;
}
if (cfd->GetID() != 0) { if (cfd->GetID() != 0) {
// default column family is always there, // default column family is always there,
// no need to explicitly write it // no need to explicitly write it

@ -139,6 +139,7 @@ class DefaultCompactionFilterFactory : public CompactionFilterFactory {
// //
class CompactionFilterFactoryV2 { class CompactionFilterFactoryV2 {
public: public:
// NOTE: CompactionFilterFactoryV2 will not delete prefix_extractor
explicit CompactionFilterFactoryV2(const SliceTransform* prefix_extractor) explicit CompactionFilterFactoryV2(const SliceTransform* prefix_extractor)
: prefix_extractor_(prefix_extractor) { } : prefix_extractor_(prefix_extractor) { }
@ -169,9 +170,8 @@ class CompactionFilterFactoryV2 {
// return any filter // return any filter
class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 { class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 {
public: public:
explicit DefaultCompactionFilterFactoryV2( explicit DefaultCompactionFilterFactoryV2()
const SliceTransform* prefix_extractor) : CompactionFilterFactoryV2(nullptr) { }
: CompactionFilterFactoryV2(prefix_extractor) { }
virtual std::unique_ptr<CompactionFilterV2> virtual std::unique_ptr<CompactionFilterV2>
CreateCompactionFilterV2( CreateCompactionFilterV2(

@ -127,6 +127,8 @@ struct ColumnFamilyOptions {
// Version TWO of the compaction_filter_factory // Version TWO of the compaction_filter_factory
// It supports rolling compaction // It supports rolling compaction
//
// Default: a factory that doesn't provide any object
std::shared_ptr<CompactionFilterFactoryV2> compaction_filter_factory_v2; std::shared_ptr<CompactionFilterFactoryV2> compaction_filter_factory_v2;
// ------------------- // -------------------
@ -493,6 +495,17 @@ struct ColumnFamilyOptions {
// number of hash probes per key // number of hash probes per key
uint32_t memtable_prefix_bloom_probes; uint32_t memtable_prefix_bloom_probes;
// Control locality of bloom filter probes to improve cache miss rate.
// This option only applies to memtable prefix bloom and plaintable
// prefix bloom. It essentially limits the max number of cache lines each
// bloom filter check can touch.
// This optimization is turned off when set to 0. The number should never
// be greater than number of probes. This option can boost performance
// for in-memory workload but should use with care since it can cause
// higher false positive rate.
// Default: 0
uint32_t bloom_locality;
// Maximum number of successive merge operations on a key in the memtable. // Maximum number of successive merge operations on a key in the memtable.
// //
// When a merge operation is added to the memtable and the maximum number of // When a merge operation is added to the memtable and the maximum number of
@ -538,7 +551,7 @@ struct DBOptions {
// If any of the writes to the database fails (Put, Delete, Merge, Write), // If any of the writes to the database fails (Put, Delete, Merge, Write),
// the database will switch to read-only mode and fail all other // the database will switch to read-only mode and fail all other
// Write operations. // Write operations.
// Default: false // Default: true
bool paranoid_checks; bool paranoid_checks;
// Use the specified object to interact with the environment, // Use the specified object to interact with the environment,
@ -559,7 +572,7 @@ struct DBOptions {
// files opened are always kept open. You can estimate number of files based // files opened are always kept open. You can estimate number of files based
// on target_file_size_base and target_file_size_multiplier for level-based // on target_file_size_base and target_file_size_multiplier for level-based
// compaction. For universal-style compaction, you can usually set it to -1. // compaction. For universal-style compaction, you can usually set it to -1.
// Default: 1000 // Default: 5000
int max_open_files; int max_open_files;
// If non-null, then we should collect metrics about database operations // If non-null, then we should collect metrics about database operations
@ -696,7 +709,7 @@ struct DBOptions {
// Allow the OS to mmap file for reading sst tables. Default: false // Allow the OS to mmap file for reading sst tables. Default: false
bool allow_mmap_reads; bool allow_mmap_reads;
// Allow the OS to mmap file for writing. Default: true // Allow the OS to mmap file for writing. Default: false
bool allow_mmap_writes; bool allow_mmap_writes;
// Disable child process inherit open files. Default: true // Disable child process inherit open files. Default: true

@ -0,0 +1,103 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#pragma once
#include <string>
#include <vector>
#include "utilities/stackable_db.h"
#include "rocksdb/status.h"
namespace rocksdb {
//
// Configurable options needed for setting up a Geo database
//
struct GeoDBOptions {
// Backup info and error messages will be written to info_log
// if non-nullptr.
// Default: nullptr
Logger* info_log;
explicit GeoDBOptions(Logger* _info_log = nullptr):info_log(_info_log) { }
};
//
// A position in the earth's geoid
//
class GeoPosition {
public:
double latitude;
double longitude;
explicit GeoPosition(double la = 0, double lo = 0) :
latitude(la), longitude(lo) {
}
};
//
// Description of an object on the Geoid. It is located by a GPS location,
// and is identified by the id. The value associated with this object is
// an opaque string 'value'. Different objects identified by unique id's
// can have the same gps-location associated with them.
//
class GeoObject {
public:
GeoPosition position;
std::string id;
std::string value;
GeoObject() {}
GeoObject(const GeoPosition& pos, const std::string& i,
const std::string& val) :
position(pos), id(i), value(val) {
}
};
//
// Stack your DB with GeoDB to be able to get geo-spatial support
//
class GeoDB : public StackableDB {
public:
// GeoDBOptions have to be the same as the ones used in a previous
// incarnation of the DB
//
// GeoDB owns the pointer `DB* db` now. You should not delete it or
// use it after the invocation of GeoDB
// GeoDB(DB* db, const GeoDBOptions& options) : StackableDB(db) {}
GeoDB(DB* db, const GeoDBOptions& options) : StackableDB(db) {}
virtual ~GeoDB() {}
// Insert a new object into the location database. The object is
// uniquely identified by the id. If an object with the same id already
// exists in the db, then the old one is overwritten by the new
// object being inserted here.
virtual Status Insert(const GeoObject& object) = 0;
// Retrieve the value of the object located at the specified GPS
// location and is identified by the 'id'.
virtual Status GetByPosition(const GeoPosition& pos,
const Slice& id, std::string* value) = 0;
// Retrieve the value of the object identified by the 'id'. This method
// could be potentially slower than GetByPosition
virtual Status GetById(const Slice& id, GeoObject* object) = 0;
// Delete the specified object
virtual Status Remove(const Slice& id) = 0;
// Returns a list of all items within a circular radius from the
// specified gps location. If 'number_of_values' is specified,
// then this call returns at most that many number of objects.
// The radius is specified in 'meters'.
virtual Status SearchRadial(const GeoPosition& pos,
double radius,
std::vector<GeoObject>* values,
int number_of_values = INT_MAX) = 0;
};
} // namespace rocksdb

@ -0,0 +1,17 @@
NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB
NATIVE_INCLUDE = ./include
ROCKSDB_JAR = rocksdbjni.jar
clean:
-find . -name "*.class" -exec rm {} \;
-find . -name "hs*.log" -exec rm {} \;
rm -f $(ROCKSDB_JAR)
java:
javac org/rocksdb/*.java
jar -cf $(ROCKSDB_JAR) org/rocksdb/*.class
javah -d $(NATIVE_INCLUDE) -jni $(NATIVE_JAVA_CLASSES)
sample:
javac -cp $(ROCKSDB_JAR) RocksDBSample.java
java -ea -Djava.library.path=.:../ -cp ".:./*" RocksDBSample /tmp/rocksdbjni/

@ -0,0 +1,79 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
import java.util.*;
import java.lang.*;
import org.rocksdb.*;
import java.io.IOException;
public class RocksDBSample {
static {
System.loadLibrary("rocksdbjni");
}
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("usage: RocksDBSample db_path");
return;
}
String db_path = args[0];
System.out.println("RocksDBSample");
try {
RocksDB db = RocksDB.open(db_path);
db.put("hello".getBytes(), "world".getBytes());
byte[] value = db.get("hello".getBytes());
System.out.format("Get('hello') = %s\n",
new String(value));
for (int i = 1; i <= 9; ++i) {
for (int j = 1; j <= 9; ++j) {
db.put(String.format("%dx%d", i, j).getBytes(),
String.format("%d", i * j).getBytes());
}
}
for (int i = 1; i <= 9; ++i) {
for (int j = 1; j <= 9; ++j) {
System.out.format("%s ", new String(db.get(
String.format("%dx%d", i, j).getBytes())));
}
System.out.println("");
}
value = db.get("1x1".getBytes());
assert(value != null);
value = db.get("world".getBytes());
assert(value == null);
byte[] testKey = "asdf".getBytes();
byte[] testValue =
"asdfghjkl;'?><MNBVCXZQWERTYUIOP{+_)(*&^%$#@".getBytes();
db.put(testKey, testValue);
byte[] testResult = db.get(testKey);
assert(testResult != null);
assert(Arrays.equals(testValue, testResult));
assert(new String(testValue).equals(new String(testResult)));
byte[] insufficientArray = new byte[10];
byte[] enoughArray = new byte[50];
int len;
len = db.get(testKey, insufficientArray);
assert(len > insufficientArray.length);
len = db.get("asdfjkl;".getBytes(), enoughArray);
assert(len == RocksDB.NOT_FOUND);
len = db.get(testKey, enoughArray);
assert(len == testValue.length);
try {
db.close();
} catch (IOException e) {
System.err.println(e);
}
} catch (RocksDBException e) {
System.err.println(e);
}
}
}

@ -0,0 +1,103 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
import java.lang.*;
import java.util.*;
import java.io.Closeable;
import java.io.IOException;
/**
* A RocksDB is a persistent ordered map from keys to values. It is safe for
* concurrent access from multiple threads without any external synchronization.
* All methods of this class could potentially throw RocksDBException, which
* indicates sth wrong at the rocksdb library side and the call failed.
*/
public class RocksDB implements Closeable {
public static final int NOT_FOUND = -1;
/**
* The factory constructor of RocksDB that opens a RocksDB instance given
* the path to the database.
*
* @param path the path to the rocksdb.
* @param status an out value indicating the status of the Open().
* @return a rocksdb instance on success, null if the specified rocksdb can
* not be opened.
*/
public static RocksDB open(String path) throws RocksDBException {
RocksDB db = new RocksDB();
db.open0(path);
return db;
}
@Override public void close() throws IOException {
if (nativeHandle != 0) {
close0();
}
}
/**
* Set the database entry for "key" to "value".
*
* @param key the specified key to be inserted.
* @param value the value associated with the specified key.
*/
public void put(byte[] key, byte[] value) throws RocksDBException {
put(key, key.length, value, value.length);
}
/**
* Get the value associated with the specified key.
*
* @param key the key to retrieve the value.
* @param value the out-value to receive the retrieved value.
* @return The size of the actual value that matches the specified
* {@code key} in byte. If the return value is greater than the
* length of {@code value}, then it indicates that the size of the
* input buffer {@code value} is insufficient and partial result will
* be returned. RocksDB.NOT_FOUND will be returned if the value not
* found.
*/
public int get(byte[] key, byte[] value) throws RocksDBException {
return get(key, key.length, value, value.length);
}
/**
* The simplified version of get which returns a new byte array storing
* the value associated with the specified input key if any. null will be
* returned if the specified key is not found.
*
* @param key the key retrieve the value.
* @return a byte array storing the value associated with the input key if
* any. null if it does not find the specified key.
*
* @see RocksDBException
*/
public byte[] get(byte[] key) throws RocksDBException {
return get(key, key.length);
}
/**
* Private constructor.
*/
private RocksDB() {
nativeHandle = -1;
}
// native methods
private native void open0(String path) throws RocksDBException;
private native void put(
byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
private native int get(
byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
private native byte[] get(
byte[] key, int keyLen) throws RocksDBException;
private native void close0();
private long nativeHandle;
}

@ -0,0 +1,24 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
import java.lang.*;
import java.util.*;
/**
* A RocksDBException encapsulates the error of an operation. This exception
* type is used to describe an internal error from the c++ rocksdb library.
*/
public class RocksDBException extends Exception {
/**
* The private construct used by a set of public static factory method.
*
* @param msg the specified error message.
*/
public RocksDBException(String msg) {
super(msg);
}
}

@ -0,0 +1,81 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This file is designed for caching those frequently used IDs and provide
// efficient portal (i.e, a set of static functions) to access java code
// from c++.
#ifndef JAVA_ROCKSJNI_PORTAL_H_
#define JAVA_ROCKSJNI_PORTAL_H_
#include <jni.h>
#include "rocksdb/db.h"
namespace rocksdb {
// The portal class for org.rocksdb.RocksDB
class RocksDBJni {
public:
// Get the java class id of org.rocksdb.RocksDB.
static jclass getJClass(JNIEnv* env) {
static jclass jclazz = env->FindClass("org/rocksdb/RocksDB");
assert(jclazz != nullptr);
return jclazz;
}
// Get the field id of the member variable of org.rocksdb.RocksDB
// that stores the pointer to rocksdb::DB.
static jfieldID getHandleFieldID(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "nativeHandle", "J");
assert(fid != nullptr);
return fid;
}
// Get the pointer to rocksdb::DB of the specified org.rocksdb.RocksDB.
static rocksdb::DB* getHandle(JNIEnv* env, jobject jdb) {
return reinterpret_cast<rocksdb::DB*>(
env->GetLongField(jdb, getHandleFieldID(env)));
}
// Pass the rocksdb::DB pointer to the java side.
static void setHandle(JNIEnv* env, jobject jdb, rocksdb::DB* db) {
env->SetLongField(
jdb, getHandleFieldID(env),
reinterpret_cast<jlong>(db));
}
};
// The portal class for org.rocksdb.RocksDBException
class RocksDBExceptionJni {
public:
// Get the jclass of org.rocksdb.RocksDBException
static jclass getJClass(JNIEnv* env) {
static jclass jclazz = env->FindClass("org/rocksdb/RocksDBException");
assert(jclazz != nullptr);
return jclazz;
}
// Create and throw a java exception by converting the input
// Status to an RocksDBException.
//
// In case s.ok() is true, then this function will not throw any
// exception.
static void ThrowNew(JNIEnv* env, Status s) {
if (s.ok()) {
return;
}
jstring msg = env->NewStringUTF(s.ToString().c_str());
// get the constructor id of org.rocksdb.RocksDBException
static jmethodID mid = env->GetMethodID(
getJClass(env), "<init>", "(Ljava/lang/String;)V");
assert(mid != nullptr);
env->Throw((jthrowable)env->NewObject(getJClass(env), mid, msg));
}
};
} // namespace rocksdb
#endif // JAVA_ROCKSJNI_PORTAL_H_

@ -0,0 +1,185 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the "bridge" between Java and C++ and enables
// calling c++ rocksdb::DB methods from Java side.
#include <stdio.h>
#include <stdlib.h>
#include <jni.h>
#include <string>
#include "include/org_rocksdb_RocksDB.h"
#include "rocksjni/portal.h"
#include "rocksdb/db.h"
/*
* Class: org_rocksdb_RocksDB
* Method: open0
* Signature: (Ljava/lang/String;)V
*/
void Java_org_rocksdb_RocksDB_open0(
JNIEnv* env, jobject java_db, jstring jdb_path) {
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
jboolean isCopy = false;
const char* db_path = env->GetStringUTFChars(jdb_path, &isCopy);
rocksdb::Status s = rocksdb::DB::Open(options, db_path, &db);
env->ReleaseStringUTFChars(jdb_path, db_path);
if (s.ok()) {
rocksdb::RocksDBJni::setHandle(env, java_db, db);
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_RocksDB
* Method: put
* Signature: ([BI[BI)V
*/
void Java_org_rocksdb_RocksDB_put(
JNIEnv* env, jobject jdb,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len) {
rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb);
jboolean isCopy;
jbyte* key = env->GetByteArrayElements(jkey, &isCopy);
jbyte* value = env->GetByteArrayElements(jvalue, &isCopy);
rocksdb::Slice key_slice(
reinterpret_cast<char*>(key), jkey_len);
rocksdb::Slice value_slice(
reinterpret_cast<char*>(value), jvalue_len);
rocksdb::Status s = db->Put(
rocksdb::WriteOptions(), key_slice, value_slice);
// trigger java unref on key and value.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT);
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: ([BI)[B
*/
jbyteArray Java_org_rocksdb_RocksDB_get___3BI(
JNIEnv* env, jobject jdb, jbyteArray jkey, jint jkey_len) {
rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb);
jboolean isCopy;
jbyte* key = env->GetByteArrayElements(jkey, &isCopy);
rocksdb::Slice key_slice(
reinterpret_cast<char*>(key), jkey_len);
std::string value;
rocksdb::Status s = db->Get(
rocksdb::ReadOptions(),
key_slice, &value);
// trigger java unref on key.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
if (s.IsNotFound()) {
return nullptr;
}
if (s.ok()) {
jbyteArray jvalue = env->NewByteArray(value.size());
env->SetByteArrayRegion(
jvalue, 0, value.size(),
reinterpret_cast<const jbyte*>(value.c_str()));
return jvalue;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
return nullptr;
}
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: ([BI[BI)I
*/
jint Java_org_rocksdb_RocksDB_get___3BI_3BI(
JNIEnv* env, jobject jdb,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len) {
static const int kNotFound = -1;
static const int kStatusError = -2;
rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb);
jboolean isCopy;
jbyte* key = env->GetByteArrayElements(jkey, &isCopy);
jbyte* value = env->GetByteArrayElements(jvalue, &isCopy);
rocksdb::Slice key_slice(
reinterpret_cast<char*>(key), jkey_len);
// TODO(yhchiang): we might save one memory allocation here by adding
// a DB::Get() function which takes preallocated jbyte* as input.
std::string cvalue;
rocksdb::Status s = db->Get(
rocksdb::ReadOptions(), key_slice, &cvalue);
// trigger java unref on key.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
if (s.IsNotFound()) {
env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT);
return kNotFound;
} else if (!s.ok()) {
// Here since we are throwing a Java exception from c++ side.
// As a result, c++ does not know calling this function will in fact
// throwing an exception. As a result, the execution flow will
// not stop here, and codes after this throw will still be
// executed.
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
// Return a dummy const value to avoid compilation error, although
// java side might not have a chance to get the return value :)
return kStatusError;
}
int cvalue_len = static_cast<int>(cvalue.size());
int length = std::min(jvalue_len, cvalue_len);
memcpy(value, cvalue.c_str(), length);
env->ReleaseByteArrayElements(jvalue, value, JNI_COMMIT);
if (cvalue_len > length) {
return static_cast<jint>(cvalue_len);
}
return length;
}
/*
* Class: org_rocksdb_RocksDB
* Method: close0
* Signature: ()V
*/
void Java_org_rocksdb_RocksDB_close0(
JNIEnv* env, jobject java_db) {
rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, java_db);
delete db;
db = nullptr;
rocksdb::RocksDBJni::setHandle(env, java_db, db);
}

@ -11,6 +11,7 @@
#include <cstdlib> #include <cstdlib>
#include <stdio.h> #include <stdio.h>
#include <assert.h>
#include <string.h> #include <string.h>
#include "util/logging.h" #include "util/logging.h"
@ -45,9 +46,25 @@ Mutex::Mutex(bool adaptive) {
Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); } Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); }
void Mutex::Lock() { PthreadCall("lock", pthread_mutex_lock(&mu_)); } void Mutex::Lock() {
PthreadCall("lock", pthread_mutex_lock(&mu_));
#ifndef NDEBUG
locked_ = true;
#endif
}
void Mutex::Unlock() {
#ifndef NDEBUG
locked_ = false;
#endif
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } void Mutex::AssertHeld() {
#ifndef NDEBUG
assert(locked_);
#endif
}
CondVar::CondVar(Mutex* mu) CondVar::CondVar(Mutex* mu)
: mu_(mu) { : mu_(mu) {
@ -57,7 +74,13 @@ CondVar::CondVar(Mutex* mu)
CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); } CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); }
void CondVar::Wait() { void CondVar::Wait() {
#ifndef NDEBUG
mu_->locked_ = false;
#endif
PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_)); PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_));
#ifndef NDEBUG
mu_->locked_ = true;
#endif
} }
void CondVar::Signal() { void CondVar::Signal() {

@ -97,11 +97,16 @@ class Mutex {
void Lock(); void Lock();
void Unlock(); void Unlock();
void AssertHeld() { } // this will assert if the mutex is not locked
// it does NOT verify that mutex is held by a calling thread
void AssertHeld();
private: private:
friend class CondVar; friend class CondVar;
pthread_mutex_t mu_; pthread_mutex_t mu_;
#ifndef NDEBUG
bool locked_;
#endif
// No copying // No copying
Mutex(const Mutex&); Mutex(const Mutex&);
@ -475,6 +480,8 @@ inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) {
return false; return false;
} }
#define CACHE_LINE_SIZE 64U
} // namespace port } // namespace port
} // namespace rocksdb } // namespace rocksdb

@ -270,7 +270,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) {
if (options_.prefix_extractor != nullptr) { if (options_.prefix_extractor != nullptr) {
uint32_t bloom_total_bits = num_prefixes * kBloomBitsPerKey; uint32_t bloom_total_bits = num_prefixes * kBloomBitsPerKey;
if (bloom_total_bits > 0) { if (bloom_total_bits > 0) {
bloom_.reset(new DynamicBloom(bloom_total_bits)); bloom_.reset(new DynamicBloom(bloom_total_bits, options_.bloom_locality));
} }
} }
@ -388,7 +388,7 @@ Status PlainTableReader::PopulateIndex() {
if (IsTotalOrderMode()) { if (IsTotalOrderMode()) {
uint32_t num_bloom_bits = table_properties_->num_entries * kBloomBitsPerKey; uint32_t num_bloom_bits = table_properties_->num_entries * kBloomBitsPerKey;
if (num_bloom_bits > 0) { if (num_bloom_bits > 0) {
bloom_.reset(new DynamicBloom(num_bloom_bits)); bloom_.reset(new DynamicBloom(num_bloom_bits, options_.bloom_locality));
} }
} }

@ -5,6 +5,9 @@
#include "dynamic_bloom.h" #include "dynamic_bloom.h"
#include <algorithm>
#include "port/port.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "util/hash.h" #include "util/hash.h"
@ -17,20 +20,31 @@ static uint32_t BloomHash(const Slice& key) {
} }
DynamicBloom::DynamicBloom(uint32_t total_bits, DynamicBloom::DynamicBloom(uint32_t total_bits,
uint32_t (*hash_func)(const Slice& key), uint32_t cl_per_block,
uint32_t num_probes) uint32_t num_probes,
: hash_func_(hash_func), uint32_t (*hash_func)(const Slice& key))
kTotalBits((total_bits + 7) / 8 * 8), : kBlocked(cl_per_block > 0),
kNumProbes(num_probes) { kBitsPerBlock(std::min(cl_per_block, num_probes) * CACHE_LINE_SIZE * 8),
assert(hash_func_); kTotalBits((kBlocked ? (total_bits + kBitsPerBlock - 1) / kBitsPerBlock
* kBitsPerBlock :
total_bits + 7) / 8 * 8),
kNumBlocks(kBlocked ? kTotalBits / kBitsPerBlock : 1),
kNumProbes(num_probes),
hash_func_(hash_func == nullptr ? &BloomHash : hash_func) {
assert(kBlocked ? kTotalBits > 0 : kTotalBits >= kBitsPerBlock);
assert(kNumProbes > 0); assert(kNumProbes > 0);
assert(kTotalBits > 0);
data_.reset(new unsigned char[kTotalBits / 8]());
}
DynamicBloom::DynamicBloom(uint32_t total_bits, uint32_t sz = kTotalBits / 8;
uint32_t num_probes) if (kBlocked) {
: DynamicBloom(total_bits, &BloomHash, num_probes) { sz += CACHE_LINE_SIZE - 1;
}
raw_ = new unsigned char[sz]();
if (kBlocked && (reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE)) {
data_ = raw_ + CACHE_LINE_SIZE -
reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE;
} else {
data_ = raw_;
}
} }
} // rocksdb } // rocksdb

@ -15,13 +15,17 @@ class Slice;
class DynamicBloom { class DynamicBloom {
public: public:
// total_bits: fixed total bits for the bloom // total_bits: fixed total bits for the bloom
// hash_func: customized hash function
// num_probes: number of hash probes for a single key // num_probes: number of hash probes for a single key
DynamicBloom(uint32_t total_bits, // cl_per_block: block size in cache lines. When this is non-zero, a
uint32_t (*hash_func)(const Slice& key), // query/set is done within a block to improve cache locality.
uint32_t num_probes = 6); // hash_func: customized hash function
explicit DynamicBloom(uint32_t total_bits, uint32_t cl_per_block = 0,
uint32_t num_probes = 6,
uint32_t (*hash_func)(const Slice& key) = nullptr);
explicit DynamicBloom(uint32_t total_bits, uint32_t num_probes = 6); ~DynamicBloom() {
delete[] raw_;
}
// Assuming single threaded access to this function. // Assuming single threaded access to this function.
void Add(const Slice& key); void Add(const Slice& key);
@ -36,10 +40,15 @@ class DynamicBloom {
bool MayContainHash(uint32_t hash); bool MayContainHash(uint32_t hash);
private: private:
uint32_t (*hash_func_)(const Slice& key); const bool kBlocked;
const uint32_t kBitsPerBlock;
const uint32_t kTotalBits; const uint32_t kTotalBits;
const uint32_t kNumBlocks;
const uint32_t kNumProbes; const uint32_t kNumProbes;
std::unique_ptr<unsigned char[]> data_;
uint32_t (*hash_func_)(const Slice& key);
unsigned char* data_;
unsigned char* raw_;
}; };
inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); } inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); }
@ -50,22 +59,42 @@ inline bool DynamicBloom::MayContain(const Slice& key) {
inline bool DynamicBloom::MayContainHash(uint32_t h) { inline bool DynamicBloom::MayContainHash(uint32_t h) {
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (uint32_t i = 0; i < kNumProbes; i++) { if (kBlocked) {
const uint32_t bitpos = h % kTotalBits; uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock;
if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { for (uint32_t i = 0; i < kNumProbes; ++i) {
return false; const uint32_t bitpos = b + h % kBitsPerBlock;
if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
return false;
}
h += delta;
}
} else {
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = h % kTotalBits;
if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
return false;
}
h += delta;
} }
h += delta;
} }
return true; return true;
} }
inline void DynamicBloom::AddHash(uint32_t h) { inline void DynamicBloom::AddHash(uint32_t h) {
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (uint32_t i = 0; i < kNumProbes; i++) { if (kBlocked) {
const uint32_t bitpos = h % kTotalBits; uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock;
data_[bitpos / 8] |= (1 << (bitpos % 8)); for (uint32_t i = 0; i < kNumProbes; ++i) {
h += delta; const uint32_t bitpos = b + h % kBitsPerBlock;
data_[bitpos / 8] |= (1 << (bitpos % 8));
h += delta;
}
} else {
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = h % kTotalBits;
data_[bitpos / 8] |= (1 << (bitpos % 8));
h += delta;
}
} }
} }

@ -3,19 +3,23 @@
// LICENSE file in the root directory of this source tree. An additional grant // LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
#include <algorithm>
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include "dynamic_bloom.h" #include "dynamic_bloom.h"
#include "port/port.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h" #include "util/testutil.h"
#include "util/stop_watch.h"
DEFINE_int32(bits_per_key, 10, ""); DEFINE_int32(bits_per_key, 10, "");
DEFINE_int32(num_probes, 6, ""); DEFINE_int32(num_probes, 6, "");
DEFINE_bool(enable_perf, false, "");
namespace rocksdb { namespace rocksdb {
static Slice Key(int i, char* buffer) { static Slice Key(uint64_t i, char* buffer) {
memcpy(buffer, &i, sizeof(i)); memcpy(buffer, &i, sizeof(i));
return Slice(buffer, sizeof(i)); return Slice(buffer, sizeof(i));
} }
@ -24,36 +28,48 @@ class DynamicBloomTest {
}; };
TEST(DynamicBloomTest, EmptyFilter) { TEST(DynamicBloomTest, EmptyFilter) {
DynamicBloom bloom(100, 2); DynamicBloom bloom1(100, 0, 2);
ASSERT_TRUE(! bloom.MayContain("hello")); ASSERT_TRUE(!bloom1.MayContain("hello"));
ASSERT_TRUE(! bloom.MayContain("world")); ASSERT_TRUE(!bloom1.MayContain("world"));
DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
ASSERT_TRUE(!bloom2.MayContain("hello"));
ASSERT_TRUE(!bloom2.MayContain("world"));
} }
TEST(DynamicBloomTest, Small) { TEST(DynamicBloomTest, Small) {
DynamicBloom bloom(100, 2); DynamicBloom bloom1(100, 0, 2);
bloom.Add("hello"); bloom1.Add("hello");
bloom.Add("world"); bloom1.Add("world");
ASSERT_TRUE(bloom.MayContain("hello")); ASSERT_TRUE(bloom1.MayContain("hello"));
ASSERT_TRUE(bloom.MayContain("world")); ASSERT_TRUE(bloom1.MayContain("world"));
ASSERT_TRUE(! bloom.MayContain("x")); ASSERT_TRUE(!bloom1.MayContain("x"));
ASSERT_TRUE(! bloom.MayContain("foo")); ASSERT_TRUE(!bloom1.MayContain("foo"));
DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
bloom2.Add("hello");
bloom2.Add("world");
ASSERT_TRUE(bloom2.MayContain("hello"));
ASSERT_TRUE(bloom2.MayContain("world"));
ASSERT_TRUE(!bloom2.MayContain("x"));
ASSERT_TRUE(!bloom2.MayContain("foo"));
} }
static int NextLength(int length) { static uint32_t NextNum(uint32_t num) {
if (length < 10) { if (num < 10) {
length += 1; num += 1;
} else if (length < 100) { } else if (num < 100) {
length += 10; num += 10;
} else if (length < 1000) { } else if (num < 1000) {
length += 100; num += 100;
} else { } else {
length += 1000; num += 1000;
} }
return length; return num;
} }
TEST(DynamicBloomTest, VaryingLengths) { TEST(DynamicBloomTest, VaryingLengths) {
char buffer[sizeof(int)]; char buffer[sizeof(uint64_t)];
// Count number of filters that significantly exceed the false positive rate // Count number of filters that significantly exceed the false positive rate
int mediocre_filters = 0; int mediocre_filters = 0;
@ -62,47 +78,116 @@ TEST(DynamicBloomTest, VaryingLengths) {
fprintf(stderr, "bits_per_key: %d num_probes: %d\n", fprintf(stderr, "bits_per_key: %d num_probes: %d\n",
FLAGS_bits_per_key, FLAGS_num_probes); FLAGS_bits_per_key, FLAGS_num_probes);
for (int length = 1; length <= 10000; length = NextLength(length)) { for (uint32_t cl_per_block = 0; cl_per_block < FLAGS_num_probes;
uint32_t bloom_bits = std::max(length * FLAGS_bits_per_key, 64); ++cl_per_block) {
DynamicBloom bloom(bloom_bits, FLAGS_num_probes); for (uint32_t num = 1; num <= 10000; num = NextNum(num)) {
for (int i = 0; i < length; i++) { uint32_t bloom_bits = 0;
bloom.Add(Key(i, buffer)); if (cl_per_block == 0) {
ASSERT_TRUE(bloom.MayContain(Key(i, buffer))); bloom_bits = std::max(num * FLAGS_bits_per_key, 64U);
} } else {
bloom_bits = std::max(num * FLAGS_bits_per_key,
cl_per_block * CACHE_LINE_SIZE * 8);
}
DynamicBloom bloom(bloom_bits, cl_per_block, FLAGS_num_probes);
for (uint64_t i = 0; i < num; i++) {
bloom.Add(Key(i, buffer));
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)));
}
// All added keys must match // All added keys must match
for (int i = 0; i < length; i++) { for (uint64_t i = 0; i < num; i++) {
ASSERT_TRUE(bloom.MayContain(Key(i, buffer))) ASSERT_TRUE(bloom.MayContain(Key(i, buffer)))
<< "Length " << length << "; key " << i; << "Num " << num << "; key " << i;
} }
// Check false positive rate // Check false positive rate
int result = 0; int result = 0;
for (int i = 0; i < 10000; i++) { for (uint64_t i = 0; i < 10000; i++) {
if (bloom.MayContain(Key(i + 1000000000, buffer))) { if (bloom.MayContain(Key(i + 1000000000, buffer))) {
result++; result++;
}
} }
double rate = result / 10000.0;
fprintf(stderr, "False positives: %5.2f%% @ num = %6u, bloom_bits = %6u, "
"cl per block = %u\n", rate*100.0, num, bloom_bits, cl_per_block);
if (rate > 0.0125)
mediocre_filters++; // Allowed, but not too often
else
good_filters++;
} }
double rate = result / 10000.0;
fprintf(stderr, "False positives: %5.2f%% @ length = %6d ; \n", fprintf(stderr, "Filters: %d good, %d mediocre\n",
rate*100.0, length); good_filters, mediocre_filters);
ASSERT_LE(mediocre_filters, good_filters/5);
}
}
TEST(DynamicBloomTest, perf) {
StopWatchNano timer(Env::Default());
//ASSERT_LE(rate, 0.02); // Must not be over 2% if (!FLAGS_enable_perf) {
if (rate > 0.0125) return;
mediocre_filters++; // Allowed, but not too often
else
good_filters++;
} }
fprintf(stderr, "Filters: %d good, %d mediocre\n", for (uint64_t m = 1; m <= 8; ++m) {
good_filters, mediocre_filters); const uint64_t num_keys = m * 8 * 1024 * 1024;
fprintf(stderr, "testing %luM keys\n", m * 8);
ASSERT_LE(mediocre_filters, good_filters/5); DynamicBloom std_bloom(num_keys * 10, 0, FLAGS_num_probes);
}
// Different bits-per-byte timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
std_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
}
uint64_t elapsed = timer.ElapsedNanos();
fprintf(stderr, "standard bloom, avg add latency %lu\n",
elapsed / num_keys);
uint64_t count = 0;
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
if (std_bloom.MayContain(Slice(reinterpret_cast<const char*>(&i), 8))) {
++count;
}
}
elapsed = timer.ElapsedNanos();
fprintf(stderr, "standard bloom, avg query latency %lu\n",
elapsed / count);
ASSERT_TRUE(count == num_keys);
for (int cl_per_block = 1; cl_per_block <= FLAGS_num_probes;
++cl_per_block) {
DynamicBloom blocked_bloom(num_keys * 10, cl_per_block, FLAGS_num_probes);
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
blocked_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
}
uint64_t elapsed = timer.ElapsedNanos();
fprintf(stderr, "blocked bloom(%d), avg add latency %lu\n",
cl_per_block, elapsed / num_keys);
uint64_t count = 0;
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
if (blocked_bloom.MayContain(
Slice(reinterpret_cast<const char*>(&i), 8))) {
++count;
}
}
elapsed = timer.ElapsedNanos();
fprintf(stderr, "blocked bloom(%d), avg query latency %lu\n",
cl_per_block, elapsed / count);
ASSERT_TRUE(count == num_keys);
}
}
}
} // namespace rocksdb } // namespace rocksdb

@ -1363,7 +1363,10 @@ class PosixEnv : public Env {
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options) const { EnvOptions OptimizeForLogWrite(const EnvOptions& env_options) const {
EnvOptions optimized = env_options; EnvOptions optimized = env_options;
optimized.use_mmap_writes = false; optimized.use_mmap_writes = false;
optimized.fallocate_with_keep_size = false; // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
// breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
// test and make this false
optimized.fallocate_with_keep_size = true;
return optimized; return optimized;
} }

@ -481,9 +481,9 @@ class TestLogger : public Logger {
if (new_format[0] == '[') { if (new_format[0] == '[') {
// "[DEBUG] " // "[DEBUG] "
ASSERT_TRUE(n <= 56 + (512 - sizeof(struct timeval))); ASSERT_TRUE(n <= 56 + (512 - static_cast<int>(sizeof(struct timeval))));
} else { } else {
ASSERT_TRUE(n <= 48 + (512 - sizeof(struct timeval))); ASSERT_TRUE(n <= 48 + (512 - static_cast<int>(sizeof(struct timeval))));
} }
va_end(backup_ap); va_end(backup_ap);
} }

@ -33,7 +33,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
compaction_filter_factory(std::shared_ptr<CompactionFilterFactory>( compaction_filter_factory(std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())), new DefaultCompactionFilterFactory())),
compaction_filter_factory_v2( compaction_filter_factory_v2(
new DefaultCompactionFilterFactoryV2(NewFixedPrefixTransform(8))), new DefaultCompactionFilterFactoryV2()),
write_buffer_size(4 << 20), write_buffer_size(4 << 20),
max_write_buffer_number(2), max_write_buffer_number(2),
min_write_buffer_number_to_merge(1), min_write_buffer_number_to_merge(1),
@ -47,8 +47,8 @@ ColumnFamilyOptions::ColumnFamilyOptions()
whole_key_filtering(true), whole_key_filtering(true),
num_levels(7), num_levels(7),
level0_file_num_compaction_trigger(4), level0_file_num_compaction_trigger(4),
level0_slowdown_writes_trigger(8), level0_slowdown_writes_trigger(20),
level0_stop_writes_trigger(12), level0_stop_writes_trigger(24),
max_mem_compaction_level(2), max_mem_compaction_level(2),
target_file_size_base(2 * 1048576), target_file_size_base(2 * 1048576),
target_file_size_multiplier(1), target_file_size_multiplier(1),
@ -58,7 +58,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
expanded_compaction_factor(25), expanded_compaction_factor(25),
source_compaction_factor(1), source_compaction_factor(1),
max_grandparent_overlap_factor(10), max_grandparent_overlap_factor(10),
disable_seek_compaction(false), disable_seek_compaction(true),
soft_rate_limit(0.0), soft_rate_limit(0.0),
hard_rate_limit(0.0), hard_rate_limit(0.0),
rate_limit_delay_max_milliseconds(1000), rate_limit_delay_max_milliseconds(1000),
@ -151,11 +151,11 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
DBOptions::DBOptions() DBOptions::DBOptions()
: create_if_missing(false), : create_if_missing(false),
error_if_exists(false), error_if_exists(false),
paranoid_checks(false), paranoid_checks(true),
env(Env::Default()), env(Env::Default()),
info_log(nullptr), info_log(nullptr),
info_log_level(INFO), info_log_level(INFO),
max_open_files(1000), max_open_files(5000),
statistics(nullptr), statistics(nullptr),
disableDataSync(false), disableDataSync(false),
use_fsync(false), use_fsync(false),
@ -176,7 +176,7 @@ DBOptions::DBOptions()
manifest_preallocation_size(4 * 1024 * 1024), manifest_preallocation_size(4 * 1024 * 1024),
allow_os_buffer(true), allow_os_buffer(true),
allow_mmap_reads(false), allow_mmap_reads(false),
allow_mmap_writes(true), allow_mmap_writes(false),
is_fd_close_on_exec(true), is_fd_close_on_exec(true),
skip_log_error_on_recovery(false), skip_log_error_on_recovery(false),
stats_dump_period_sec(3600), stats_dump_period_sec(3600),

@ -846,7 +846,7 @@ TEST(BackupableDBTest, RateLimiting) {
auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) /
backupable_options_->backup_rate_limit; backupable_options_->backup_rate_limit;
ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time); ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time);
ASSERT_LT(backup_time, 1.3 * rate_limited_backup_time); ASSERT_LT(backup_time, 1.5 * rate_limited_backup_time);
CloseBackupableDB(); CloseBackupableDB();
@ -858,7 +858,7 @@ TEST(BackupableDBTest, RateLimiting) {
auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) /
backupable_options_->restore_rate_limit; backupable_options_->restore_rate_limit;
ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time); ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time);
ASSERT_LT(restore_time, 1.3 * rate_limited_restore_time); ASSERT_LT(restore_time, 1.5 * rate_limited_restore_time);
AssertBackupConsistency(0, 0, 100000, 100010); AssertBackupConsistency(0, 0, 100000, 100010);
} }

@ -0,0 +1,427 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "utilities/geodb/geodb_impl.h"
#define __STDC_FORMAT_MACROS
#include <vector>
#include <map>
#include <string>
#include <limits>
#include "db/filename.h"
#include "util/coding.h"
//
// There are two types of keys. The first type of key-values
// maps a geo location to the set of object ids and their values.
// Table 1
// key : p + : + $quadkey + : + $id +
// : + $latitude + : + $longitude
// value : value of the object
// This table can be used to find all objects that reside near
// a specified geolocation.
//
// Table 2
// key : 'k' + : + $id
// value: $quadkey
namespace rocksdb {
GeoDBImpl::GeoDBImpl(DB* db, const GeoDBOptions& options) :
GeoDB(db, options), db_(db), options_(options) {
}
GeoDBImpl::~GeoDBImpl() {
}
Status GeoDBImpl::Insert(const GeoObject& obj) {
WriteBatch batch;
// It is possible that this id is already associated with
// with a different position. We first have to remove that
// association before we can insert the new one.
// remove existing object, if it exists
GeoObject old;
Status status = GetById(obj.id, &old);
if (status.ok()) {
assert(obj.id.compare(old.id) == 0);
std::string quadkey = PositionToQuad(old.position, Detail);
std::string key1 = MakeKey1(old.position, old.id, quadkey);
std::string key2 = MakeKey2(old.id);
batch.Delete(Slice(key1));
batch.Delete(Slice(key2));
} else if (status.IsNotFound()) {
// What if another thread is trying to insert the same ID concurrently?
} else {
return status;
}
// insert new object
std::string quadkey = PositionToQuad(obj.position, Detail);
std::string key1 = MakeKey1(obj.position, obj.id, quadkey);
std::string key2 = MakeKey2(obj.id);
batch.Put(Slice(key1), Slice(obj.value));
batch.Put(Slice(key2), Slice(quadkey));
return db_->Write(woptions_, &batch);
}
Status GeoDBImpl::GetByPosition(const GeoPosition& pos,
const Slice& id,
std::string* value) {
std::string quadkey = PositionToQuad(pos, Detail);
std::string key1 = MakeKey1(pos, id, quadkey);
return db_->Get(roptions_, Slice(key1), value);
}
Status GeoDBImpl::GetById(const Slice& id, GeoObject* object) {
Status status;
Slice quadkey;
// create an iterator so that we can get a consistent picture
// of the database.
Iterator* iter = db_->NewIterator(roptions_);
// create key for table2
std::string kt = MakeKey2(id);
Slice key2(kt);
iter->Seek(key2);
if (iter->Valid() && iter->status().ok()) {
if (iter->key().compare(key2) == 0) {
quadkey = iter->value();
}
}
if (quadkey.size() == 0) {
delete iter;
return Status::NotFound(key2);
}
//
// Seek to the quadkey + id prefix
//
std::string prefix = MakeKey1Prefix(quadkey.ToString(), id);
iter->Seek(Slice(prefix));
assert(iter->Valid());
if (!iter->Valid() || !iter->status().ok()) {
delete iter;
return Status::NotFound();
}
// split the key into p + quadkey + id + lat + lon
std::vector<std::string> parts;
Slice key = iter->key();
StringSplit(&parts, key.ToString(), ':');
assert(parts.size() == 5);
assert(parts[0] == "p");
assert(parts[1] == quadkey);
assert(parts[2] == id);
// fill up output parameters
object->position.latitude = atof(parts[3].c_str());
object->position.longitude = atof(parts[4].c_str());
object->id = id.ToString(); // this is redundant
object->value = iter->value().ToString();
delete iter;
return Status::OK();
}
Status GeoDBImpl::Remove(const Slice& id) {
// Read the object from the database
GeoObject obj;
Status status = GetById(id, &obj);
if (!status.ok()) {
return status;
}
// remove the object by atomically deleting it from both tables
std::string quadkey = PositionToQuad(obj.position, Detail);
std::string key1 = MakeKey1(obj.position, obj.id, quadkey);
std::string key2 = MakeKey2(obj.id);
WriteBatch batch;
batch.Delete(Slice(key1));
batch.Delete(Slice(key2));
return db_->Write(woptions_, &batch);
}
Status GeoDBImpl::SearchRadial(const GeoPosition& pos,
double radius,
std::vector<GeoObject>* values,
int number_of_values) {
// Gather all bounding quadkeys
std::vector<std::string> qids;
Status s = searchQuadIds(pos, radius, &qids);
if (!s.ok()) {
return s;
}
// create an iterator
Iterator* iter = db_->NewIterator(ReadOptions());
// Process each prospective quadkey
for (std::string qid : qids) {
// The user is interested in only these many objects.
if (number_of_values == 0) {
break;
}
// convert quadkey to db key prefix
std::string dbkey = MakeQuadKeyPrefix(qid);
for (iter->Seek(dbkey);
number_of_values > 0 && iter->Valid() && iter->status().ok();
iter->Next()) {
// split the key into p + quadkey + id + lat + lon
std::vector<std::string> parts;
Slice key = iter->key();
StringSplit(&parts, key.ToString(), ':');
assert(parts.size() == 5);
assert(parts[0] == "p");
std::string* quadkey = &parts[1];
// If the key we are looking for is a prefix of the key
// we found from the database, then this is one of the keys
// we are looking for.
auto res = std::mismatch(qid.begin(), qid.end(), quadkey->begin());
if (res.first == qid.end()) {
GeoPosition pos(atof(parts[3].c_str()), atof(parts[4].c_str()));
GeoObject obj(pos, parts[4], iter->value().ToString());
values->push_back(obj);
number_of_values--;
} else {
break;
}
}
}
delete iter;
return Status::OK();
}
std::string GeoDBImpl::MakeKey1(const GeoPosition& pos, Slice id,
std::string quadkey) {
std::string lat = std::to_string(pos.latitude);
std::string lon = std::to_string(pos.longitude);
std::string key = "p:";
key.reserve(5 + quadkey.size() + id.size() + lat.size() + lon.size());
key.append(quadkey);
key.append(":");
key.append(id.ToString());
key.append(":");
key.append(lat);
key.append(":");
key.append(lon);
return key;
}
std::string GeoDBImpl::MakeKey2(Slice id) {
std::string key = "k:";
key.append(id.ToString());
return key;
}
std::string GeoDBImpl::MakeKey1Prefix(std::string quadkey,
Slice id) {
std::string key = "p:";
key.reserve(3 + quadkey.size() + id.size());
key.append(quadkey);
key.append(":");
key.append(id.ToString());
return key;
}
std::string GeoDBImpl::MakeQuadKeyPrefix(std::string quadkey) {
std::string key = "p:";
key.append(quadkey);
return key;
}
void GeoDBImpl::StringSplit(std::vector<std::string>* tokens,
const std::string &text, char sep) {
std::size_t start = 0, end = 0;
while ((end = text.find(sep, start)) != std::string::npos) {
tokens->push_back(text.substr(start, end - start));
start = end + 1;
}
tokens->push_back(text.substr(start));
}
// convert degrees to radians
double GeoDBImpl::radians(double x) {
return (x * PI) / 180;
}
// convert radians to degrees
double GeoDBImpl::degrees(double x) {
return (x * 180) / PI;
}
// convert a gps location to quad coordinate
std::string GeoDBImpl::PositionToQuad(const GeoPosition& pos,
int levelOfDetail) {
Pixel p = PositionToPixel(pos, levelOfDetail);
Tile tile = PixelToTile(p);
return TileToQuadKey(tile, levelOfDetail);
}
GeoPosition GeoDBImpl::displaceLatLon(double lat, double lon,
double deltay, double deltax) {
double dLat = deltay / EarthRadius;
double dLon = deltax / (EarthRadius * cos(radians(lat)));
return GeoPosition(lat + degrees(dLat),
lon + degrees(dLon));
}
//
// Return the distance between two positions on the earth
//
double GeoDBImpl::distance(double lat1, double lon1,
double lat2, double lon2) {
double lon = radians(lon2 - lon1);
double lat = radians(lat2 - lat1);
double a = (sin(lat / 2) * sin(lat / 2)) +
cos(radians(lat1)) * cos(radians(lat2)) *
(sin(lon / 2) * sin(lon / 2));
double angle = 2 * atan2(sqrt(a), sqrt(1 - a));
return angle * EarthRadius;
}
//
// Returns all the quadkeys inside the search range
//
Status GeoDBImpl::searchQuadIds(const GeoPosition& position,
double radius,
std::vector<std::string>* quadKeys) {
// get the outline of the search square
GeoPosition topLeftPos = boundingTopLeft(position, radius);
GeoPosition bottomRightPos = boundingBottomRight(position, radius);
Pixel topLeft = PositionToPixel(topLeftPos, Detail);
Pixel bottomRight = PositionToPixel(bottomRightPos, Detail);
// how many level of details to look for
int numberOfTilesAtMaxDepth = floor((bottomRight.x - topLeft.x) / 256);
int zoomLevelsToRise = floor(log(numberOfTilesAtMaxDepth) / log(2));
zoomLevelsToRise++;
int levels = std::max(0, Detail - zoomLevelsToRise);
quadKeys->push_back(PositionToQuad(GeoPosition(topLeftPos.latitude,
topLeftPos.longitude),
levels));
quadKeys->push_back(PositionToQuad(GeoPosition(topLeftPos.latitude,
bottomRightPos.longitude),
levels));
quadKeys->push_back(PositionToQuad(GeoPosition(bottomRightPos.latitude,
topLeftPos.longitude),
levels));
quadKeys->push_back(PositionToQuad(GeoPosition(bottomRightPos.latitude,
bottomRightPos.longitude),
levels));
return Status::OK();
}
// Determines the ground resolution (in meters per pixel) at a specified
// latitude and level of detail.
// Latitude (in degrees) at which to measure the ground resolution.
// Level of detail, from 1 (lowest detail) to 23 (highest detail).
// Returns the ground resolution, in meters per pixel.
double GeoDBImpl::GroundResolution(double latitude, int levelOfDetail) {
latitude = clip(latitude, MinLatitude, MaxLatitude);
return cos(latitude * PI / 180) * 2 * PI * EarthRadius /
MapSize(levelOfDetail);
}
// Converts a point from latitude/longitude WGS-84 coordinates (in degrees)
// into pixel XY coordinates at a specified level of detail.
GeoDBImpl::Pixel GeoDBImpl::PositionToPixel(const GeoPosition& pos,
int levelOfDetail) {
double latitude = clip(pos.latitude, MinLatitude, MaxLatitude);
double x = (pos.longitude + 180) / 360;
double sinLatitude = sin(latitude * PI / 180);
double y = 0.5 - log((1 + sinLatitude) / (1 - sinLatitude)) / (4 * PI);
double mapSize = MapSize(levelOfDetail);
double X = floor(clip(x * mapSize + 0.5, 0, mapSize - 1));
double Y = floor(clip(y * mapSize + 0.5, 0, mapSize - 1));
return Pixel((unsigned int)X, (unsigned int)Y);
}
GeoPosition GeoDBImpl::PixelToPosition(const Pixel& pixel, int levelOfDetail) {
double mapSize = MapSize(levelOfDetail);
double x = (clip(pixel.x, 0, mapSize - 1) / mapSize) - 0.5;
double y = 0.5 - (clip(pixel.y, 0, mapSize - 1) / mapSize);
double latitude = 90 - 360 * atan(exp(-y * 2 * PI)) / PI;
double longitude = 360 * x;
return GeoPosition(latitude, longitude);
}
// Converts a Pixel to a Tile
GeoDBImpl::Tile GeoDBImpl::PixelToTile(const Pixel& pixel) {
unsigned int tileX = floor(pixel.x / 256);
unsigned int tileY = floor(pixel.y / 256);
return Tile(tileX, tileY);
}
GeoDBImpl::Pixel GeoDBImpl::TileToPixel(const Tile& tile) {
unsigned int pixelX = tile.x * 256;
unsigned int pixelY = tile.y * 256;
return Pixel(pixelX, pixelY);
}
// Convert a Tile to a quadkey
std::string GeoDBImpl::TileToQuadKey(const Tile& tile, int levelOfDetail) {
std::stringstream quadKey;
for (int i = levelOfDetail; i > 0; i--) {
char digit = '0';
int mask = 1 << (i - 1);
if ((tile.x & mask) != 0) {
digit++;
}
if ((tile.y & mask) != 0) {
digit++;
digit++;
}
quadKey << digit;
}
return quadKey.str();
}
//
// Convert a quadkey to a tile and its level of detail
//
void GeoDBImpl::QuadKeyToTile(std::string quadkey, Tile* tile,
int *levelOfDetail) {
tile->x = tile->y = 0;
*levelOfDetail = quadkey.size();
const char* key = reinterpret_cast<const char *>(quadkey.c_str());
for (int i = *levelOfDetail; i > 0; i--) {
int mask = 1 << (i - 1);
switch (key[*levelOfDetail - i]) {
case '0':
break;
case '1':
tile->x |= mask;
break;
case '2':
tile->y |= mask;
break;
case '3':
tile->x |= mask;
tile->y |= mask;
break;
default:
std::stringstream msg;
msg << quadkey;
msg << " Invalid QuadKey.";
throw std::runtime_error(msg.str());
}
}
}
} // namespace rocksdb

@ -0,0 +1,187 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#pragma once
#include <algorithm>
#include <cmath>
#include <string>
#include <sstream>
#include <stdexcept>
#include <vector>
#include "utilities/geo_db.h"
#include "utilities/stackable_db.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
namespace rocksdb {
// A specific implementation of GeoDB
class GeoDBImpl : public GeoDB {
public:
GeoDBImpl(DB* db, const GeoDBOptions& options);
~GeoDBImpl();
// Associate the GPS location with the identified by 'id'. The value
// is a blob that is associated with this object.
virtual Status Insert(const GeoObject& object);
// Retrieve the value of the object located at the specified GPS
// location and is identified by the 'id'.
virtual Status GetByPosition(const GeoPosition& pos,
const Slice& id,
std::string* value);
// Retrieve the value of the object identified by the 'id'. This method
// could be potentially slower than GetByPosition
virtual Status GetById(const Slice& id, GeoObject* object);
// Delete the specified object
virtual Status Remove(const Slice& id);
// Returns a list of all items within a circular radius from the
// specified gps location
virtual Status SearchRadial(const GeoPosition& pos,
double radius,
std::vector<GeoObject>* values,
int number_of_values);
private:
DB* db_;
const GeoDBOptions options_;
const WriteOptions woptions_;
const ReadOptions roptions_;
// The value of PI
static constexpr double PI = 3.141592653589793;
// convert degrees to radians
static double radians(double x);
// convert radians to degrees
static double degrees(double x);
// A pixel class that captures X and Y coordinates
class Pixel {
public:
unsigned int x;
unsigned int y;
Pixel(unsigned int a, unsigned int b) :
x(a), y(b) {
}
};
// A Tile in the geoid
class Tile {
public:
unsigned int x;
unsigned int y;
Tile(unsigned int a, unsigned int b) :
x(a), y(b) {
}
};
// convert a gps location to quad coordinate
static std::string PositionToQuad(const GeoPosition& pos, int levelOfDetail);
// arbitrary constant use for WGS84 via
// http://en.wikipedia.org/wiki/World_Geodetic_System
// http://mathforum.org/library/drmath/view/51832.html
// http://msdn.microsoft.com/en-us/library/bb259689.aspx
// http://www.tuicool.com/articles/NBrE73
//
const int Detail = 23;
static constexpr double EarthRadius = 6378137;
static constexpr double MinLatitude = -85.05112878;
static constexpr double MaxLatitude = 85.05112878;
static constexpr double MinLongitude = -180;
static constexpr double MaxLongitude = 180;
// clips a number to the specified minimum and maximum values.
static double clip(double n, double minValue, double maxValue) {
return fmin(fmax(n, minValue), maxValue);
}
// Determines the map width and height (in pixels) at a specified level
// of detail, from 1 (lowest detail) to 23 (highest detail).
// Returns the map width and height in pixels.
static unsigned int MapSize(int levelOfDetail) {
return (unsigned int)(256 << levelOfDetail);
}
// Determines the ground resolution (in meters per pixel) at a specified
// latitude and level of detail.
// Latitude (in degrees) at which to measure the ground resolution.
// Level of detail, from 1 (lowest detail) to 23 (highest detail).
// Returns the ground resolution, in meters per pixel.
static double GroundResolution(double latitude, int levelOfDetail);
// Converts a point from latitude/longitude WGS-84 coordinates (in degrees)
// into pixel XY coordinates at a specified level of detail.
static Pixel PositionToPixel(const GeoPosition& pos, int levelOfDetail);
static GeoPosition PixelToPosition(const Pixel& pixel, int levelOfDetail);
// Converts a Pixel to a Tile
static Tile PixelToTile(const Pixel& pixel);
static Pixel TileToPixel(const Tile& tile);
// Convert a Tile to a quadkey
static std::string TileToQuadKey(const Tile& tile, int levelOfDetail);
// Convert a quadkey to a tile and its level of detail
static void QuadKeyToTile(std::string quadkey, Tile* tile,
int *levelOfDetail);
// Return the distance between two positions on the earth
static double distance(double lat1, double lon1,
double lat2, double lon2);
static GeoPosition displaceLatLon(double lat, double lon,
double deltay, double deltax);
//
// Returns the top left position after applying the delta to
// the specified position
//
static GeoPosition boundingTopLeft(const GeoPosition& in, double radius) {
return displaceLatLon(in.latitude, in.longitude, -radius, -radius);
}
//
// Returns the bottom right position after applying the delta to
// the specified position
static GeoPosition boundingBottomRight(const GeoPosition& in,
double radius) {
return displaceLatLon(in.latitude, in.longitude, radius, radius);
}
//
// Get all quadkeys within a radius of a specified position
//
Status searchQuadIds(const GeoPosition& position,
double radius,
std::vector<std::string>* quadKeys);
// splits a string into its components
static void StringSplit(std::vector<std::string>* tokens,
const std::string &text,
char sep);
//
// Create keys for accessing rocksdb table(s)
//
static std::string MakeKey1(const GeoPosition& pos,
Slice id,
std::string quadkey);
static std::string MakeKey2(Slice id);
static std::string MakeKey1Prefix(std::string quadkey,
Slice id);
static std::string MakeQuadKeyPrefix(std::string quadkey);
};
} // namespace rocksdb

@ -0,0 +1,123 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
//
#include "utilities/geodb/geodb_impl.h"
#include <cctype>
#include "util/testharness.h"
namespace rocksdb {
class GeoDBTest {
public:
static const std::string kDefaultDbName;
static Options options;
DB* db;
GeoDB* geodb;
GeoDBTest() {
GeoDBOptions geodb_options;
ASSERT_OK(DestroyDB(kDefaultDbName, options));
options.create_if_missing = true;
Status status = DB::Open(options, kDefaultDbName, &db);
geodb = new GeoDBImpl(db, geodb_options);
}
~GeoDBTest() {
delete geodb;
}
GeoDB* getdb() {
return geodb;
}
};
const std::string GeoDBTest::kDefaultDbName = "/tmp/geodefault/";
Options GeoDBTest::options = Options();
// Insert, Get and Remove
TEST(GeoDBTest, SimpleTest) {
GeoPosition pos1(100, 101);
std::string id1("id1");
std::string value1("value1");
// insert first object into database
GeoObject obj1(pos1, id1, value1);
Status status = getdb()->Insert(obj1);
ASSERT_TRUE(status.ok());
// insert second object into database
GeoPosition pos2(200, 201);
std::string id2("id2");
std::string value2 = "value2";
GeoObject obj2(pos2, id2, value2);
status = getdb()->Insert(obj2);
ASSERT_TRUE(status.ok());
// retrieve first object using position
std::string value;
status = getdb()->GetByPosition(pos1, Slice(id1), &value);
ASSERT_TRUE(status.ok());
ASSERT_EQ(value, value1);
// retrieve first object using id
GeoObject obj;
status = getdb()->GetById(Slice(id1), &obj);
ASSERT_TRUE(status.ok());
ASSERT_EQ(obj.position.latitude, 100);
ASSERT_EQ(obj.position.longitude, 101);
ASSERT_EQ(obj.id.compare(id1), 0);
ASSERT_EQ(obj.value, value1);
// delete first object
status = getdb()->Remove(Slice(id1));
ASSERT_TRUE(status.ok());
status = getdb()->GetByPosition(pos1, Slice(id1), &value);
ASSERT_TRUE(status.IsNotFound());
status = getdb()->GetById(id1, &obj);
ASSERT_TRUE(status.IsNotFound());
// check that we can still find second object
status = getdb()->GetByPosition(pos2, id2, &value);
ASSERT_TRUE(status.ok());
ASSERT_EQ(value, value2);
status = getdb()->GetById(id2, &obj);
ASSERT_TRUE(status.ok());
}
// Search.
// Verify distances via http://www.stevemorse.org/nearest/distance.php
TEST(GeoDBTest, Search) {
GeoPosition pos1(45, 45);
std::string id1("mid1");
std::string value1 = "midvalue1";
// insert object at 45 degree latitude
GeoObject obj1(pos1, id1, value1);
Status status = getdb()->Insert(obj1);
ASSERT_TRUE(status.ok());
// search all objects centered at 46 degree latitude with
// a radius of 200 kilometers. We should find the one object that
// we inserted earlier.
std::vector<GeoObject> values;
status = getdb()->SearchRadial(GeoPosition(46, 46), 200000, &values);
ASSERT_TRUE(status.ok());
ASSERT_EQ(values.size(), 1);
// search all objects centered at 46 degree latitude with
// a radius of 2 kilometers. There should be none.
values.clear();
status = getdb()->SearchRadial(GeoPosition(46, 46), 2, &values);
ASSERT_TRUE(status.ok());
ASSERT_EQ(values.size(), 0);
}
} // namespace rocksdb
int main(int argc, char* argv[]) {
return rocksdb::test::RunAllTests();
}
Loading…
Cancel
Save