Trigger read compaction only if seeks to storage are incurred.

Summary:
In the current code, a Get() call can trigger a compaction if it has to look at more than one file. This causes unnecessary compactions, because looking at more than one file is a penalty only if the file is not yet in the cache. Also, the current code counts these files before the bloom filter check is applied.

This patch counts a 'seek' only if the bloom filter check fails to rule out
the file and data block(s) have to be read in from storage.

This patch also counts a 'seek' if a file is not present in the file-cache, because opening a file means that its index blocks need to be read into cache.
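To make the new rule concrete, here is a minimal standalone sketch of the
charging decision this patch introduces (illustration only, not part of the
patch; the names ReadResult and CountsAsSeek are hypothetical). A file is
remembered as a seek candidate only when serving the read actually touched
storage, either because a data block missed the block cache (didIO) or
because the file had to be freshly opened and its index blocks read (tableIO):

#include <cstdio>

// Hypothetical flags mirroring the patch: didIO is set by the block reader
// on a block-cache miss; tableIO is set by the table cache when the file
// was not already open and its index blocks had to be read from storage.
struct ReadResult {
  bool didIO;    // a data block was read from storage
  bool tableIO;  // the table file was freshly opened
};

// Returns true iff this file access should be charged as a 'seek' for the
// purpose of seek-triggered (read) compaction.
static bool CountsAsSeek(const ReadResult& r) {
  // Before this patch, every extra file probed by Get() was charged, even
  // when all of its blocks were already cached; now only real IO counts.
  return r.didIO || r.tableIO;
}

int main() {
  ReadResult cached    = { false, false };  // fully served from caches
  ReadResult coldBlock = { true,  false };  // block-cache miss
  ReadResult coldTable = { false, true  };  // table-cache miss

  std::printf("cached: %d cold block: %d cold table: %d\n",
              CountsAsSeek(cached), CountsAsSeek(coldBlock),
              CountsAsSeek(coldTable));  // prints 0, 1, 1
  return 0;
}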

Test Plan: unit test attached. I will probably add one more unit test.

Reviewers: heyongqiang

Reviewed By: heyongqiang

CC: MarkCallaghan

Differential Revision: https://reviews.facebook.net/D5709
main
Dhruba Borthakur 12 years ago
parent 92368ab8a2
commit c1bb32e1ba
  db/db_test.cc           | 66
  db/table_cache.cc       | 10
  db/table_cache.h        |  6
  db/version_set.cc       | 35
  include/leveldb/table.h |  4
  table/table.cc          | 20

@@ -592,7 +592,7 @@ TEST(DBTest, GetEncountersEmptyLevel) {
     // Step 4: Wait for compaction to finish
     env_->SleepForMicroseconds(1000000);
-    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+    ASSERT_EQ(NumTableFilesAtLevel(0), 1); // XXX
   } while (ChangeOptions());
 }
@@ -1817,6 +1817,70 @@ TEST(DBTest, SnapshotFiles) {
   dbfull()->DisableFileDeletions();
 }
 
+TEST(DBTest, ReadCompaction) {
+  std::string value(4096, '4'); // a string of size 4K
+  {
+    Options options = CurrentOptions();
+    options.create_if_missing = true;
+    options.max_open_files = 20; // only 10 files in file-cache
+    options.target_file_size_base = 512;
+    options.write_buffer_size = 64 * 1024;
+    options.filter_policy = NULL;
+    options.block_size = 4096;
+    options.block_cache = NewLRUCache(0); // Prevent cache hits
+    Reopen(&options);
+
+    // Write 8MB (2000 values, each 4K)
+    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+    std::vector<std::string> values;
+    for (int i = 0; i < 2000; i++) {
+      ASSERT_OK(Put(Key(i), value));
+    }
+
+    // clear level 0 and 1 if necessary.
+    dbfull()->TEST_CompactMemTable();
+    dbfull()->TEST_CompactRange(0, NULL, NULL);
+    dbfull()->TEST_CompactRange(1, NULL, NULL);
+    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+    ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+
+    // write some new keys into level 0
+    for (int i = 0; i < 2000; i = i + 16) {
+      ASSERT_OK(Put(Key(i), value));
+    }
+    dbfull()->Flush(FlushOptions());
+
+    // Wait for any write compaction to finish
+    dbfull()->TEST_WaitForCompact();
+
+    // remember number of files in each level
+    int l1 = NumTableFilesAtLevel(0);
+    int l2 = NumTableFilesAtLevel(1);
+    int l3 = NumTableFilesAtLevel(2);
+    ASSERT_NE(NumTableFilesAtLevel(0), 0);
+    ASSERT_NE(NumTableFilesAtLevel(1), 0);
+    ASSERT_NE(NumTableFilesAtLevel(2), 0);
+
+    // read a bunch of times, trigger read compaction
+    for (int j = 0; j < 100; j++) {
+      for (int i = 0; i < 2000; i++) {
+        Get(Key(i));
+      }
+    }
+    // wait for read compaction to finish
+    env_->SleepForMicroseconds(1000000);
+
+    // verify that the number of files has decreased
+    // in some level, indicating that there was a compaction
+    ASSERT_TRUE(NumTableFilesAtLevel(0) < l1 ||
+                NumTableFilesAtLevel(1) < l2 ||
+                NumTableFilesAtLevel(2) < l3);
+    delete options.block_cache;
+  }
+}
+
 // Multi-threaded test:
 namespace {

@@ -48,7 +48,7 @@ TableCache::~TableCache() {
 }
 
 Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
-                             Cache::Handle** handle) {
+                             Cache::Handle** handle, bool* tableIO) {
   Status s;
   char buf[sizeof(file_number)];
   EncodeFixed64(buf, file_number);
@@ -56,6 +56,9 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
   DBStatistics* stats = (DBStatistics*) options_->statistics;
   *handle = cache_->Lookup(key);
   if (*handle == NULL) {
+    if (tableIO != NULL) {
+      *tableIO = true;  // we had to do IO from storage
+    }
     std::string fname = TableFileName(dbname_, file_number);
     RandomAccessFile* file = NULL;
     Table* table = NULL;
@@ -109,9 +112,10 @@ Status TableCache::Get(const ReadOptions& options,
                        uint64_t file_size,
                        const Slice& k,
                        void* arg,
-                       void (*saver)(void*, const Slice&, const Slice&)) {
+                       void (*saver)(void*, const Slice&, const Slice&, bool),
+                       bool* tableIO) {
   Cache::Handle* handle = NULL;
-  Status s = FindTable(file_number, file_size, &handle);
+  Status s = FindTable(file_number, file_size, &handle, tableIO);
   if (s.ok()) {
     Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
     s = t->InternalGet(options, k, arg, saver);

@@ -42,7 +42,8 @@ class TableCache {
              uint64_t file_size,
              const Slice& k,
              void* arg,
-             void (*handle_result)(void*, const Slice&, const Slice&));
+             void (*handle_result)(void*, const Slice&, const Slice&, bool),
+             bool* tableIO);
 
   // Evict any entry for the specified file number
   void Evict(uint64_t file_number);
@@ -53,7 +54,8 @@ class TableCache {
   const Options* options_;
   Cache* cache_;
 
-  Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**);
+  Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**,
+                   bool* tableIO = NULL);
 };
 
}  // namespace leveldb

@@ -243,11 +243,13 @@ struct Saver {
   const Comparator* ucmp;
   Slice user_key;
   std::string* value;
+  bool didIO;  // did we do any disk io?
 };
 }
 
-static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
+static void SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO) {
   Saver* s = reinterpret_cast<Saver*>(arg);
   ParsedInternalKey parsed_key;
+  s->didIO = didIO;
   if (!ParseInternalKey(ikey, &parsed_key)) {
     s->state = kCorrupt;
   } else {
@@ -335,26 +337,39 @@ Status Version::Get(const ReadOptions& options,
     }
 
     for (uint32_t i = 0; i < num_files; ++i) {
-      if (last_file_read != NULL && stats->seek_file == NULL) {
-        // We have had more than one seek for this read. Charge the 1st file.
-        stats->seek_file = last_file_read;
-        stats->seek_file_level = last_file_read_level;
-      }
-
       FileMetaData* f = files[i];
-      last_file_read = f;
-      last_file_read_level = level;
 
       Saver saver;
       saver.state = kNotFound;
       saver.ucmp = ucmp;
       saver.user_key = user_key;
       saver.value = value;
+      saver.didIO = false;
+      bool tableIO = false;
       s = vset_->table_cache_->Get(options, f->number, f->file_size,
-                                   ikey, &saver, SaveValue);
+                                   ikey, &saver, SaveValue, &tableIO);
       if (!s.ok()) {
         return s;
       }
+
+      if (last_file_read != NULL && stats->seek_file == NULL) {
+        // We have had more than one seek for this read. Charge the 1st file.
+        stats->seek_file = last_file_read;
+        stats->seek_file_level = last_file_read_level;
+      }
+
+      // If we did any IO as part of the read, then we remember it because
+      // it is a possible candidate for seek-based compaction. saver.didIO
+      // is true if the block had to be read in from storage and was not
+      // pre-existing in the block cache. Also, if this file was not
+      // pre-existing in the table cache and had to be freshly opened
+      // (which requires its index blocks to be read in), then tableIO is
+      // true. Note that the index blocks are not part of the block cache.
+      if (saver.didIO || tableIO) {
+        last_file_read = f;
+        last_file_read_level = level;
+      }
+
       switch (saver.state) {
         case kNotFound:
           break;  // Keep searching in other files

@@ -61,6 +61,8 @@ class Table {
   explicit Table(Rep* rep) { rep_ = rep; }
 
   static Iterator* BlockReader(void*, const ReadOptions&, const Slice&);
+  static Iterator* BlockReader(void*, const ReadOptions&, const Slice&,
+                               bool* didIO);
 
   // Calls (*handle_result)(arg, ...) with the entry found after a call
   // to Seek(key). May not make such a call if filter policy says
@@ -69,7 +71,7 @@ class Table {
   Status InternalGet(
       const ReadOptions&, const Slice& key,
      void* arg,
-      void (*handle_result)(void* arg, const Slice& k, const Slice& v));
+      void (*handle_result)(void* arg, const Slice& k, const Slice& v, bool));
 
   void ReadMeta(const Footer& footer);

@@ -153,7 +153,8 @@ static void ReleaseBlock(void* arg, void* h) {
 // into an iterator over the contents of the corresponding block.
 Iterator* Table::BlockReader(void* arg,
                              const ReadOptions& options,
-                             const Slice& index_value) {
+                             const Slice& index_value,
+                             bool* didIO) {
   Table* table = reinterpret_cast<Table*>(arg);
   Cache* block_cache = table->rep_->options.block_cache;
   Block* block = NULL;
@@ -184,6 +185,9 @@ Iterator* Table::BlockReader(void* arg,
               key, block, block->size(), &DeleteCachedBlock);
         }
       }
+      if (didIO != NULL) {
+        *didIO = true;  // we did some io from storage
+      }
     }
   } else {
     s = ReadBlock(table->rep_->file, options, handle, &contents);
@@ -207,6 +211,12 @@ Iterator* Table::BlockReader(void* arg,
   return iter;
 }
 
+Iterator* Table::BlockReader(void* arg,
+                             const ReadOptions& options,
+                             const Slice& index_value) {
+  return BlockReader(arg, options, index_value, NULL);
+}
+
 Iterator* Table::NewIterator(const ReadOptions& options) const {
   return NewTwoLevelIterator(
       rep_->index_block->NewIterator(rep_->options.comparator),
@@ -215,7 +225,7 @@ Iterator* Table::NewIterator(const ReadOptions& options) const {
 
 Status Table::InternalGet(const ReadOptions& options, const Slice& k,
                           void* arg,
-                          void (*saver)(void*, const Slice&, const Slice&)) {
+                          void (*saver)(void*, const Slice&, const Slice&, bool)) {
   Status s;
   Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
   iiter->Seek(k);
@@ -229,10 +239,12 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
     // Not found
   } else {
     Slice handle = iiter->value();
-    Iterator* block_iter = BlockReader(this, options, iiter->value());
+    bool didIO = false;
+    Iterator* block_iter = BlockReader(this, options, iiter->value(),
+                                       &didIO);
     block_iter->Seek(k);
     if (block_iter->Valid()) {
-      (*saver)(arg, block_iter->key(), block_iter->value());
+      (*saver)(arg, block_iter->key(), block_iter->value(), didIO);
     }
     s = block_iter->status();
     delete block_iter;
