diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc
index f967716f6..bde472418 100644
--- a/table/cuckoo_table_reader.cc
+++ b/table/cuckoo_table_reader.cc
@@ -10,8 +10,14 @@
 #ifndef ROCKSDB_LITE
 #include "table/cuckoo_table_reader.h"
 
+#include <algorithm>
+#include <limits>
 #include <string>
+#include <utility>
+#include <vector>
+#include "rocksdb/iterator.h"
 #include "table/meta_blocks.h"
+#include "util/arena.h"
 #include "util/coding.h"
 
 namespace rocksdb {
@@ -103,9 +109,164 @@ Status CuckooTableReader::Get(
   return Status::OK();
 }
 
+class CuckooTableIterator : public Iterator {
+ public:
+  explicit CuckooTableIterator(CuckooTableReader* reader);
+  ~CuckooTableIterator() {}
+  bool Valid() const override;
+  void SeekToFirst() override;
+  void SeekToLast() override;
+  void Seek(const Slice& target) override;
+  void Next() override;
+  void Prev() override;
+  Slice key() const override;
+  Slice value() const override;
+  Status status() const override { return status_; }
+  void LoadKeysFromReader();
+
+ private:
+  struct {
+    bool operator()(const std::pair<Slice, uint32_t>& first,
+        const std::pair<Slice, uint32_t>& second) const {
+      return first.first.compare(second.first) < 0;
+    }
+  } CompareKeys;
+  void PrepareKVAtCurrIdx();
+  CuckooTableReader* reader_;
+  Status status_;
+  // Contains a map of keys to bucket_id sorted in key order.
+  // We assume byte-wise comparison for key ordering.
+  std::vector<std::pair<Slice, uint32_t>> key_to_bucket_id_;
+  // We assume that the number of items can be stored in uint32 (4 Billion).
+  uint32_t curr_key_idx_;
+  Slice curr_value_;
+  IterKey curr_key_;
+  // No copying allowed
+  CuckooTableIterator(const CuckooTableIterator&) = delete;
+  void operator=(const Iterator&) = delete;
+};
+
+CuckooTableIterator::CuckooTableIterator(CuckooTableReader* reader)
+    : reader_(reader),
+      curr_key_idx_(std::numeric_limits<uint32_t>::max()) {
+  key_to_bucket_id_.clear();
+  curr_value_.clear();
+  curr_key_.Clear();
+}
+
+void CuckooTableIterator::LoadKeysFromReader() {
+  key_to_bucket_id_.reserve(reader_->GetTableProperties()->num_entries);
+  for (uint32_t bucket_id = 0; bucket_id < reader_->num_buckets_; bucket_id++) {
+    Slice read_key;
+    status_ = reader_->file_->Read(bucket_id * reader_->bucket_length_,
+        reader_->key_length_, &read_key, nullptr);
+    if (read_key != Slice(reader_->unused_key_)) {
+      key_to_bucket_id_.push_back(std::make_pair(read_key, bucket_id));
+    }
+  }
+  assert(key_to_bucket_id_.size() ==
+      reader_->GetTableProperties()->num_entries);
+  std::sort(key_to_bucket_id_.begin(), key_to_bucket_id_.end(), CompareKeys);
+  curr_key_idx_ = key_to_bucket_id_.size();
+}
+
+void CuckooTableIterator::SeekToFirst() {
+  curr_key_idx_ = 0;
+  PrepareKVAtCurrIdx();
+}
+
+void CuckooTableIterator::SeekToLast() {
+  curr_key_idx_ = key_to_bucket_id_.size() - 1;
+  PrepareKVAtCurrIdx();
+}
+
+void CuckooTableIterator::Seek(const Slice& target) {
+  // We assume that the target is an internal key. If this is last level file,
+  // we need to take only the user key part to seek.
+  Slice target_to_search = reader_->is_last_level_ ?
+      ExtractUserKey(target) : target;
+  auto seek_it = std::lower_bound(key_to_bucket_id_.begin(),
+      key_to_bucket_id_.end(),
+      std::make_pair(target_to_search, 0),
+      CompareKeys);
+  curr_key_idx_ = std::distance(key_to_bucket_id_.begin(), seek_it);
+  PrepareKVAtCurrIdx();
+}
+
+bool CuckooTableIterator::Valid() const {
+  return curr_key_idx_ < key_to_bucket_id_.size();
+}
+
+void CuckooTableIterator::PrepareKVAtCurrIdx() {
+  if (!Valid()) {
+    curr_value_.clear();
+    curr_key_.Clear();
+    return;
+  }
+  uint64_t offset = ((uint64_t) key_to_bucket_id_[curr_key_idx_].second
+      * reader_->bucket_length_) + reader_->key_length_;
+  status_ = reader_->file_->Read(offset, reader_->value_length_,
+      &curr_value_, nullptr);
+  if (reader_->is_last_level_) {
+    // Always return internal key.
+    curr_key_.SetInternalKey(
+        key_to_bucket_id_[curr_key_idx_].first, 0, kTypeValue);
+  }
+}
+
+void CuckooTableIterator::Next() {
+  if (!Valid()) {
+    curr_value_.clear();
+    curr_key_.Clear();
+    return;
+  }
+  ++curr_key_idx_;
+  PrepareKVAtCurrIdx();
+}
+
+void CuckooTableIterator::Prev() {
+  if (curr_key_idx_ == 0) {
+    curr_key_idx_ = key_to_bucket_id_.size();
+  }
+  if (!Valid()) {
+    curr_value_.clear();
+    curr_key_.Clear();
+    return;
+  }
+  --curr_key_idx_;
+  PrepareKVAtCurrIdx();
+}
+
+Slice CuckooTableIterator::key() const {
+  assert(Valid());
+  if (reader_->is_last_level_) {
+    return curr_key_.GetKey();
+  } else {
+    return key_to_bucket_id_[curr_key_idx_].first;
+  }
+}
+
+Slice CuckooTableIterator::value() const {
+  assert(Valid());
+  return curr_value_;
+}
+
 Iterator* CuckooTableReader::NewIterator(const ReadOptions&, Arena* arena) {
-  // TODO(rbs): Implement this as this will be used in compaction.
-  return nullptr;
+  if (!status().ok()) {
+    return NewErrorIterator(
+        Status::Corruption("CuckooTableReader status is not okay."));
+  }
+  CuckooTableIterator* iter;
+  if (arena == nullptr) {
+    iter = new CuckooTableIterator(this);
+  } else {
+    auto iter_mem = arena->AllocateAligned(sizeof(CuckooTableIterator));
+    iter = new (iter_mem) CuckooTableIterator(this);
+  }
+  if (iter->status().ok()) {
+    iter->LoadKeysFromReader();
+  }
+  return iter;
 }
 
 }  // namespace rocksdb
 #endif
