diff --git a/HISTORY.md b/HISTORY.md index 5cb9fc81b..a3348c054 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -16,7 +16,7 @@ * Added "virtual void WaitForJoin()" in class Env. Default operation is no-op. * Removed BackupEngine::DeleteBackupsNewerThan() function * Added new option -- verify_checksums_in_compaction -* Chagned Options.prefix_extractor from raw pointer to shared_ptr (take ownership) +* Changed Options.prefix_extractor from raw pointer to shared_ptr (take ownership) Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly) * Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools * Added a command "checkconsistency" in ldb tool, which checks @@ -28,6 +28,7 @@ we will ignore it. We assume that writers of these records were interrupted and that we can safely ignore it. * Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, process them in batches, and return the batched results back to DB. +* Geo-spatial support for locations and radial-search. ## 2.7.0 (01/28/2014) diff --git a/Makefile b/Makefile index 81fd0032a..1e4638593 100644 --- a/Makefile +++ b/Makefile @@ -94,7 +94,8 @@ TESTS = \ write_batch_test\ deletefile_test \ table_test \ - thread_local_test + thread_local_test \ + geodb_test TOOLS = \ sst_dump \ @@ -370,6 +371,9 @@ merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) deletefile_test: db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) +geodb_test: utilities/geodb/geodb_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) utilities/geodb/geodb_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + $(MEMENVLIBRARY) : $(MEMENVOBJECTS) rm -f $@ $(AR) -rs $@ $(MEMENVOBJECTS) @@ -398,6 +402,31 @@ sst_dump: tools/sst_dump.o $(LIBOBJECTS) ldb: tools/ldb.o $(LIBOBJECTS) $(CXX) tools/ldb.o $(LIBOBJECTS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +# --------------------------------------------------------------------------- +# Jni stuff +# --------------------------------------------------------------------------- +JNI_NATIVE_SOURCES = ./java/rocksjni/rocksjni.cc + +JAVA_INCLUDE = -I/usr/lib/jvm/java-openjdk/include/ -I/usr/lib/jvm/java-openjdk/include/linux +ROCKSDBJNILIB = ./java/librocksdbjni.so + +ifeq ($(PLATFORM), OS_MACOSX) +ROCKSDBJNILIB = ./java/librocksdbjni.jnilib +JAVA_INCLUDE = -I/System/Library/Frameworks/JavaVM.framework/Headers/ +endif + +jni: clean + OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32 + cd java;$(MAKE) java; + $(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o $(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(LIBOBJECTS) $(LDFLAGS) $(COVERAGEFLAGS) + +jclean: + cd java;$(MAKE) clean; + rm -f $(ROCKSDBJNILIB) + +jtest: + cd java;$(MAKE) sample; + # --------------------------------------------------------------------------- # Platform-specific compilation # --------------------------------------------------------------------------- @@ -461,6 +490,10 @@ depend: $(DEPFILES) # working solution. ifneq ($(MAKECMDGOALS),clean) ifneq ($(MAKECMDGOALS),format) +ifneq ($(MAKECMDGOALS),jclean) +ifneq ($(MAKECMDGOALS),jtest) -include $(DEPFILES) endif endif +endif +endif diff --git a/db/column_family.cc b/db/column_family.cc index 17539c695..ed50d0fef 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -300,12 +300,12 @@ Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level, SuperVersion* ColumnFamilyData::InstallSuperVersion( SuperVersion* new_superversion, port::Mutex* db_mutex) { + new_superversion->db_mutex = db_mutex; new_superversion->Init(mem_, imm_.current(), current_); SuperVersion* old_superversion = super_version_; super_version_ = new_superversion; ++super_version_number_; super_version_->version_number = super_version_number_; - super_version_->db_mutex = db_mutex; if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); return old_superversion; // will let caller delete outside of mutex diff --git a/db/column_family_test.cc b/db/column_family_test.cc index fbf5030fc..16e98629f 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -704,6 +704,7 @@ TEST(ColumnFamilyTest, DifferentCompactionStyles) { default_cf.filter_policy = nullptr; default_cf.no_block_cache = true; default_cf.source_compaction_factor = 100; + default_cf.disable_seek_compaction = false; one.compaction_style = kCompactionStyleUniversal; // trigger compaction if there are >= 4 files diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 2e630c11b..f0397b6f5 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -387,7 +387,7 @@ TEST(CorruptionTest, FileSystemStateCorrupted) { DBImpl* dbi = reinterpret_cast(db_); std::vector metadata; dbi->GetLiveFilesMetaData(&metadata); - ASSERT_GT(metadata.size(), 0); + ASSERT_GT(metadata.size(), size_t(0)); std::string filename = dbname_ + metadata[0].name; delete db_; diff --git a/db/db_bench.cc b/db/db_bench.cc index 6d7c0898a..14d886f5c 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -134,6 +134,8 @@ DEFINE_int64(read_range, 1, "When ==1 reads use ::Get, when >1 reads use" DEFINE_bool(use_prefix_blooms, false, "Whether to place prefixes in blooms"); +DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality"); + DEFINE_bool(use_prefix_api, false, "Whether to set ReadOptions.prefix for" " prefixscanrandom. If true, use_prefix_blooms must also be true."); @@ -1543,6 +1545,7 @@ class Benchmark { NewFixedPrefixTransform(FLAGS_prefix_size)); } options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits; + options.bloom_locality = FLAGS_bloom_locality; options.max_open_files = FLAGS_open_files; options.statistics = dbstats; options.env = FLAGS_env; @@ -1916,7 +1919,7 @@ class Benchmark { Duration duration(FLAGS_duration, reads_); int64_t found = 0; - + int64_t read = 0; if (FLAGS_use_multiget) { // MultiGet const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group long keys_left = reads_; @@ -1924,6 +1927,7 @@ class Benchmark { // Recalculate number of keys per group, and call MultiGet until done long num_keys; while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { + read += num_keys; found += MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, ""); thread->stats.FinishedSingleOp(db_); @@ -1937,8 +1941,9 @@ class Benchmark { std::string key = GenerateKeyFromInt(k, FLAGS_num); iter->Seek(key); + read++; if (iter->Valid() && iter->key().compare(Slice(key)) == 0) { - ++found; + found++; } thread->stats.FinishedSingleOp(db_); @@ -1957,6 +1962,7 @@ class Benchmark { } if (FLAGS_read_range < 2) { + read++; if (db_->Get(options, key, &value).ok()) { found++; } @@ -1972,6 +1978,7 @@ class Benchmark { db_->GetApproximateSizes(&range, 1, &sizes); } + read += FLAGS_read_range; for (iter->Seek(key); iter->Valid() && count <= FLAGS_read_range; ++count, iter->Next()) { @@ -1992,7 +1999,7 @@ class Benchmark { char msg[100]; snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", - found, reads_); + found, read); thread->stats.AddMessage(msg); diff --git a/db/db_impl.cc b/db/db_impl.cc index 00b5b0a52..357d67a2b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -70,7 +70,6 @@ namespace rocksdb { const std::string default_column_family_name("default"); -const std::string kNullString = "NULL"; void DumpLeveldbBuildVersion(Logger * log); @@ -466,7 +465,6 @@ uint64_t DBImpl::TEST_Current_Manifest_FileNo() { Status DBImpl::NewDB() { VersionEdit new_db; - new_db.SetVersionNumber(); new_db.SetLogNumber(0); new_db.SetNextFile(2); new_db.SetLastSequence(0); @@ -2826,7 +2824,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, assert(compact); compact->CleanupBatchBuffer(); compact->CleanupMergedBuffer(); - compact->cur_prefix_ = kNullString; bool prefix_initialized = false; int64_t imm_micros = 0; // Micros spent doing imm_ compactions diff --git a/db/db_impl.h b/db/db_impl.h index e8b22774d..d754a6e51 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -269,6 +269,7 @@ class DBImpl : public DB { private: friend class DB; + friend class InternalStats; friend class TailingIterator; friend struct SuperVersion; struct CompactionState; diff --git a/db/db_test.cc b/db/db_test.cc index 3a5129898..e0112348c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -432,6 +432,7 @@ class DBTest { options.compaction_style = kCompactionStyleUniversal; break; case kCompressedBlockCache: + options.allow_mmap_writes = true; options.block_cache_compressed = NewLRUCache(8*1024*1024); break; case kInfiniteMaxOpenFiles: @@ -2185,6 +2186,8 @@ TEST(DBTest, NumImmutableMemTable) { ASSERT_EQ(1, (int) perf_context.get_from_memtable_count); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "k3", big_value)); + ASSERT_TRUE(dbfull()->GetProperty( + handles_[1], "rocksdb.cur-size-active-mem-table", &num)); ASSERT_TRUE(dbfull()->GetProperty(handles_[1], "rocksdb.num-immutable-mem-table", &num)); ASSERT_EQ(num, "2"); @@ -2202,6 +2205,11 @@ TEST(DBTest, NumImmutableMemTable) { ASSERT_TRUE(dbfull()->GetProperty(handles_[1], "rocksdb.num-immutable-mem-table", &num)); ASSERT_EQ(num, "0"); + ASSERT_TRUE(dbfull()->GetProperty( + handles_[1], "rocksdb.cur-size-active-mem-table", &num)); + // "208" is the size of the metadata of an empty skiplist, this would + // break if we change the default skiplist implementation + ASSERT_EQ(num, "208"); SetPerfLevel(kDisable); } while (ChangeCompactOptions()); } @@ -3481,6 +3489,7 @@ TEST(DBTest, InPlaceUpdateCallbackNoAction) { TEST(DBTest, CompactionFilter) { Options options = CurrentOptions(); + options.max_open_files = -1; options.num_levels = 3; options.max_mem_compaction_level = 0; options.compaction_filter_factory = std::make_shared(); @@ -3848,9 +3857,11 @@ TEST(DBTest, CompactionFilterV2) { options.num_levels = 3; options.max_mem_compaction_level = 0; // extract prefix - auto prefix_extractor = NewFixedPrefixTransform(8); + std::unique_ptr prefix_extractor; + prefix_extractor.reset(NewFixedPrefixTransform(8)); + options.compaction_filter_factory_v2 - = std::make_shared(prefix_extractor); + = std::make_shared(prefix_extractor.get()); // In a testing environment, we can only flush the application // compaction filter buffer using universal compaction option_config_ = kUniversalCompaction; @@ -3898,7 +3909,7 @@ TEST(DBTest, CompactionFilterV2) { // create a new database with the compaction // filter in such a way that it deletes all keys options.compaction_filter_factory_v2 = - std::make_shared(prefix_extractor); + std::make_shared(prefix_extractor.get()); options.create_if_missing = true; DestroyAndReopen(&options); @@ -3933,9 +3944,10 @@ TEST(DBTest, CompactionFilterV2WithValueChange) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - auto prefix_extractor = NewFixedPrefixTransform(8); + std::unique_ptr prefix_extractor; + prefix_extractor.reset(NewFixedPrefixTransform(8)); options.compaction_filter_factory_v2 = - std::make_shared(prefix_extractor); + std::make_shared(prefix_extractor.get()); // In a testing environment, we can only flush the application // compaction filter buffer using universal compaction option_config_ = kUniversalCompaction; @@ -3973,9 +3985,10 @@ TEST(DBTest, CompactionFilterV2NULLPrefix) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - auto prefix_extractor = NewFixedPrefixTransform(8); + std::unique_ptr prefix_extractor; + prefix_extractor.reset(NewFixedPrefixTransform(8)); options.compaction_filter_factory_v2 = - std::make_shared(prefix_extractor); + std::make_shared(prefix_extractor.get()); // In a testing environment, we can only flush the application // compaction filter buffer using universal compaction option_config_ = kUniversalCompaction; @@ -4713,6 +4726,7 @@ TEST(DBTest, NoSpace) { do { Options options = CurrentOptions(); options.env = env_; + options.paranoid_checks = false; Reopen(&options); ASSERT_OK(Put("foo", "v1")); @@ -5506,6 +5520,7 @@ TEST(DBTest, ReadCompaction) { options.filter_policy = nullptr; options.block_size = 4096; options.no_block_cache = true; + options.disable_seek_compaction = false; CreateAndReopenWithCF({"pikachu"}, &options); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 2988c88b2..fb5e9b229 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -35,6 +35,8 @@ DBPropertyType GetPropertyType(const Slice& property) { return kCompactionPending; } else if (in == "background-errors") { return kBackgroundErrors; + } else if (in == "cur-size-active-mem-table") { + return kCurSizeActiveMemTable; } return kUnknown; } @@ -339,12 +341,14 @@ bool InternalStats::GetProperty(DBPropertyType property_type, // 0 otherwise, *value = std::to_string(current->NeedsCompaction() ? 1 : 0); return true; - ///////////// case kBackgroundErrors: // Accumulated number of errors in background flushes or compactions. *value = std::to_string(GetBackgroundErrorCount()); return true; - ///////// + case kCurSizeActiveMemTable: + // Current size of the active memtable + *value = std::to_string(cfd->mem()->ApproximateMemoryUsage()); + return true; default: return false; } diff --git a/db/internal_stats.h b/db/internal_stats.h index 5922c6c81..616b6cc0d 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -21,6 +21,7 @@ class ColumnFamilyData; namespace rocksdb { class MemTableList; +class DBImpl; enum DBPropertyType { kNumFilesAtLevel, // Number of files at a specific level @@ -33,6 +34,7 @@ enum DBPropertyType { // 0. kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0. kBackgroundErrors, // Return accumulated background errors encountered. + kCurSizeActiveMemTable, // Return current size of the active memtable kUnknown, }; diff --git a/db/memtable.cc b/db/memtable.cc index 515f0588d..d8ca68c6d 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -52,6 +52,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, assert(!should_flush_); if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits, + options.bloom_locality, options.memtable_prefix_bloom_probes)); } } @@ -154,22 +155,24 @@ const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: MemTableIterator(const MemTable& mem, const ReadOptions& options) - : mem_(mem), iter_(), dynamic_prefix_seek_(false), valid_(false) { + : bloom_(nullptr), + prefix_extractor_(mem.prefix_extractor_), + iter_(), + valid_(false) { if (options.prefix) { - iter_.reset(mem_.table_->GetPrefixIterator(*options.prefix)); + iter_.reset(mem.table_->GetPrefixIterator(*options.prefix)); } else if (options.prefix_seek) { - dynamic_prefix_seek_ = true; - iter_.reset(mem_.table_->GetDynamicPrefixIterator()); + bloom_ = mem.prefix_bloom_.get(); + iter_.reset(mem.table_->GetDynamicPrefixIterator()); } else { - iter_.reset(mem_.table_->GetIterator()); + iter_.reset(mem.table_->GetIterator()); } } virtual bool Valid() const { return valid_; } virtual void Seek(const Slice& k) { - if (dynamic_prefix_seek_ && mem_.prefix_bloom_ && - !mem_.prefix_bloom_->MayContain( - mem_.prefix_extractor_->Transform(ExtractUserKey(k)))) { + if (bloom_ != nullptr && + !bloom_->MayContain(prefix_extractor_->Transform(ExtractUserKey(k)))) { valid_ = false; return; } @@ -207,9 +210,9 @@ class MemTableIterator: public Iterator { virtual Status status() const { return Status::OK(); } private: - const MemTable& mem_; + DynamicBloom* bloom_; + const SliceTransform* const prefix_extractor_; std::shared_ptr iter_; - bool dynamic_prefix_seek_; bool valid_; // No copying allowed diff --git a/db/merge_helper.cc b/db/merge_helper.cc index f5244498d..0e36f6ae0 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -40,12 +40,12 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, ParseInternalKey(keys_.back(), &orig_ikey); bool hit_the_next_user_key = false; - ParsedInternalKey ikey; std::string merge_result; // Temporary value for merge results if (steps) { ++(*steps); } for (iter->Next(); iter->Valid(); iter->Next()) { + ParsedInternalKey ikey; assert(operands_.size() >= 1); // Should be invariants! assert(keys_.size() == operands_.size()); @@ -194,7 +194,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, if (operands_.size() >= 2 && operands_.size() >= min_partial_merge_operands_ && user_merge_operator_->PartialMergeMulti( - ikey.user_key, + orig_ikey.user_key, std::deque(operands_.begin(), operands_.end()), &merge_result, logger_)) { // Merging of operands (associative merge) was successful. diff --git a/db/merge_operator.cc b/db/merge_operator.cc index a01d389e9..a14df8a87 100644 --- a/db/merge_operator.cc +++ b/db/merge_operator.cc @@ -23,7 +23,7 @@ bool MergeOperator::PartialMergeMulti(const Slice& key, std::string temp_value; Slice temp_slice(operand_list[0]); - for (int i = 1; i < operand_list.size(); ++i) { + for (size_t i = 1; i < operand_list.size(); ++i) { auto& operand = operand_list[i]; if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) { return false; diff --git a/db/table_cache.cc b/db/table_cache.cc index c03ab5e1a..36168d109 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -187,17 +187,27 @@ Status TableCache::GetTableProperties( bool TableCache::PrefixMayMatch(const ReadOptions& options, const InternalKeyComparator& icomparator, - uint64_t file_number, uint64_t file_size, + const FileMetaData& file_meta, const Slice& internal_prefix, bool* table_io) { - Cache::Handle* handle = nullptr; - Status s = FindTable(storage_options_, icomparator, file_number, file_size, - &handle, table_io); bool may_match = true; - if (s.ok()) { - TableReader* t = GetTableReaderFromHandle(handle); - may_match = t->PrefixMayMatch(internal_prefix); - ReleaseHandle(handle); + auto table_handle = file_meta.table_reader_handle; + if (table_handle == nullptr) { + // Need to get table handle from file number + Status s = FindTable(storage_options_, icomparator, file_meta.number, + file_meta.file_size, &table_handle, table_io); + if (!s.ok()) { + return may_match; + } } + + auto table = GetTableReaderFromHandle(table_handle); + may_match = table->PrefixMayMatch(internal_prefix); + + if (file_meta.table_reader_handle == nullptr) { + // Need to release handle if it is generated from here. + ReleaseHandle(table_handle); + } + return may_match; } diff --git a/db/table_cache.h b/db/table_cache.h index 865c4bec2..02063bdfd 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -58,7 +58,7 @@ class TableCache { // the table index or blooms are not in memory, this may cause an I/O bool PrefixMayMatch(const ReadOptions& options, const InternalKeyComparator& internal_comparator, - uint64_t file_number, uint64_t file_size, + const FileMetaData& file_meta, const Slice& internal_prefix, bool* table_io); // Evict any entry for the specified file number diff --git a/db/version_edit.cc b/db/version_edit.cc index 355aaf3cb..24d7f0d9f 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -30,7 +30,6 @@ enum Tag { // these are new formats divergent from open source leveldb kNewFile2 = 100, // store smallest & largest seqno - kVersionNumber = 101, // manifest version number, available after 2.8 kColumnFamily = 200, // specify column family for version edit kColumnFamilyAdd = 201, @@ -39,7 +38,6 @@ enum Tag { }; void VersionEdit::Clear() { - version_number_ = 0; comparator_.clear(); max_level_ = 0; log_number_ = 0; @@ -47,7 +45,6 @@ void VersionEdit::Clear() { last_sequence_ = 0; next_file_number_ = 0; max_column_family_ = 0; - has_version_number_ = false; has_comparator_ = false; has_log_number_ = false; has_prev_log_number_ = false; @@ -63,10 +60,6 @@ void VersionEdit::Clear() { } void VersionEdit::EncodeTo(std::string* dst) const { - if (has_version_number_) { - PutVarint32(dst, kVersionNumber); - PutVarint32(dst, version_number_); - } if (has_comparator_) { PutVarint32(dst, kComparator); PutLengthPrefixedSlice(dst, comparator_); @@ -164,14 +157,6 @@ Status VersionEdit::DecodeFrom(const Slice& src) { while (msg == nullptr && GetVarint32(&input, &tag)) { switch (tag) { - case kVersionNumber: - if (GetVarint32(&input, &version_number_)) { - has_version_number_ = true; - } else { - msg = "version number"; - } - break; - case kComparator: if (GetLengthPrefixedSlice(&input, &str)) { comparator_ = str.ToString(); diff --git a/db/version_edit.h b/db/version_edit.h index 561a891d2..98731cfb2 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -50,10 +50,6 @@ class VersionEdit { void Clear(); - void SetVersionNumber() { - has_version_number_ = true; - version_number_ = kManifestVersion; - } void SetComparatorName(const Slice& name) { has_comparator_ = true; comparator_ = name.ToString(); @@ -147,14 +143,12 @@ class VersionEdit { bool GetLevel(Slice* input, int* level, const char** msg); int max_level_; - uint32_t version_number_; std::string comparator_; uint64_t log_number_; uint64_t prev_log_number_; uint64_t next_file_number_; uint32_t max_column_family_; SequenceNumber last_sequence_; - bool has_version_number_; bool has_comparator_; bool has_log_number_; bool has_prev_log_number_; @@ -174,10 +168,6 @@ class VersionEdit { bool is_column_family_drop_; bool is_column_family_add_; std::string column_family_name_; - - enum { - kManifestVersion = 1 - }; }; } // namespace rocksdb diff --git a/db/version_set.cc b/db/version_set.cc index 6639c1e6b..76d83693f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -184,18 +184,14 @@ class Version::LevelFileNumIterator : public Iterator { } Slice value() const { assert(Valid()); - EncodeFixed64(value_buf_, (*flist_)[index_]->number); - EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size); - return Slice(value_buf_, sizeof(value_buf_)); + return Slice(reinterpret_cast((*flist_)[index_]), + sizeof(FileMetaData)); } virtual Status status() const { return Status::OK(); } private: const InternalKeyComparator icmp_; const std::vector* const flist_; uint32_t index_; - - // Backing store for value(). Holds the file number and size. - mutable char value_buf_[16]; }; static Iterator* GetFileIterator(void* arg, const ReadOptions& options, @@ -203,7 +199,7 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options, const InternalKeyComparator& icomparator, const Slice& file_value, bool for_compaction) { TableCache* cache = reinterpret_cast(arg); - if (file_value.size() != 16) { + if (file_value.size() != sizeof(FileMetaData)) { return NewErrorIterator( Status::Corruption("FileReader invoked with unexpected value")); } else { @@ -214,11 +210,12 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options, options_copy = options; options_copy.prefix = nullptr; } - FileMetaData meta(DecodeFixed64(file_value.data()), - DecodeFixed64(file_value.data() + 8)); + + const FileMetaData* meta_file = + reinterpret_cast(file_value.data()); return cache->NewIterator( - options.prefix ? options_copy : options, soptions, icomparator, meta, - nullptr /* don't need reference to table*/, for_compaction); + options.prefix ? options_copy : options, soptions, icomparator, + *meta_file, nullptr /* don't need reference to table*/, for_compaction); } } @@ -237,10 +234,11 @@ bool Version::PrefixMayMatch(const ReadOptions& options, // key() will always be the biggest value for this SST? may_match = true; } else { + const FileMetaData* meta_file = + reinterpret_cast(level_iter->value().data()); + may_match = cfd_->table_cache()->PrefixMayMatch( - options, cfd_->internal_comparator(), - DecodeFixed64(level_iter->value().data()), - DecodeFixed64(level_iter->value().data() + 8), internal_prefix, + options, cfd_->internal_comparator(), *meta_file, internal_prefix, nullptr); } return may_match; @@ -437,17 +435,30 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, return false; } -static bool NewestFirst(FileMetaData* a, FileMetaData* b) { +namespace { +bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } -static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { - if (a->smallest_seqno > b->smallest_seqno) { - assert(a->largest_seqno > b->largest_seqno); - return true; +bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { + if (a->smallest_seqno != b->smallest_seqno) { + return a->smallest_seqno > b->smallest_seqno; } - assert(a->largest_seqno <= b->largest_seqno); - return false; + if (a->largest_seqno != b->largest_seqno) { + return a->largest_seqno > b->largest_seqno; + } + // Break ties by file number + return NewestFirst(a, b); +} +bool BySmallestKey(FileMetaData* a, FileMetaData* b, + const InternalKeyComparator* cmp) { + int r = cmp->Compare(a->smallest, b->smallest); + if (r != 0) { + return (r < 0); + } + // Break ties by file number + return (a->number < b->number); } +} // anonymous namespace Version::Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number) @@ -1186,22 +1197,33 @@ struct VersionSet::ManifestWriter { // Versions that contain full copies of the intermediate state. class VersionSet::Builder { private: - // Helper to sort by v->files_[file_number].smallest - struct BySmallestKey { + // Helper to sort v->files_ + // kLevel0LevelCompaction -- NewestFirst + // kLevel0UniversalCompaction -- NewestFirstBySeqNo + // kLevelNon0 -- BySmallestKey + struct FileComparator { + enum SortMethod { + kLevel0LevelCompaction = 0, + kLevel0UniversalCompaction = 1, + kLevelNon0 = 2, + } sort_method; const InternalKeyComparator* internal_comparator; bool operator()(FileMetaData* f1, FileMetaData* f2) const { - int r = internal_comparator->Compare(f1->smallest, f2->smallest); - if (r != 0) { - return (r < 0); - } else { - // Break ties by file number - return (f1->number < f2->number); + switch (sort_method) { + case kLevel0LevelCompaction: + return NewestFirst(f1, f2); + case kLevel0UniversalCompaction: + return NewestFirstBySeqNo(f1, f2); + case kLevelNon0: + return BySmallestKey(f1, f2, internal_comparator); } + assert(false); + return false; } }; - typedef std::set FileSet; + typedef std::set FileSet; struct LevelState { std::set deleted_files; FileSet* added_files; @@ -1210,15 +1232,23 @@ class VersionSet::Builder { ColumnFamilyData* cfd_; Version* base_; LevelState* levels_; + FileComparator level_zero_cmp_; + FileComparator level_nonzero_cmp_; public: Builder(ColumnFamilyData* cfd) : cfd_(cfd), base_(cfd->current()) { base_->Ref(); levels_ = new LevelState[base_->NumberLevels()]; - BySmallestKey cmp; - cmp.internal_comparator = &cfd_->internal_comparator(); - for (int level = 0; level < base_->NumberLevels(); level++) { - levels_[level].added_files = new FileSet(cmp); + level_zero_cmp_.sort_method = + (cfd_->options()->compaction_style == kCompactionStyleUniversal) + ? FileComparator::kLevel0UniversalCompaction + : FileComparator::kLevel0LevelCompaction; + level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0; + level_nonzero_cmp_.internal_comparator = &cfd->internal_comparator(); + + levels_[0].added_files = new FileSet(level_zero_cmp_); + for (int level = 1; level < base_->NumberLevels(); level++) { + levels_[level].added_files = new FileSet(level_nonzero_cmp_); } } @@ -1251,16 +1281,25 @@ class VersionSet::Builder { void CheckConsistency(Version* v) { #ifndef NDEBUG + // make sure the files are sorted correctly for (int level = 0; level < v->NumberLevels(); level++) { - // Make sure there is no overlap in levels > 0 - if (level > 0) { - for (uint32_t i = 1; i < v->files_[level].size(); i++) { - const InternalKey& prev_end = v->files_[level][i-1]->largest; - const InternalKey& this_begin = v->files_[level][i]->smallest; - if (cfd_->internal_comparator().Compare(prev_end, this_begin) >= 0) { + for (size_t i = 1; i < v->files_[level].size(); i++) { + auto f1 = v->files_[level][i - 1]; + auto f2 = v->files_[level][i]; + if (level == 0) { + assert(level_zero_cmp_(f1, f2)); + if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { + assert(f1->largest_seqno > f2->largest_seqno); + } + } else { + assert(level_nonzero_cmp_(f1, f2)); + + // Make sure there is no overlap in levels > 0 + if (cfd_->internal_comparator().Compare(f1->largest, f2->smallest) >= + 0) { fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", - prev_end.DebugString().c_str(), - this_begin.DebugString().c_str()); + (f1->largest).DebugString().c_str(), + (f2->smallest).DebugString().c_str()); abort(); } } @@ -1359,9 +1398,9 @@ class VersionSet::Builder { void SaveTo(Version* v) { CheckConsistency(base_); CheckConsistency(v); - BySmallestKey cmp; - cmp.internal_comparator = &cfd_->internal_comparator(); + for (int level = 0; level < base_->NumberLevels(); level++) { + const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_; // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. const auto& base_files = base_->files_[level]; @@ -1387,13 +1426,6 @@ class VersionSet::Builder { } } - // TODO(icanadi) do it in the loop above, which already sorts the files - // Pre-sort level0 for Get() - if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { - std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirstBySeqNo); - } else { - std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirst); - } CheckConsistency(v); } @@ -1585,6 +1617,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, DescriptorFileName(dbname_, pending_manifest_file_number_), &descriptor_file, env_->OptimizeForManifestWrite(storage_options_)); if (s.ok()) { + descriptor_file->SetPreallocationBlockSize( + options_->manifest_preallocation_size); descriptor_log_.reset(new log::Writer(std::move(descriptor_file))); s = WriteSnapshot(descriptor_log_.get()); } @@ -1806,8 +1840,6 @@ Status VersionSet::Recover( return s; } - bool have_version_number = false; - bool log_number_decrease = false; bool have_log_number = false; bool have_prev_log_number = false; bool have_next_file = false; @@ -1924,7 +1956,9 @@ Status VersionSet::Recover( if (cfd != nullptr) { if (edit.has_log_number_) { if (cfd->GetLogNumber() > edit.log_number_) { - log_number_decrease = true; + Log(options_->info_log, + "MANIFEST corruption detected, but ignored - Log numbers in " + "records NOT monotonically increasing"); } else { cfd->SetLogNumber(edit.log_number_); have_log_number = true; @@ -1939,10 +1973,6 @@ Status VersionSet::Recover( } } - if (edit.has_version_number_) { - have_version_number = true; - } - if (edit.has_prev_log_number_) { prev_log_number = edit.prev_log_number_; have_prev_log_number = true; @@ -1962,23 +1992,6 @@ Status VersionSet::Recover( have_last_sequence = true; } } - - if (s.ok() && log_number_decrease) { - // Since release 2.8, version number is added into MANIFEST file. - // Prior release 2.8, a bug in LogAndApply() can cause log_number - // to be smaller than the one from previous edit. To ensure backward - // compatibility, only fail for MANIFEST genearated by release 2.8 - // and after. - if (have_version_number) { - s = Status::Corruption( - "MANIFEST corruption - Log numbers in records NOT " - "monotonically increasing"); - } else { - Log(options_->info_log, - "MANIFEST corruption detected, but ignored - Log numbers in " - "records NOT monotonically increasing"); - } - } } if (s.ok()) { @@ -2402,8 +2415,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { // WARNING: This method doesn't hold a mutex!! - bool first_record = false; - // This is done without DB mutex lock held, but only within single-threaded // LogAndApply. Column family manipulations can only happen within LogAndApply // (the same single thread), so we're safe to iterate. @@ -2411,10 +2422,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { { // Store column family info VersionEdit edit; - if (first_record) { - edit.SetVersionNumber(); - first_record = false; - } if (cfd->GetID() != 0) { // default column family is always there, // no need to explicitly write it diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 9576bf2ca..f54ee620c 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -139,6 +139,7 @@ class DefaultCompactionFilterFactory : public CompactionFilterFactory { // class CompactionFilterFactoryV2 { public: + // NOTE: CompactionFilterFactoryV2 will not delete prefix_extractor explicit CompactionFilterFactoryV2(const SliceTransform* prefix_extractor) : prefix_extractor_(prefix_extractor) { } @@ -169,9 +170,8 @@ class CompactionFilterFactoryV2 { // return any filter class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 { public: - explicit DefaultCompactionFilterFactoryV2( - const SliceTransform* prefix_extractor) - : CompactionFilterFactoryV2(prefix_extractor) { } + explicit DefaultCompactionFilterFactoryV2() + : CompactionFilterFactoryV2(nullptr) { } virtual std::unique_ptr CreateCompactionFilterV2( diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1b80c3fdd..9cfefb8dd 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -127,6 +127,8 @@ struct ColumnFamilyOptions { // Version TWO of the compaction_filter_factory // It supports rolling compaction + // + // Default: a factory that doesn't provide any object std::shared_ptr compaction_filter_factory_v2; // ------------------- @@ -493,6 +495,17 @@ struct ColumnFamilyOptions { // number of hash probes per key uint32_t memtable_prefix_bloom_probes; + // Control locality of bloom filter probes to improve cache miss rate. + // This option only applies to memtable prefix bloom and plaintable + // prefix bloom. It essentially limits the max number of cache lines each + // bloom filter check can touch. + // This optimization is turned off when set to 0. The number should never + // be greater than number of probes. This option can boost performance + // for in-memory workload but should use with care since it can cause + // higher false positive rate. + // Default: 0 + uint32_t bloom_locality; + // Maximum number of successive merge operations on a key in the memtable. // // When a merge operation is added to the memtable and the maximum number of @@ -538,7 +551,7 @@ struct DBOptions { // If any of the writes to the database fails (Put, Delete, Merge, Write), // the database will switch to read-only mode and fail all other // Write operations. - // Default: false + // Default: true bool paranoid_checks; // Use the specified object to interact with the environment, @@ -559,7 +572,7 @@ struct DBOptions { // files opened are always kept open. You can estimate number of files based // on target_file_size_base and target_file_size_multiplier for level-based // compaction. For universal-style compaction, you can usually set it to -1. - // Default: 1000 + // Default: 5000 int max_open_files; // If non-null, then we should collect metrics about database operations @@ -696,7 +709,7 @@ struct DBOptions { // Allow the OS to mmap file for reading sst tables. Default: false bool allow_mmap_reads; - // Allow the OS to mmap file for writing. Default: true + // Allow the OS to mmap file for writing. Default: false bool allow_mmap_writes; // Disable child process inherit open files. Default: true diff --git a/include/utilities/geo_db.h b/include/utilities/geo_db.h new file mode 100644 index 000000000..8b3e44b06 --- /dev/null +++ b/include/utilities/geo_db.h @@ -0,0 +1,103 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// + +#pragma once +#include +#include + +#include "utilities/stackable_db.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +// +// Configurable options needed for setting up a Geo database +// +struct GeoDBOptions { + // Backup info and error messages will be written to info_log + // if non-nullptr. + // Default: nullptr + Logger* info_log; + + explicit GeoDBOptions(Logger* _info_log = nullptr):info_log(_info_log) { } +}; + +// +// A position in the earth's geoid +// +class GeoPosition { + public: + double latitude; + double longitude; + + explicit GeoPosition(double la = 0, double lo = 0) : + latitude(la), longitude(lo) { + } +}; + +// +// Description of an object on the Geoid. It is located by a GPS location, +// and is identified by the id. The value associated with this object is +// an opaque string 'value'. Different objects identified by unique id's +// can have the same gps-location associated with them. +// +class GeoObject { + public: + GeoPosition position; + std::string id; + std::string value; + + GeoObject() {} + + GeoObject(const GeoPosition& pos, const std::string& i, + const std::string& val) : + position(pos), id(i), value(val) { + } +}; + +// +// Stack your DB with GeoDB to be able to get geo-spatial support +// +class GeoDB : public StackableDB { + public: + // GeoDBOptions have to be the same as the ones used in a previous + // incarnation of the DB + // + // GeoDB owns the pointer `DB* db` now. You should not delete it or + // use it after the invocation of GeoDB + // GeoDB(DB* db, const GeoDBOptions& options) : StackableDB(db) {} + GeoDB(DB* db, const GeoDBOptions& options) : StackableDB(db) {} + virtual ~GeoDB() {} + + // Insert a new object into the location database. The object is + // uniquely identified by the id. If an object with the same id already + // exists in the db, then the old one is overwritten by the new + // object being inserted here. + virtual Status Insert(const GeoObject& object) = 0; + + // Retrieve the value of the object located at the specified GPS + // location and is identified by the 'id'. + virtual Status GetByPosition(const GeoPosition& pos, + const Slice& id, std::string* value) = 0; + + // Retrieve the value of the object identified by the 'id'. This method + // could be potentially slower than GetByPosition + virtual Status GetById(const Slice& id, GeoObject* object) = 0; + + // Delete the specified object + virtual Status Remove(const Slice& id) = 0; + + // Returns a list of all items within a circular radius from the + // specified gps location. If 'number_of_values' is specified, + // then this call returns at most that many number of objects. + // The radius is specified in 'meters'. + virtual Status SearchRadial(const GeoPosition& pos, + double radius, + std::vector* values, + int number_of_values = INT_MAX) = 0; +}; + +} // namespace rocksdb diff --git a/java/Makefile b/java/Makefile new file mode 100644 index 000000000..794ec1439 --- /dev/null +++ b/java/Makefile @@ -0,0 +1,17 @@ +NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB +NATIVE_INCLUDE = ./include +ROCKSDB_JAR = rocksdbjni.jar + +clean: + -find . -name "*.class" -exec rm {} \; + -find . -name "hs*.log" -exec rm {} \; + rm -f $(ROCKSDB_JAR) + +java: + javac org/rocksdb/*.java + jar -cf $(ROCKSDB_JAR) org/rocksdb/*.class + javah -d $(NATIVE_INCLUDE) -jni $(NATIVE_JAVA_CLASSES) + +sample: + javac -cp $(ROCKSDB_JAR) RocksDBSample.java + java -ea -Djava.library.path=.:../ -cp ".:./*" RocksDBSample /tmp/rocksdbjni/ diff --git a/java/RocksDBSample.java b/java/RocksDBSample.java new file mode 100644 index 000000000..b574c23e5 --- /dev/null +++ b/java/RocksDBSample.java @@ -0,0 +1,79 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +import java.util.*; +import java.lang.*; +import org.rocksdb.*; +import java.io.IOException; + +public class RocksDBSample { + static { + System.loadLibrary("rocksdbjni"); + } + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("usage: RocksDBSample db_path"); + return; + } + String db_path = args[0]; + + System.out.println("RocksDBSample"); + + try { + RocksDB db = RocksDB.open(db_path); + db.put("hello".getBytes(), "world".getBytes()); + byte[] value = db.get("hello".getBytes()); + System.out.format("Get('hello') = %s\n", + new String(value)); + + for (int i = 1; i <= 9; ++i) { + for (int j = 1; j <= 9; ++j) { + db.put(String.format("%dx%d", i, j).getBytes(), + String.format("%d", i * j).getBytes()); + } + } + + for (int i = 1; i <= 9; ++i) { + for (int j = 1; j <= 9; ++j) { + System.out.format("%s ", new String(db.get( + String.format("%dx%d", i, j).getBytes()))); + } + System.out.println(""); + } + + value = db.get("1x1".getBytes()); + assert(value != null); + value = db.get("world".getBytes()); + assert(value == null); + + byte[] testKey = "asdf".getBytes(); + byte[] testValue = + "asdfghjkl;'?> insufficientArray.length); + len = db.get("asdfjkl;".getBytes(), enoughArray); + assert(len == RocksDB.NOT_FOUND); + len = db.get(testKey, enoughArray); + assert(len == testValue.length); + try { + db.close(); + } catch (IOException e) { + System.err.println(e); + } + } catch (RocksDBException e) { + System.err.println(e); + } + } +} diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java new file mode 100644 index 000000000..7e96eff28 --- /dev/null +++ b/java/org/rocksdb/RocksDB.java @@ -0,0 +1,103 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +import java.lang.*; +import java.util.*; +import java.io.Closeable; +import java.io.IOException; + +/** + * A RocksDB is a persistent ordered map from keys to values. It is safe for + * concurrent access from multiple threads without any external synchronization. + * All methods of this class could potentially throw RocksDBException, which + * indicates sth wrong at the rocksdb library side and the call failed. + */ +public class RocksDB implements Closeable { + public static final int NOT_FOUND = -1; + /** + * The factory constructor of RocksDB that opens a RocksDB instance given + * the path to the database. + * + * @param path the path to the rocksdb. + * @param status an out value indicating the status of the Open(). + * @return a rocksdb instance on success, null if the specified rocksdb can + * not be opened. + */ + public static RocksDB open(String path) throws RocksDBException { + RocksDB db = new RocksDB(); + db.open0(path); + return db; + } + + @Override public void close() throws IOException { + if (nativeHandle != 0) { + close0(); + } + } + + /** + * Set the database entry for "key" to "value". + * + * @param key the specified key to be inserted. + * @param value the value associated with the specified key. + */ + public void put(byte[] key, byte[] value) throws RocksDBException { + put(key, key.length, value, value.length); + } + + /** + * Get the value associated with the specified key. + * + * @param key the key to retrieve the value. + * @param value the out-value to receive the retrieved value. + * @return The size of the actual value that matches the specified + * {@code key} in byte. If the return value is greater than the + * length of {@code value}, then it indicates that the size of the + * input buffer {@code value} is insufficient and partial result will + * be returned. RocksDB.NOT_FOUND will be returned if the value not + * found. + */ + public int get(byte[] key, byte[] value) throws RocksDBException { + return get(key, key.length, value, value.length); + } + + /** + * The simplified version of get which returns a new byte array storing + * the value associated with the specified input key if any. null will be + * returned if the specified key is not found. + * + * @param key the key retrieve the value. + * @return a byte array storing the value associated with the input key if + * any. null if it does not find the specified key. + * + * @see RocksDBException + */ + public byte[] get(byte[] key) throws RocksDBException { + return get(key, key.length); + } + + /** + * Private constructor. + */ + private RocksDB() { + nativeHandle = -1; + } + + // native methods + private native void open0(String path) throws RocksDBException; + private native void put( + byte[] key, int keyLen, + byte[] value, int valueLen) throws RocksDBException; + private native int get( + byte[] key, int keyLen, + byte[] value, int valueLen) throws RocksDBException; + private native byte[] get( + byte[] key, int keyLen) throws RocksDBException; + private native void close0(); + + private long nativeHandle; +} diff --git a/java/org/rocksdb/RocksDBException.java b/java/org/rocksdb/RocksDBException.java new file mode 100644 index 000000000..e426e03ee --- /dev/null +++ b/java/org/rocksdb/RocksDBException.java @@ -0,0 +1,24 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +import java.lang.*; +import java.util.*; + +/** + * A RocksDBException encapsulates the error of an operation. This exception + * type is used to describe an internal error from the c++ rocksdb library. + */ +public class RocksDBException extends Exception { + /** + * The private construct used by a set of public static factory method. + * + * @param msg the specified error message. + */ + public RocksDBException(String msg) { + super(msg); + } +} diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h new file mode 100644 index 000000000..d51ea2059 --- /dev/null +++ b/java/rocksjni/portal.h @@ -0,0 +1,81 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +// This file is designed for caching those frequently used IDs and provide +// efficient portal (i.e, a set of static functions) to access java code +// from c++. + +#ifndef JAVA_ROCKSJNI_PORTAL_H_ +#define JAVA_ROCKSJNI_PORTAL_H_ + +#include +#include "rocksdb/db.h" + +namespace rocksdb { + +// The portal class for org.rocksdb.RocksDB +class RocksDBJni { + public: + // Get the java class id of org.rocksdb.RocksDB. + static jclass getJClass(JNIEnv* env) { + static jclass jclazz = env->FindClass("org/rocksdb/RocksDB"); + assert(jclazz != nullptr); + return jclazz; + } + + // Get the field id of the member variable of org.rocksdb.RocksDB + // that stores the pointer to rocksdb::DB. + static jfieldID getHandleFieldID(JNIEnv* env) { + static jfieldID fid = env->GetFieldID( + getJClass(env), "nativeHandle", "J"); + assert(fid != nullptr); + return fid; + } + + // Get the pointer to rocksdb::DB of the specified org.rocksdb.RocksDB. + static rocksdb::DB* getHandle(JNIEnv* env, jobject jdb) { + return reinterpret_cast( + env->GetLongField(jdb, getHandleFieldID(env))); + } + + // Pass the rocksdb::DB pointer to the java side. + static void setHandle(JNIEnv* env, jobject jdb, rocksdb::DB* db) { + env->SetLongField( + jdb, getHandleFieldID(env), + reinterpret_cast(db)); + } +}; + +// The portal class for org.rocksdb.RocksDBException +class RocksDBExceptionJni { + public: + // Get the jclass of org.rocksdb.RocksDBException + static jclass getJClass(JNIEnv* env) { + static jclass jclazz = env->FindClass("org/rocksdb/RocksDBException"); + assert(jclazz != nullptr); + return jclazz; + } + + // Create and throw a java exception by converting the input + // Status to an RocksDBException. + // + // In case s.ok() is true, then this function will not throw any + // exception. + static void ThrowNew(JNIEnv* env, Status s) { + if (s.ok()) { + return; + } + jstring msg = env->NewStringUTF(s.ToString().c_str()); + // get the constructor id of org.rocksdb.RocksDBException + static jmethodID mid = env->GetMethodID( + getJClass(env), "", "(Ljava/lang/String;)V"); + assert(mid != nullptr); + + env->Throw((jthrowable)env->NewObject(getJClass(env), mid, msg)); + } +}; + +} // namespace rocksdb +#endif // JAVA_ROCKSJNI_PORTAL_H_ diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc new file mode 100644 index 000000000..3ae53834e --- /dev/null +++ b/java/rocksjni/rocksjni.cc @@ -0,0 +1,185 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ rocksdb::DB methods from Java side. + +#include +#include +#include +#include + +#include "include/org_rocksdb_RocksDB.h" +#include "rocksjni/portal.h" +#include "rocksdb/db.h" + +/* + * Class: org_rocksdb_RocksDB + * Method: open0 + * Signature: (Ljava/lang/String;)V + */ +void Java_org_rocksdb_RocksDB_open0( + JNIEnv* env, jobject java_db, jstring jdb_path) { + rocksdb::DB* db; + rocksdb::Options options; + options.create_if_missing = true; + + jboolean isCopy = false; + const char* db_path = env->GetStringUTFChars(jdb_path, &isCopy); + rocksdb::Status s = rocksdb::DB::Open(options, db_path, &db); + env->ReleaseStringUTFChars(jdb_path, db_path); + + if (s.ok()) { + rocksdb::RocksDBJni::setHandle(env, java_db, db); + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: put + * Signature: ([BI[BI)V + */ +void Java_org_rocksdb_RocksDB_put( + JNIEnv* env, jobject jdb, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); + + jboolean isCopy; + jbyte* key = env->GetByteArrayElements(jkey, &isCopy); + jbyte* value = env->GetByteArrayElements(jvalue, &isCopy); + rocksdb::Slice key_slice( + reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice( + reinterpret_cast(value), jvalue_len); + + rocksdb::Status s = db->Put( + rocksdb::WriteOptions(), key_slice, value_slice); + + // trigger java unref on key and value. + // by passing JNI_ABORT, it will simply release the reference without + // copying the result back to the java byte array. + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); + + if (s.ok()) { + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: get + * Signature: ([BI)[B + */ +jbyteArray Java_org_rocksdb_RocksDB_get___3BI( + JNIEnv* env, jobject jdb, jbyteArray jkey, jint jkey_len) { + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); + + jboolean isCopy; + jbyte* key = env->GetByteArrayElements(jkey, &isCopy); + rocksdb::Slice key_slice( + reinterpret_cast(key), jkey_len); + + std::string value; + rocksdb::Status s = db->Get( + rocksdb::ReadOptions(), + key_slice, &value); + + // trigger java unref on key. + // by passing JNI_ABORT, it will simply release the reference without + // copying the result back to the java byte array. + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + + if (s.IsNotFound()) { + return nullptr; + } + + if (s.ok()) { + jbyteArray jvalue = env->NewByteArray(value.size()); + env->SetByteArrayRegion( + jvalue, 0, value.size(), + reinterpret_cast(value.c_str())); + return jvalue; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + + return nullptr; +} + +/* + * Class: org_rocksdb_RocksDB + * Method: get + * Signature: ([BI[BI)I + */ +jint Java_org_rocksdb_RocksDB_get___3BI_3BI( + JNIEnv* env, jobject jdb, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + static const int kNotFound = -1; + static const int kStatusError = -2; + + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); + + jboolean isCopy; + jbyte* key = env->GetByteArrayElements(jkey, &isCopy); + jbyte* value = env->GetByteArrayElements(jvalue, &isCopy); + rocksdb::Slice key_slice( + reinterpret_cast(key), jkey_len); + + // TODO(yhchiang): we might save one memory allocation here by adding + // a DB::Get() function which takes preallocated jbyte* as input. + std::string cvalue; + rocksdb::Status s = db->Get( + rocksdb::ReadOptions(), key_slice, &cvalue); + + // trigger java unref on key. + // by passing JNI_ABORT, it will simply release the reference without + // copying the result back to the java byte array. + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + + if (s.IsNotFound()) { + env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); + return kNotFound; + } else if (!s.ok()) { + // Here since we are throwing a Java exception from c++ side. + // As a result, c++ does not know calling this function will in fact + // throwing an exception. As a result, the execution flow will + // not stop here, and codes after this throw will still be + // executed. + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + + // Return a dummy const value to avoid compilation error, although + // java side might not have a chance to get the return value :) + return kStatusError; + } + + int cvalue_len = static_cast(cvalue.size()); + int length = std::min(jvalue_len, cvalue_len); + + memcpy(value, cvalue.c_str(), length); + env->ReleaseByteArrayElements(jvalue, value, JNI_COMMIT); + if (cvalue_len > length) { + return static_cast(cvalue_len); + } + return length; +} + +/* + * Class: org_rocksdb_RocksDB + * Method: close0 + * Signature: ()V + */ +void Java_org_rocksdb_RocksDB_close0( + JNIEnv* env, jobject java_db) { + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, java_db); + delete db; + db = nullptr; + + rocksdb::RocksDBJni::setHandle(env, java_db, db); +} diff --git a/port/port_posix.cc b/port/port_posix.cc index f7025f461..911cebdf2 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -11,6 +11,7 @@ #include #include +#include #include #include "util/logging.h" @@ -45,9 +46,25 @@ Mutex::Mutex(bool adaptive) { Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); } -void Mutex::Lock() { PthreadCall("lock", pthread_mutex_lock(&mu_)); } +void Mutex::Lock() { + PthreadCall("lock", pthread_mutex_lock(&mu_)); +#ifndef NDEBUG + locked_ = true; +#endif +} + +void Mutex::Unlock() { +#ifndef NDEBUG + locked_ = false; +#endif + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); +} -void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } +void Mutex::AssertHeld() { +#ifndef NDEBUG + assert(locked_); +#endif +} CondVar::CondVar(Mutex* mu) : mu_(mu) { @@ -57,7 +74,13 @@ CondVar::CondVar(Mutex* mu) CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); } void CondVar::Wait() { +#ifndef NDEBUG + mu_->locked_ = false; +#endif PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_)); +#ifndef NDEBUG + mu_->locked_ = true; +#endif } void CondVar::Signal() { diff --git a/port/port_posix.h b/port/port_posix.h index aaea0b574..6a7382926 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -97,11 +97,16 @@ class Mutex { void Lock(); void Unlock(); - void AssertHeld() { } + // this will assert if the mutex is not locked + // it does NOT verify that mutex is held by a calling thread + void AssertHeld(); private: friend class CondVar; pthread_mutex_t mu_; +#ifndef NDEBUG + bool locked_; +#endif // No copying Mutex(const Mutex&); @@ -475,6 +480,8 @@ inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) { return false; } +#define CACHE_LINE_SIZE 64U + } // namespace port } // namespace rocksdb diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 46886291e..d521446f8 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -270,7 +270,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) { if (options_.prefix_extractor != nullptr) { uint32_t bloom_total_bits = num_prefixes * kBloomBitsPerKey; if (bloom_total_bits > 0) { - bloom_.reset(new DynamicBloom(bloom_total_bits)); + bloom_.reset(new DynamicBloom(bloom_total_bits, options_.bloom_locality)); } } @@ -388,7 +388,7 @@ Status PlainTableReader::PopulateIndex() { if (IsTotalOrderMode()) { uint32_t num_bloom_bits = table_properties_->num_entries * kBloomBitsPerKey; if (num_bloom_bits > 0) { - bloom_.reset(new DynamicBloom(num_bloom_bits)); + bloom_.reset(new DynamicBloom(num_bloom_bits, options_.bloom_locality)); } } diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index 94df660ef..a4c8e11cb 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -5,6 +5,9 @@ #include "dynamic_bloom.h" +#include + +#include "port/port.h" #include "rocksdb/slice.h" #include "util/hash.h" @@ -17,20 +20,31 @@ static uint32_t BloomHash(const Slice& key) { } DynamicBloom::DynamicBloom(uint32_t total_bits, - uint32_t (*hash_func)(const Slice& key), - uint32_t num_probes) - : hash_func_(hash_func), - kTotalBits((total_bits + 7) / 8 * 8), - kNumProbes(num_probes) { - assert(hash_func_); + uint32_t cl_per_block, + uint32_t num_probes, + uint32_t (*hash_func)(const Slice& key)) + : kBlocked(cl_per_block > 0), + kBitsPerBlock(std::min(cl_per_block, num_probes) * CACHE_LINE_SIZE * 8), + kTotalBits((kBlocked ? (total_bits + kBitsPerBlock - 1) / kBitsPerBlock + * kBitsPerBlock : + total_bits + 7) / 8 * 8), + kNumBlocks(kBlocked ? kTotalBits / kBitsPerBlock : 1), + kNumProbes(num_probes), + hash_func_(hash_func == nullptr ? &BloomHash : hash_func) { + assert(kBlocked ? kTotalBits > 0 : kTotalBits >= kBitsPerBlock); assert(kNumProbes > 0); - assert(kTotalBits > 0); - data_.reset(new unsigned char[kTotalBits / 8]()); -} -DynamicBloom::DynamicBloom(uint32_t total_bits, - uint32_t num_probes) - : DynamicBloom(total_bits, &BloomHash, num_probes) { + uint32_t sz = kTotalBits / 8; + if (kBlocked) { + sz += CACHE_LINE_SIZE - 1; + } + raw_ = new unsigned char[sz](); + if (kBlocked && (reinterpret_cast(raw_) % CACHE_LINE_SIZE)) { + data_ = raw_ + CACHE_LINE_SIZE - + reinterpret_cast(raw_) % CACHE_LINE_SIZE; + } else { + data_ = raw_; + } } } // rocksdb diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index 0851becbf..efc461cf9 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -15,13 +15,17 @@ class Slice; class DynamicBloom { public: // total_bits: fixed total bits for the bloom - // hash_func: customized hash function // num_probes: number of hash probes for a single key - DynamicBloom(uint32_t total_bits, - uint32_t (*hash_func)(const Slice& key), - uint32_t num_probes = 6); + // cl_per_block: block size in cache lines. When this is non-zero, a + // query/set is done within a block to improve cache locality. + // hash_func: customized hash function + explicit DynamicBloom(uint32_t total_bits, uint32_t cl_per_block = 0, + uint32_t num_probes = 6, + uint32_t (*hash_func)(const Slice& key) = nullptr); - explicit DynamicBloom(uint32_t total_bits, uint32_t num_probes = 6); + ~DynamicBloom() { + delete[] raw_; + } // Assuming single threaded access to this function. void Add(const Slice& key); @@ -36,10 +40,15 @@ class DynamicBloom { bool MayContainHash(uint32_t hash); private: - uint32_t (*hash_func_)(const Slice& key); + const bool kBlocked; + const uint32_t kBitsPerBlock; const uint32_t kTotalBits; + const uint32_t kNumBlocks; const uint32_t kNumProbes; - std::unique_ptr data_; + + uint32_t (*hash_func_)(const Slice& key); + unsigned char* data_; + unsigned char* raw_; }; inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); } @@ -50,22 +59,42 @@ inline bool DynamicBloom::MayContain(const Slice& key) { inline bool DynamicBloom::MayContainHash(uint32_t h) { const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits - for (uint32_t i = 0; i < kNumProbes; i++) { - const uint32_t bitpos = h % kTotalBits; - if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { - return false; + if (kBlocked) { + uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock; + for (uint32_t i = 0; i < kNumProbes; ++i) { + const uint32_t bitpos = b + h % kBitsPerBlock; + if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { + return false; + } + h += delta; + } + } else { + for (uint32_t i = 0; i < kNumProbes; ++i) { + const uint32_t bitpos = h % kTotalBits; + if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { + return false; + } + h += delta; } - h += delta; } return true; } inline void DynamicBloom::AddHash(uint32_t h) { const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits - for (uint32_t i = 0; i < kNumProbes; i++) { - const uint32_t bitpos = h % kTotalBits; - data_[bitpos / 8] |= (1 << (bitpos % 8)); - h += delta; + if (kBlocked) { + uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock; + for (uint32_t i = 0; i < kNumProbes; ++i) { + const uint32_t bitpos = b + h % kBitsPerBlock; + data_[bitpos / 8] |= (1 << (bitpos % 8)); + h += delta; + } + } else { + for (uint32_t i = 0; i < kNumProbes; ++i) { + const uint32_t bitpos = h % kTotalBits; + data_[bitpos / 8] |= (1 << (bitpos % 8)); + h += delta; + } } } diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index 58f05ae50..e8bbc38e1 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -3,19 +3,23 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#include #include #include "dynamic_bloom.h" +#include "port/port.h" #include "util/logging.h" #include "util/testharness.h" #include "util/testutil.h" +#include "util/stop_watch.h" DEFINE_int32(bits_per_key, 10, ""); DEFINE_int32(num_probes, 6, ""); +DEFINE_bool(enable_perf, false, ""); namespace rocksdb { -static Slice Key(int i, char* buffer) { +static Slice Key(uint64_t i, char* buffer) { memcpy(buffer, &i, sizeof(i)); return Slice(buffer, sizeof(i)); } @@ -24,36 +28,48 @@ class DynamicBloomTest { }; TEST(DynamicBloomTest, EmptyFilter) { - DynamicBloom bloom(100, 2); - ASSERT_TRUE(! bloom.MayContain("hello")); - ASSERT_TRUE(! bloom.MayContain("world")); + DynamicBloom bloom1(100, 0, 2); + ASSERT_TRUE(!bloom1.MayContain("hello")); + ASSERT_TRUE(!bloom1.MayContain("world")); + + DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); + ASSERT_TRUE(!bloom2.MayContain("hello")); + ASSERT_TRUE(!bloom2.MayContain("world")); } TEST(DynamicBloomTest, Small) { - DynamicBloom bloom(100, 2); - bloom.Add("hello"); - bloom.Add("world"); - ASSERT_TRUE(bloom.MayContain("hello")); - ASSERT_TRUE(bloom.MayContain("world")); - ASSERT_TRUE(! bloom.MayContain("x")); - ASSERT_TRUE(! bloom.MayContain("foo")); + DynamicBloom bloom1(100, 0, 2); + bloom1.Add("hello"); + bloom1.Add("world"); + ASSERT_TRUE(bloom1.MayContain("hello")); + ASSERT_TRUE(bloom1.MayContain("world")); + ASSERT_TRUE(!bloom1.MayContain("x")); + ASSERT_TRUE(!bloom1.MayContain("foo")); + + DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); + bloom2.Add("hello"); + bloom2.Add("world"); + ASSERT_TRUE(bloom2.MayContain("hello")); + ASSERT_TRUE(bloom2.MayContain("world")); + ASSERT_TRUE(!bloom2.MayContain("x")); + ASSERT_TRUE(!bloom2.MayContain("foo")); } -static int NextLength(int length) { - if (length < 10) { - length += 1; - } else if (length < 100) { - length += 10; - } else if (length < 1000) { - length += 100; +static uint32_t NextNum(uint32_t num) { + if (num < 10) { + num += 1; + } else if (num < 100) { + num += 10; + } else if (num < 1000) { + num += 100; } else { - length += 1000; + num += 1000; } - return length; + return num; } TEST(DynamicBloomTest, VaryingLengths) { - char buffer[sizeof(int)]; + char buffer[sizeof(uint64_t)]; // Count number of filters that significantly exceed the false positive rate int mediocre_filters = 0; @@ -62,47 +78,116 @@ TEST(DynamicBloomTest, VaryingLengths) { fprintf(stderr, "bits_per_key: %d num_probes: %d\n", FLAGS_bits_per_key, FLAGS_num_probes); - for (int length = 1; length <= 10000; length = NextLength(length)) { - uint32_t bloom_bits = std::max(length * FLAGS_bits_per_key, 64); - DynamicBloom bloom(bloom_bits, FLAGS_num_probes); - for (int i = 0; i < length; i++) { - bloom.Add(Key(i, buffer)); - ASSERT_TRUE(bloom.MayContain(Key(i, buffer))); - } + for (uint32_t cl_per_block = 0; cl_per_block < FLAGS_num_probes; + ++cl_per_block) { + for (uint32_t num = 1; num <= 10000; num = NextNum(num)) { + uint32_t bloom_bits = 0; + if (cl_per_block == 0) { + bloom_bits = std::max(num * FLAGS_bits_per_key, 64U); + } else { + bloom_bits = std::max(num * FLAGS_bits_per_key, + cl_per_block * CACHE_LINE_SIZE * 8); + } + DynamicBloom bloom(bloom_bits, cl_per_block, FLAGS_num_probes); + for (uint64_t i = 0; i < num; i++) { + bloom.Add(Key(i, buffer)); + ASSERT_TRUE(bloom.MayContain(Key(i, buffer))); + } - // All added keys must match - for (int i = 0; i < length; i++) { - ASSERT_TRUE(bloom.MayContain(Key(i, buffer))) - << "Length " << length << "; key " << i; - } + // All added keys must match + for (uint64_t i = 0; i < num; i++) { + ASSERT_TRUE(bloom.MayContain(Key(i, buffer))) + << "Num " << num << "; key " << i; + } - // Check false positive rate + // Check false positive rate - int result = 0; - for (int i = 0; i < 10000; i++) { - if (bloom.MayContain(Key(i + 1000000000, buffer))) { - result++; + int result = 0; + for (uint64_t i = 0; i < 10000; i++) { + if (bloom.MayContain(Key(i + 1000000000, buffer))) { + result++; + } } + double rate = result / 10000.0; + + fprintf(stderr, "False positives: %5.2f%% @ num = %6u, bloom_bits = %6u, " + "cl per block = %u\n", rate*100.0, num, bloom_bits, cl_per_block); + + if (rate > 0.0125) + mediocre_filters++; // Allowed, but not too often + else + good_filters++; } - double rate = result / 10000.0; - fprintf(stderr, "False positives: %5.2f%% @ length = %6d ; \n", - rate*100.0, length); + fprintf(stderr, "Filters: %d good, %d mediocre\n", + good_filters, mediocre_filters); + ASSERT_LE(mediocre_filters, good_filters/5); + } +} + +TEST(DynamicBloomTest, perf) { + StopWatchNano timer(Env::Default()); - //ASSERT_LE(rate, 0.02); // Must not be over 2% - if (rate > 0.0125) - mediocre_filters++; // Allowed, but not too often - else - good_filters++; + if (!FLAGS_enable_perf) { + return; } - fprintf(stderr, "Filters: %d good, %d mediocre\n", - good_filters, mediocre_filters); + for (uint64_t m = 1; m <= 8; ++m) { + const uint64_t num_keys = m * 8 * 1024 * 1024; + fprintf(stderr, "testing %luM keys\n", m * 8); - ASSERT_LE(mediocre_filters, good_filters/5); -} + DynamicBloom std_bloom(num_keys * 10, 0, FLAGS_num_probes); -// Different bits-per-byte + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + std_bloom.Add(Slice(reinterpret_cast(&i), 8)); + } + + uint64_t elapsed = timer.ElapsedNanos(); + fprintf(stderr, "standard bloom, avg add latency %lu\n", + elapsed / num_keys); + + uint64_t count = 0; + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + if (std_bloom.MayContain(Slice(reinterpret_cast(&i), 8))) { + ++count; + } + } + elapsed = timer.ElapsedNanos(); + fprintf(stderr, "standard bloom, avg query latency %lu\n", + elapsed / count); + ASSERT_TRUE(count == num_keys); + + for (int cl_per_block = 1; cl_per_block <= FLAGS_num_probes; + ++cl_per_block) { + DynamicBloom blocked_bloom(num_keys * 10, cl_per_block, FLAGS_num_probes); + + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + blocked_bloom.Add(Slice(reinterpret_cast(&i), 8)); + } + + uint64_t elapsed = timer.ElapsedNanos(); + fprintf(stderr, "blocked bloom(%d), avg add latency %lu\n", + cl_per_block, elapsed / num_keys); + + uint64_t count = 0; + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + if (blocked_bloom.MayContain( + Slice(reinterpret_cast(&i), 8))) { + ++count; + } + } + + elapsed = timer.ElapsedNanos(); + fprintf(stderr, "blocked bloom(%d), avg query latency %lu\n", + cl_per_block, elapsed / count); + ASSERT_TRUE(count == num_keys); + } + } +} } // namespace rocksdb diff --git a/util/env_posix.cc b/util/env_posix.cc index 237038fcb..da65d7374 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1363,7 +1363,10 @@ class PosixEnv : public Env { EnvOptions OptimizeForLogWrite(const EnvOptions& env_options) const { EnvOptions optimized = env_options; optimized.use_mmap_writes = false; - optimized.fallocate_with_keep_size = false; + // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it + // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit + // test and make this false + optimized.fallocate_with_keep_size = true; return optimized; } diff --git a/util/env_test.cc b/util/env_test.cc index b7009bf5d..0a83037c3 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -481,9 +481,9 @@ class TestLogger : public Logger { if (new_format[0] == '[') { // "[DEBUG] " - ASSERT_TRUE(n <= 56 + (512 - sizeof(struct timeval))); + ASSERT_TRUE(n <= 56 + (512 - static_cast(sizeof(struct timeval)))); } else { - ASSERT_TRUE(n <= 48 + (512 - sizeof(struct timeval))); + ASSERT_TRUE(n <= 48 + (512 - static_cast(sizeof(struct timeval)))); } va_end(backup_ap); } diff --git a/util/options.cc b/util/options.cc index 73af0b7ae..43e265280 100644 --- a/util/options.cc +++ b/util/options.cc @@ -33,7 +33,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() compaction_filter_factory(std::shared_ptr( new DefaultCompactionFilterFactory())), compaction_filter_factory_v2( - new DefaultCompactionFilterFactoryV2(NewFixedPrefixTransform(8))), + new DefaultCompactionFilterFactoryV2()), write_buffer_size(4 << 20), max_write_buffer_number(2), min_write_buffer_number_to_merge(1), @@ -47,8 +47,8 @@ ColumnFamilyOptions::ColumnFamilyOptions() whole_key_filtering(true), num_levels(7), level0_file_num_compaction_trigger(4), - level0_slowdown_writes_trigger(8), - level0_stop_writes_trigger(12), + level0_slowdown_writes_trigger(20), + level0_stop_writes_trigger(24), max_mem_compaction_level(2), target_file_size_base(2 * 1048576), target_file_size_multiplier(1), @@ -58,7 +58,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() expanded_compaction_factor(25), source_compaction_factor(1), max_grandparent_overlap_factor(10), - disable_seek_compaction(false), + disable_seek_compaction(true), soft_rate_limit(0.0), hard_rate_limit(0.0), rate_limit_delay_max_milliseconds(1000), @@ -151,11 +151,11 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) DBOptions::DBOptions() : create_if_missing(false), error_if_exists(false), - paranoid_checks(false), + paranoid_checks(true), env(Env::Default()), info_log(nullptr), info_log_level(INFO), - max_open_files(1000), + max_open_files(5000), statistics(nullptr), disableDataSync(false), use_fsync(false), @@ -176,7 +176,7 @@ DBOptions::DBOptions() manifest_preallocation_size(4 * 1024 * 1024), allow_os_buffer(true), allow_mmap_reads(false), - allow_mmap_writes(true), + allow_mmap_writes(false), is_fd_close_on_exec(true), skip_log_error_on_recovery(false), stats_dump_period_sec(3600), diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index ff7179b2c..f42ea8cca 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -846,7 +846,7 @@ TEST(BackupableDBTest, RateLimiting) { auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / backupable_options_->backup_rate_limit; ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time); - ASSERT_LT(backup_time, 1.3 * rate_limited_backup_time); + ASSERT_LT(backup_time, 1.5 * rate_limited_backup_time); CloseBackupableDB(); @@ -858,7 +858,7 @@ TEST(BackupableDBTest, RateLimiting) { auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / backupable_options_->restore_rate_limit; ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time); - ASSERT_LT(restore_time, 1.3 * rate_limited_restore_time); + ASSERT_LT(restore_time, 1.5 * rate_limited_restore_time); AssertBackupConsistency(0, 0, 100000, 100010); } diff --git a/utilities/geodb/geodb_impl.cc b/utilities/geodb/geodb_impl.cc new file mode 100644 index 000000000..095ecf8ab --- /dev/null +++ b/utilities/geodb/geodb_impl.cc @@ -0,0 +1,427 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +#include "utilities/geodb/geodb_impl.h" + +#define __STDC_FORMAT_MACROS + +#include +#include +#include +#include +#include "db/filename.h" +#include "util/coding.h" + +// +// There are two types of keys. The first type of key-values +// maps a geo location to the set of object ids and their values. +// Table 1 +// key : p + : + $quadkey + : + $id + +// : + $latitude + : + $longitude +// value : value of the object +// This table can be used to find all objects that reside near +// a specified geolocation. +// +// Table 2 +// key : 'k' + : + $id +// value: $quadkey + +namespace rocksdb { + +GeoDBImpl::GeoDBImpl(DB* db, const GeoDBOptions& options) : + GeoDB(db, options), db_(db), options_(options) { +} + +GeoDBImpl::~GeoDBImpl() { +} + +Status GeoDBImpl::Insert(const GeoObject& obj) { + WriteBatch batch; + + // It is possible that this id is already associated with + // with a different position. We first have to remove that + // association before we can insert the new one. + + // remove existing object, if it exists + GeoObject old; + Status status = GetById(obj.id, &old); + if (status.ok()) { + assert(obj.id.compare(old.id) == 0); + std::string quadkey = PositionToQuad(old.position, Detail); + std::string key1 = MakeKey1(old.position, old.id, quadkey); + std::string key2 = MakeKey2(old.id); + batch.Delete(Slice(key1)); + batch.Delete(Slice(key2)); + } else if (status.IsNotFound()) { + // What if another thread is trying to insert the same ID concurrently? + } else { + return status; + } + + // insert new object + std::string quadkey = PositionToQuad(obj.position, Detail); + std::string key1 = MakeKey1(obj.position, obj.id, quadkey); + std::string key2 = MakeKey2(obj.id); + batch.Put(Slice(key1), Slice(obj.value)); + batch.Put(Slice(key2), Slice(quadkey)); + return db_->Write(woptions_, &batch); +} + +Status GeoDBImpl::GetByPosition(const GeoPosition& pos, + const Slice& id, + std::string* value) { + std::string quadkey = PositionToQuad(pos, Detail); + std::string key1 = MakeKey1(pos, id, quadkey); + return db_->Get(roptions_, Slice(key1), value); +} + +Status GeoDBImpl::GetById(const Slice& id, GeoObject* object) { + Status status; + Slice quadkey; + + // create an iterator so that we can get a consistent picture + // of the database. + Iterator* iter = db_->NewIterator(roptions_); + + // create key for table2 + std::string kt = MakeKey2(id); + Slice key2(kt); + + iter->Seek(key2); + if (iter->Valid() && iter->status().ok()) { + if (iter->key().compare(key2) == 0) { + quadkey = iter->value(); + } + } + if (quadkey.size() == 0) { + delete iter; + return Status::NotFound(key2); + } + + // + // Seek to the quadkey + id prefix + // + std::string prefix = MakeKey1Prefix(quadkey.ToString(), id); + iter->Seek(Slice(prefix)); + assert(iter->Valid()); + if (!iter->Valid() || !iter->status().ok()) { + delete iter; + return Status::NotFound(); + } + + // split the key into p + quadkey + id + lat + lon + std::vector parts; + Slice key = iter->key(); + StringSplit(&parts, key.ToString(), ':'); + assert(parts.size() == 5); + assert(parts[0] == "p"); + assert(parts[1] == quadkey); + assert(parts[2] == id); + + // fill up output parameters + object->position.latitude = atof(parts[3].c_str()); + object->position.longitude = atof(parts[4].c_str()); + object->id = id.ToString(); // this is redundant + object->value = iter->value().ToString(); + delete iter; + return Status::OK(); +} + + +Status GeoDBImpl::Remove(const Slice& id) { + // Read the object from the database + GeoObject obj; + Status status = GetById(id, &obj); + if (!status.ok()) { + return status; + } + + // remove the object by atomically deleting it from both tables + std::string quadkey = PositionToQuad(obj.position, Detail); + std::string key1 = MakeKey1(obj.position, obj.id, quadkey); + std::string key2 = MakeKey2(obj.id); + WriteBatch batch; + batch.Delete(Slice(key1)); + batch.Delete(Slice(key2)); + return db_->Write(woptions_, &batch); +} + +Status GeoDBImpl::SearchRadial(const GeoPosition& pos, + double radius, + std::vector* values, + int number_of_values) { + // Gather all bounding quadkeys + std::vector qids; + Status s = searchQuadIds(pos, radius, &qids); + if (!s.ok()) { + return s; + } + + // create an iterator + Iterator* iter = db_->NewIterator(ReadOptions()); + + // Process each prospective quadkey + for (std::string qid : qids) { + // The user is interested in only these many objects. + if (number_of_values == 0) { + break; + } + + // convert quadkey to db key prefix + std::string dbkey = MakeQuadKeyPrefix(qid); + + for (iter->Seek(dbkey); + number_of_values > 0 && iter->Valid() && iter->status().ok(); + iter->Next()) { + // split the key into p + quadkey + id + lat + lon + std::vector parts; + Slice key = iter->key(); + StringSplit(&parts, key.ToString(), ':'); + assert(parts.size() == 5); + assert(parts[0] == "p"); + std::string* quadkey = &parts[1]; + + // If the key we are looking for is a prefix of the key + // we found from the database, then this is one of the keys + // we are looking for. + auto res = std::mismatch(qid.begin(), qid.end(), quadkey->begin()); + if (res.first == qid.end()) { + GeoPosition pos(atof(parts[3].c_str()), atof(parts[4].c_str())); + GeoObject obj(pos, parts[4], iter->value().ToString()); + values->push_back(obj); + number_of_values--; + } else { + break; + } + } + } + delete iter; + return Status::OK(); +} + +std::string GeoDBImpl::MakeKey1(const GeoPosition& pos, Slice id, + std::string quadkey) { + std::string lat = std::to_string(pos.latitude); + std::string lon = std::to_string(pos.longitude); + std::string key = "p:"; + key.reserve(5 + quadkey.size() + id.size() + lat.size() + lon.size()); + key.append(quadkey); + key.append(":"); + key.append(id.ToString()); + key.append(":"); + key.append(lat); + key.append(":"); + key.append(lon); + return key; +} + +std::string GeoDBImpl::MakeKey2(Slice id) { + std::string key = "k:"; + key.append(id.ToString()); + return key; +} + +std::string GeoDBImpl::MakeKey1Prefix(std::string quadkey, + Slice id) { + std::string key = "p:"; + key.reserve(3 + quadkey.size() + id.size()); + key.append(quadkey); + key.append(":"); + key.append(id.ToString()); + return key; +} + +std::string GeoDBImpl::MakeQuadKeyPrefix(std::string quadkey) { + std::string key = "p:"; + key.append(quadkey); + return key; +} + +void GeoDBImpl::StringSplit(std::vector* tokens, + const std::string &text, char sep) { + std::size_t start = 0, end = 0; + while ((end = text.find(sep, start)) != std::string::npos) { + tokens->push_back(text.substr(start, end - start)); + start = end + 1; + } + tokens->push_back(text.substr(start)); +} + +// convert degrees to radians +double GeoDBImpl::radians(double x) { + return (x * PI) / 180; +} + +// convert radians to degrees +double GeoDBImpl::degrees(double x) { + return (x * 180) / PI; +} + +// convert a gps location to quad coordinate +std::string GeoDBImpl::PositionToQuad(const GeoPosition& pos, + int levelOfDetail) { + Pixel p = PositionToPixel(pos, levelOfDetail); + Tile tile = PixelToTile(p); + return TileToQuadKey(tile, levelOfDetail); +} + +GeoPosition GeoDBImpl::displaceLatLon(double lat, double lon, + double deltay, double deltax) { + double dLat = deltay / EarthRadius; + double dLon = deltax / (EarthRadius * cos(radians(lat))); + return GeoPosition(lat + degrees(dLat), + lon + degrees(dLon)); +} + +// +// Return the distance between two positions on the earth +// +double GeoDBImpl::distance(double lat1, double lon1, + double lat2, double lon2) { + double lon = radians(lon2 - lon1); + double lat = radians(lat2 - lat1); + + double a = (sin(lat / 2) * sin(lat / 2)) + + cos(radians(lat1)) * cos(radians(lat2)) * + (sin(lon / 2) * sin(lon / 2)); + double angle = 2 * atan2(sqrt(a), sqrt(1 - a)); + return angle * EarthRadius; +} + +// +// Returns all the quadkeys inside the search range +// +Status GeoDBImpl::searchQuadIds(const GeoPosition& position, + double radius, + std::vector* quadKeys) { + // get the outline of the search square + GeoPosition topLeftPos = boundingTopLeft(position, radius); + GeoPosition bottomRightPos = boundingBottomRight(position, radius); + + Pixel topLeft = PositionToPixel(topLeftPos, Detail); + Pixel bottomRight = PositionToPixel(bottomRightPos, Detail); + + // how many level of details to look for + int numberOfTilesAtMaxDepth = floor((bottomRight.x - topLeft.x) / 256); + int zoomLevelsToRise = floor(log(numberOfTilesAtMaxDepth) / log(2)); + zoomLevelsToRise++; + int levels = std::max(0, Detail - zoomLevelsToRise); + + quadKeys->push_back(PositionToQuad(GeoPosition(topLeftPos.latitude, + topLeftPos.longitude), + levels)); + quadKeys->push_back(PositionToQuad(GeoPosition(topLeftPos.latitude, + bottomRightPos.longitude), + levels)); + quadKeys->push_back(PositionToQuad(GeoPosition(bottomRightPos.latitude, + topLeftPos.longitude), + levels)); + quadKeys->push_back(PositionToQuad(GeoPosition(bottomRightPos.latitude, + bottomRightPos.longitude), + levels)); + return Status::OK(); +} + +// Determines the ground resolution (in meters per pixel) at a specified +// latitude and level of detail. +// Latitude (in degrees) at which to measure the ground resolution. +// Level of detail, from 1 (lowest detail) to 23 (highest detail). +// Returns the ground resolution, in meters per pixel. +double GeoDBImpl::GroundResolution(double latitude, int levelOfDetail) { + latitude = clip(latitude, MinLatitude, MaxLatitude); + return cos(latitude * PI / 180) * 2 * PI * EarthRadius / + MapSize(levelOfDetail); +} + +// Converts a point from latitude/longitude WGS-84 coordinates (in degrees) +// into pixel XY coordinates at a specified level of detail. +GeoDBImpl::Pixel GeoDBImpl::PositionToPixel(const GeoPosition& pos, + int levelOfDetail) { + double latitude = clip(pos.latitude, MinLatitude, MaxLatitude); + double x = (pos.longitude + 180) / 360; + double sinLatitude = sin(latitude * PI / 180); + double y = 0.5 - log((1 + sinLatitude) / (1 - sinLatitude)) / (4 * PI); + double mapSize = MapSize(levelOfDetail); + double X = floor(clip(x * mapSize + 0.5, 0, mapSize - 1)); + double Y = floor(clip(y * mapSize + 0.5, 0, mapSize - 1)); + return Pixel((unsigned int)X, (unsigned int)Y); +} + +GeoPosition GeoDBImpl::PixelToPosition(const Pixel& pixel, int levelOfDetail) { + double mapSize = MapSize(levelOfDetail); + double x = (clip(pixel.x, 0, mapSize - 1) / mapSize) - 0.5; + double y = 0.5 - (clip(pixel.y, 0, mapSize - 1) / mapSize); + double latitude = 90 - 360 * atan(exp(-y * 2 * PI)) / PI; + double longitude = 360 * x; + return GeoPosition(latitude, longitude); +} + +// Converts a Pixel to a Tile +GeoDBImpl::Tile GeoDBImpl::PixelToTile(const Pixel& pixel) { + unsigned int tileX = floor(pixel.x / 256); + unsigned int tileY = floor(pixel.y / 256); + return Tile(tileX, tileY); +} + +GeoDBImpl::Pixel GeoDBImpl::TileToPixel(const Tile& tile) { + unsigned int pixelX = tile.x * 256; + unsigned int pixelY = tile.y * 256; + return Pixel(pixelX, pixelY); +} + +// Convert a Tile to a quadkey +std::string GeoDBImpl::TileToQuadKey(const Tile& tile, int levelOfDetail) { + std::stringstream quadKey; + for (int i = levelOfDetail; i > 0; i--) { + char digit = '0'; + int mask = 1 << (i - 1); + if ((tile.x & mask) != 0) { + digit++; + } + if ((tile.y & mask) != 0) { + digit++; + digit++; + } + quadKey << digit; + } + return quadKey.str(); +} + +// +// Convert a quadkey to a tile and its level of detail +// +void GeoDBImpl::QuadKeyToTile(std::string quadkey, Tile* tile, + int *levelOfDetail) { + tile->x = tile->y = 0; + *levelOfDetail = quadkey.size(); + const char* key = reinterpret_cast(quadkey.c_str()); + for (int i = *levelOfDetail; i > 0; i--) { + int mask = 1 << (i - 1); + switch (key[*levelOfDetail - i]) { + case '0': + break; + + case '1': + tile->x |= mask; + break; + + case '2': + tile->y |= mask; + break; + + case '3': + tile->x |= mask; + tile->y |= mask; + break; + + default: + std::stringstream msg; + msg << quadkey; + msg << " Invalid QuadKey."; + throw std::runtime_error(msg.str()); + } + } +} +} // namespace rocksdb diff --git a/utilities/geodb/geodb_impl.h b/utilities/geodb/geodb_impl.h new file mode 100644 index 000000000..376a211c6 --- /dev/null +++ b/utilities/geodb/geodb_impl.h @@ -0,0 +1,187 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// + +#pragma once +#include +#include +#include +#include +#include +#include + +#include "utilities/geo_db.h" +#include "utilities/stackable_db.h" +#include "rocksdb/env.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +// A specific implementation of GeoDB + +class GeoDBImpl : public GeoDB { + public: + GeoDBImpl(DB* db, const GeoDBOptions& options); + ~GeoDBImpl(); + + // Associate the GPS location with the identified by 'id'. The value + // is a blob that is associated with this object. + virtual Status Insert(const GeoObject& object); + + // Retrieve the value of the object located at the specified GPS + // location and is identified by the 'id'. + virtual Status GetByPosition(const GeoPosition& pos, + const Slice& id, + std::string* value); + + // Retrieve the value of the object identified by the 'id'. This method + // could be potentially slower than GetByPosition + virtual Status GetById(const Slice& id, GeoObject* object); + + // Delete the specified object + virtual Status Remove(const Slice& id); + + // Returns a list of all items within a circular radius from the + // specified gps location + virtual Status SearchRadial(const GeoPosition& pos, + double radius, + std::vector* values, + int number_of_values); + + private: + DB* db_; + const GeoDBOptions options_; + const WriteOptions woptions_; + const ReadOptions roptions_; + + // The value of PI + static constexpr double PI = 3.141592653589793; + + // convert degrees to radians + static double radians(double x); + + // convert radians to degrees + static double degrees(double x); + + // A pixel class that captures X and Y coordinates + class Pixel { + public: + unsigned int x; + unsigned int y; + Pixel(unsigned int a, unsigned int b) : + x(a), y(b) { + } + }; + + // A Tile in the geoid + class Tile { + public: + unsigned int x; + unsigned int y; + Tile(unsigned int a, unsigned int b) : + x(a), y(b) { + } + }; + + // convert a gps location to quad coordinate + static std::string PositionToQuad(const GeoPosition& pos, int levelOfDetail); + + // arbitrary constant use for WGS84 via + // http://en.wikipedia.org/wiki/World_Geodetic_System + // http://mathforum.org/library/drmath/view/51832.html + // http://msdn.microsoft.com/en-us/library/bb259689.aspx + // http://www.tuicool.com/articles/NBrE73 + // + const int Detail = 23; + static constexpr double EarthRadius = 6378137; + static constexpr double MinLatitude = -85.05112878; + static constexpr double MaxLatitude = 85.05112878; + static constexpr double MinLongitude = -180; + static constexpr double MaxLongitude = 180; + + // clips a number to the specified minimum and maximum values. + static double clip(double n, double minValue, double maxValue) { + return fmin(fmax(n, minValue), maxValue); + } + + // Determines the map width and height (in pixels) at a specified level + // of detail, from 1 (lowest detail) to 23 (highest detail). + // Returns the map width and height in pixels. + static unsigned int MapSize(int levelOfDetail) { + return (unsigned int)(256 << levelOfDetail); + } + + // Determines the ground resolution (in meters per pixel) at a specified + // latitude and level of detail. + // Latitude (in degrees) at which to measure the ground resolution. + // Level of detail, from 1 (lowest detail) to 23 (highest detail). + // Returns the ground resolution, in meters per pixel. + static double GroundResolution(double latitude, int levelOfDetail); + + // Converts a point from latitude/longitude WGS-84 coordinates (in degrees) + // into pixel XY coordinates at a specified level of detail. + static Pixel PositionToPixel(const GeoPosition& pos, int levelOfDetail); + + static GeoPosition PixelToPosition(const Pixel& pixel, int levelOfDetail); + + // Converts a Pixel to a Tile + static Tile PixelToTile(const Pixel& pixel); + + static Pixel TileToPixel(const Tile& tile); + + // Convert a Tile to a quadkey + static std::string TileToQuadKey(const Tile& tile, int levelOfDetail); + + // Convert a quadkey to a tile and its level of detail + static void QuadKeyToTile(std::string quadkey, Tile* tile, + int *levelOfDetail); + + // Return the distance between two positions on the earth + static double distance(double lat1, double lon1, + double lat2, double lon2); + static GeoPosition displaceLatLon(double lat, double lon, + double deltay, double deltax); + + // + // Returns the top left position after applying the delta to + // the specified position + // + static GeoPosition boundingTopLeft(const GeoPosition& in, double radius) { + return displaceLatLon(in.latitude, in.longitude, -radius, -radius); + } + + // + // Returns the bottom right position after applying the delta to + // the specified position + static GeoPosition boundingBottomRight(const GeoPosition& in, + double radius) { + return displaceLatLon(in.latitude, in.longitude, radius, radius); + } + + // + // Get all quadkeys within a radius of a specified position + // + Status searchQuadIds(const GeoPosition& position, + double radius, + std::vector* quadKeys); + + // splits a string into its components + static void StringSplit(std::vector* tokens, + const std::string &text, + char sep); + + // + // Create keys for accessing rocksdb table(s) + // + static std::string MakeKey1(const GeoPosition& pos, + Slice id, + std::string quadkey); + static std::string MakeKey2(Slice id); + static std::string MakeKey1Prefix(std::string quadkey, + Slice id); + static std::string MakeQuadKeyPrefix(std::string quadkey); +}; + +} // namespace rocksdb diff --git a/utilities/geodb/geodb_test.cc b/utilities/geodb/geodb_test.cc new file mode 100644 index 000000000..d7af6c32b --- /dev/null +++ b/utilities/geodb/geodb_test.cc @@ -0,0 +1,123 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// +#include "utilities/geodb/geodb_impl.h" + +#include +#include "util/testharness.h" + +namespace rocksdb { + +class GeoDBTest { + public: + static const std::string kDefaultDbName; + static Options options; + DB* db; + GeoDB* geodb; + + GeoDBTest() { + GeoDBOptions geodb_options; + ASSERT_OK(DestroyDB(kDefaultDbName, options)); + options.create_if_missing = true; + Status status = DB::Open(options, kDefaultDbName, &db); + geodb = new GeoDBImpl(db, geodb_options); + } + + ~GeoDBTest() { + delete geodb; + } + + GeoDB* getdb() { + return geodb; + } +}; + +const std::string GeoDBTest::kDefaultDbName = "/tmp/geodefault/"; +Options GeoDBTest::options = Options(); + +// Insert, Get and Remove +TEST(GeoDBTest, SimpleTest) { + GeoPosition pos1(100, 101); + std::string id1("id1"); + std::string value1("value1"); + + // insert first object into database + GeoObject obj1(pos1, id1, value1); + Status status = getdb()->Insert(obj1); + ASSERT_TRUE(status.ok()); + + // insert second object into database + GeoPosition pos2(200, 201); + std::string id2("id2"); + std::string value2 = "value2"; + GeoObject obj2(pos2, id2, value2); + status = getdb()->Insert(obj2); + ASSERT_TRUE(status.ok()); + + // retrieve first object using position + std::string value; + status = getdb()->GetByPosition(pos1, Slice(id1), &value); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(value, value1); + + // retrieve first object using id + GeoObject obj; + status = getdb()->GetById(Slice(id1), &obj); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj.position.latitude, 100); + ASSERT_EQ(obj.position.longitude, 101); + ASSERT_EQ(obj.id.compare(id1), 0); + ASSERT_EQ(obj.value, value1); + + // delete first object + status = getdb()->Remove(Slice(id1)); + ASSERT_TRUE(status.ok()); + status = getdb()->GetByPosition(pos1, Slice(id1), &value); + ASSERT_TRUE(status.IsNotFound()); + status = getdb()->GetById(id1, &obj); + ASSERT_TRUE(status.IsNotFound()); + + // check that we can still find second object + status = getdb()->GetByPosition(pos2, id2, &value); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(value, value2); + status = getdb()->GetById(id2, &obj); + ASSERT_TRUE(status.ok()); +} + +// Search. +// Verify distances via http://www.stevemorse.org/nearest/distance.php +TEST(GeoDBTest, Search) { + GeoPosition pos1(45, 45); + std::string id1("mid1"); + std::string value1 = "midvalue1"; + + // insert object at 45 degree latitude + GeoObject obj1(pos1, id1, value1); + Status status = getdb()->Insert(obj1); + ASSERT_TRUE(status.ok()); + + // search all objects centered at 46 degree latitude with + // a radius of 200 kilometers. We should find the one object that + // we inserted earlier. + std::vector values; + status = getdb()->SearchRadial(GeoPosition(46, 46), 200000, &values); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(values.size(), 1); + + // search all objects centered at 46 degree latitude with + // a radius of 2 kilometers. There should be none. + values.clear(); + status = getdb()->SearchRadial(GeoPosition(46, 46), 2, &values); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(values.size(), 0); +} + +} // namespace rocksdb + +int main(int argc, char* argv[]) { + return rocksdb::test::RunAllTests(); +}