diff --git a/table/cuckoo_table_reader.h b/table/cuckoo_table_reader.h
index 3e2de256a..4e99fc72e 100644
--- a/table/cuckoo_table_reader.h
+++ b/table/cuckoo_table_reader.h
@@ -11,6 +11,8 @@
 #ifndef ROCKSDB_LITE
 #include <string>
 #include <memory>
+#include <utility>
+#include <vector>
 
 #include "db/dbformat.h"
 #include "rocksdb/env.h"
@@ -30,7 +32,7 @@ class CuckooTableReader: public TableReader {
       uint64_t (*GetSliceHash)(const Slice&, uint32_t, uint64_t));
   ~CuckooTableReader() {}
 
-  std::shared_ptr<const TableProperties> GetTableProperties() const {
+  std::shared_ptr<const TableProperties> GetTableProperties() const override {
     return table_props_;
   }
 
@@ -40,17 +42,20 @@ class CuckooTableReader: public TableReader {
       const ReadOptions& readOptions, const Slice& key, void* handle_context,
       bool (*result_handler)(void* arg, const ParsedInternalKey& k,
                              const Slice& v),
-      void (*mark_key_may_exist_handler)(void* handle_context) = nullptr);
+      void (*mark_key_may_exist_handler)(void* handle_context) = nullptr)
+      override;
 
-  Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr);
+  Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
 
   // Following methods are not implemented for Cuckoo Table Reader
-  uint64_t ApproximateOffsetOf(const Slice& key) { return 0; }
-  void SetupForCompaction() {}
-  void Prepare(const Slice& target) {}
+  uint64_t ApproximateOffsetOf(const Slice& key) override { return 0; }
+  void SetupForCompaction() override {}
+  void Prepare(const Slice& target) override {}
   // End of methods not implemented.
 
  private:
+  friend class CuckooTableIterator;
+  void LoadAllKeys(std::vector<std::pair<Slice, uint32_t>>* key_to_bucket_id);
   std::unique_ptr<RandomAccessFile> file_;
   Slice file_data_;
   bool is_last_level_;
diff --git a/table/cuckoo_table_reader_test.cc b/table/cuckoo_table_reader_test.cc
index a7f1d0fd8..81746aa62 100644
--- a/table/cuckoo_table_reader_test.cc
+++ b/table/cuckoo_table_reader_test.cc
@@ -22,6 +22,7 @@ int main() {
 #include "table/cuckoo_table_builder.h"
 #include "table/cuckoo_table_reader.h"
 #include "table/cuckoo_table_factory.h"
+#include "util/arena.h"
 #include "util/random.h"
 #include "util/testharness.h"
 #include "util/testutil.h"
@@ -96,6 +97,10 @@ class CuckooReaderTest {
     values.resize(num_items);
   }
 
+  std::string NumToStr(int64_t i) {
+    return std::string(reinterpret_cast<char*>(&i), sizeof(i));
+  }
+
   void CreateCuckooFile(bool is_last_level) {
     unique_ptr<WritableFile> writable_file;
     ASSERT_OK(env->NewWritableFile(fname, &writable_file, env_options));
@@ -105,8 +110,8 @@ class CuckooReaderTest {
     ASSERT_OK(builder.status());
     for (uint32_t key_idx = 0; key_idx < num_items; ++key_idx) {
       builder.Add(Slice(keys[key_idx]), Slice(values[key_idx]));
-      ASSERT_EQ(builder.NumEntries(), key_idx + 1);
       ASSERT_OK(builder.status());
+      ASSERT_EQ(builder.NumEntries(), key_idx + 1);
     }
     ASSERT_OK(builder.Finish());
     ASSERT_EQ(num_items, builder.NumEntries());
@@ -123,7 +128,6 @@ class CuckooReaderTest {
         file_size,
         GetSliceHash);
     ASSERT_OK(reader.status());
-
     for (uint32_t i = 0; i < num_items; ++i) {
       ValuesToAssert v(user_keys[i], values[i]);
       ASSERT_OK(reader.Get(
@@ -132,10 +136,70 @@ class CuckooReaderTest {
     }
   }
 
+  void CheckIterator() {
+    unique_ptr<RandomAccessFile> read_file;
+    ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options));
+    CuckooTableReader reader(
+        options,
+        std::move(read_file),
+        file_size,
+        GetSliceHash);
+    ASSERT_OK(reader.status());
+    Iterator* it = reader.NewIterator(ReadOptions(), nullptr);
+    ASSERT_OK(it->status());
+    ASSERT_TRUE(!it->Valid());
+    it->SeekToFirst();
+    int cnt = 0;
+    while (it->Valid()) {
+      ASSERT_OK(it->status());
+      ASSERT_TRUE(Slice(keys[cnt]) == it->key());
+      ASSERT_TRUE(Slice(values[cnt]) == it->value());
+      ++cnt;
+      it->Next();
+    }
+    ASSERT_EQ(cnt, num_items);
+
+    it->SeekToLast();
+    cnt = num_items - 1;
+    ASSERT_TRUE(it->Valid());
+    while (it->Valid()) {
+      ASSERT_OK(it->status());
+      ASSERT_TRUE(Slice(keys[cnt]) == it->key());
+      ASSERT_TRUE(Slice(values[cnt]) == it->value());
+      --cnt;
+      it->Prev();
+    }
+    ASSERT_EQ(cnt, -1);
+
+    cnt = num_items / 2;
+    it->Seek(keys[cnt]);
+    while (it->Valid()) {
+      ASSERT_OK(it->status());
+      ASSERT_TRUE(Slice(keys[cnt]) == it->key());
+      ASSERT_TRUE(Slice(values[cnt]) == it->value());
+      ++cnt;
+      it->Next();
+    }
+    ASSERT_EQ(cnt, num_items);
+    delete it;
+
+    Arena arena;
+    it = reader.NewIterator(ReadOptions(), &arena);
+    ASSERT_OK(it->status());
+    ASSERT_TRUE(!it->Valid());
+    it->Seek(keys[num_items/2]);
+    ASSERT_TRUE(it->Valid());
+    ASSERT_OK(it->status());
+    ASSERT_TRUE(keys[num_items/2] == it->key());
+    ASSERT_TRUE(values[num_items/2] == it->value());
+    ASSERT_OK(it->status());
+    it->~Iterator();
+  }
+
   std::vector<std::string> keys;
   std::vector<std::string> user_keys;
   std::vector<std::string> values;
-  uint32_t num_items;
+  uint64_t num_items;
   std::string fname;
   uint64_t file_size;
   Options options;
@@ -144,13 +208,14 @@ class CuckooReaderTest {
 };
 
 TEST(CuckooReaderTest, WhenKeyExists) {
-  SetUp(10);
+  SetUp(kNumHashFunc);
   fname = test::TmpDir() + "/CuckooReader_WhenKeyExists";
-  for (uint32_t i = 0; i < num_items; i++) {
-    user_keys[i] = "keys" + std::to_string(i+100);
+  for (uint64_t i = 0; i < num_items; i++) {
+    user_keys[i] = "key" + NumToStr(i);
     ParsedInternalKey ikey(user_keys[i], i + 1000, kTypeValue);
     AppendInternalKey(&keys[i], ikey);
-    values[i] = "value" + std::to_string(i+100);
+    values[i] = "value" + NumToStr(i);
+    // Give disjoint hash values.
     AddHashLookups(user_keys[i], i * kNumHashFunc, kNumHashFunc);
   }
   CreateCuckooFile(false);
@@ -170,15 +235,33 @@ TEST(CuckooReaderTest, WhenKeyExists) {
   CheckReader();
 }
 
+TEST(CuckooReaderTest, CheckIterator) {
+  SetUp(2*kNumHashFunc);
+  fname = test::TmpDir() + "/CuckooReader_CheckIterator";
+  for (uint64_t i = 0; i < num_items; i++) {
+    user_keys[i] = "key" + NumToStr(i);
+    ParsedInternalKey ikey(user_keys[i], 0, kTypeValue);
+    AppendInternalKey(&keys[i], ikey);
+    values[i] = "value" + NumToStr(i);
+    // Give disjoint hash values, in reverse order.
+    AddHashLookups(user_keys[i], (num_items-i-1)*kNumHashFunc, kNumHashFunc);
+  }
+  CreateCuckooFile(false);
+  CheckIterator();
+  // Last level file.
+  CreateCuckooFile(true);
+  CheckIterator();
+}
+
 TEST(CuckooReaderTest, WhenKeyNotFound) {
   // Add keys with colliding hash values.
   SetUp(kNumHashFunc / 2);
   fname = test::TmpDir() + "/CuckooReader_WhenKeyNotFound";
-  for (uint32_t i = 0; i < num_items; i++) {
-    user_keys[i] = "keys" + std::to_string(i+100);
+  for (uint64_t i = 0; i < num_items; i++) {
+    user_keys[i] = "key" + NumToStr(i);
     ParsedInternalKey ikey(user_keys[i], i + 1000, kTypeValue);
     AppendInternalKey(&keys[i], ikey);
-    values[i] = "value" + std::to_string(i+100);
+    values[i] = "value" + NumToStr(i);
     // Make all hash values collide.
     AddHashLookups(user_keys[i], 0, kNumHashFunc);
   }
@@ -193,7 +276,7 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
       GetSliceHash);
   ASSERT_OK(reader.status());
   // Search for a key with colliding hash values.
-  std::string not_found_user_key = "keys" + std::to_string(num_items + 100);
+  std::string not_found_user_key = "key" + NumToStr(num_items);
   std::string not_found_key;
   AddHashLookups(not_found_user_key, 0, kNumHashFunc);
   ParsedInternalKey ikey(not_found_user_key, 1000, kTypeValue);
@@ -204,10 +287,10 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
   ASSERT_EQ(0, v.call_count);
   ASSERT_OK(reader.status());
   // Search for a key with an independent hash value.
-  std::string not_found_user_key2 = "keys" + std::to_string(num_items + 101);
-  std::string not_found_key2;
+  std::string not_found_user_key2 = "key" + NumToStr(num_items + 1);
   AddHashLookups(not_found_user_key2, kNumHashFunc, kNumHashFunc);
   ParsedInternalKey ikey2(not_found_user_key2, 1000, kTypeValue);
+  std::string not_found_key2;
   AppendInternalKey(&not_found_key2, ikey2);
   ASSERT_OK(reader.Get(
       ReadOptions(), Slice(not_found_key2), &v, AssertValues, nullptr));
@@ -215,21 +298,21 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
   ASSERT_OK(reader.status());
 
   // Test read with corrupted key.
-  not_found_key2.pop_back();
-  ASSERT_TRUE(!ParseInternalKey(not_found_key2, &ikey));
+  Slice corrupt_key("corrupt_ikey");
+  ASSERT_TRUE(!ParseInternalKey(corrupt_key, &ikey));
   ASSERT_TRUE(reader.Get(
-      ReadOptions(), Slice(not_found_key2), &v,
+      ReadOptions(), corrupt_key, &v,
       AssertValues, nullptr).IsCorruption());
   ASSERT_EQ(0, v.call_count);
   ASSERT_OK(reader.status());
 
   // Test read when key is unused key.
-  std::string unused_user_key = "keys10:";
+  std::string unused_key =
+      reader.GetTableProperties()->user_collected_properties.at(
+          CuckooTablePropertyNames::kEmptyKey);
   // Add hash values that map to empty buckets.
-  AddHashLookups(unused_user_key, kNumHashFunc, kNumHashFunc);
-  std::string unused_key;
-  ParsedInternalKey ikey3(unused_user_key, 1000, kTypeValue);
-  AppendInternalKey(&unused_key, ikey3);
+  AddHashLookups(ExtractUserKey(unused_key).ToString(),
+      kNumHashFunc, kNumHashFunc);
   ASSERT_OK(reader.Get(
       ReadOptions(), Slice(unused_key), &v, AssertValues, nullptr));
   ASSERT_EQ(0, v.call_count);
@@ -318,6 +401,10 @@ void BM_CuckooRead(uint64_t num, uint32_t key_length,
   std::random_shuffle(keys.begin(), keys.end());
 
   uint64_t time_now = env->NowMicros();
+  reader.NewIterator(ReadOptions(), nullptr);
+  fprintf(stderr, "Time taken for preparing iterator for %lu items: %lu ms.\n",
+      num, (env->NowMicros() - time_now)/1000);
+  time_now = env->NowMicros();
   for (uint64_t i = 0; i < num_reads; ++i) {
     reader.Get(r_options, Slice(keys[i % num]), nullptr, DoNothing, nullptr);
   }