|
|
|
@ -8,6 +8,7 @@ |
|
|
|
|
|
|
|
|
|
#include <mutex> |
|
|
|
|
#include <string> |
|
|
|
|
|
|
|
|
|
#include "rocksdb/db.h" |
|
|
|
|
#include "rocksdb/env.h" |
|
|
|
|
#include "rocksdb/options.h" |
|
|
|
@ -39,8 +40,8 @@ class Compactor : public EventListener { |
|
|
|
|
// and column family. It is the caller's responsibility to
|
|
|
|
|
// destroy the returned CompactionTask. Returns "nullptr"
|
|
|
|
|
// if it cannot find a proper compaction task.
|
|
|
|
|
virtual CompactionTask* PickCompaction( |
|
|
|
|
DB* db, const std::string& cf_name) = 0; |
|
|
|
|
virtual CompactionTask* PickCompaction(DB* db, |
|
|
|
|
const std::string& cf_name) = 0; |
|
|
|
|
|
|
|
|
|
// Schedule and run the specified compaction task in background.
|
|
|
|
|
virtual void ScheduleCompaction(CompactionTask* task) = 0; |
|
|
|
@ -48,13 +49,11 @@ class Compactor : public EventListener { |
|
|
|
|
|
|
|
|
|
// Example structure that describes a compaction task.
|
|
|
|
|
struct CompactionTask { |
|
|
|
|
CompactionTask( |
|
|
|
|
DB* _db, Compactor* _compactor, |
|
|
|
|
CompactionTask(DB* _db, Compactor* _compactor, |
|
|
|
|
const std::string& _column_family_name, |
|
|
|
|
const std::vector<std::string>& _input_file_names, |
|
|
|
|
const int _output_level, |
|
|
|
|
const CompactionOptions& _compact_options, |
|
|
|
|
bool _retry_on_fail) |
|
|
|
|
const CompactionOptions& _compact_options, bool _retry_on_fail) |
|
|
|
|
: db(_db), |
|
|
|
|
compactor(_compactor), |
|
|
|
|
column_family_name(_column_family_name), |
|
|
|
@ -77,15 +76,13 @@ class FullCompactor : public Compactor { |
|
|
|
|
public: |
|
|
|
|
explicit FullCompactor(const Options options) : options_(options) { |
|
|
|
|
compact_options_.compression = options_.compression; |
|
|
|
|
compact_options_.output_file_size_limit = |
|
|
|
|
options_.target_file_size_base; |
|
|
|
|
compact_options_.output_file_size_limit = options_.target_file_size_base; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// When flush happens, it determines whether to trigger compaction. If
|
|
|
|
|
// triggered_writes_stop is true, it will also set the retry flag of
|
|
|
|
|
// compaction-task to true.
|
|
|
|
|
void OnFlushCompleted( |
|
|
|
|
DB* db, const FlushJobInfo& info) override { |
|
|
|
|
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { |
|
|
|
|
CompactionTask* task = PickCompaction(db, info.cf_name); |
|
|
|
|
if (task != nullptr) { |
|
|
|
|
if (info.triggered_writes_stop) { |
|
|
|
@ -97,8 +94,7 @@ class FullCompactor : public Compactor { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Always pick a compaction which includes all files whenever possible.
|
|
|
|
|
CompactionTask* PickCompaction( |
|
|
|
|
DB* db, const std::string& cf_name) override { |
|
|
|
|
CompactionTask* PickCompaction(DB* db, const std::string& cf_name) override { |
|
|
|
|
ColumnFamilyMetaData cf_meta; |
|
|
|
|
db->GetColumnFamilyMetaData(&cf_meta); |
|
|
|
|
|
|
|
|
@ -111,8 +107,7 @@ class FullCompactor : public Compactor { |
|
|
|
|
input_file_names.push_back(file.name); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return new CompactionTask( |
|
|
|
|
db, this, cf_name, input_file_names, |
|
|
|
|
return new CompactionTask(db, this, cf_name, input_file_names, |
|
|
|
|
options_.num_levels - 1, compact_options_, false); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -127,16 +122,14 @@ class FullCompactor : public Compactor { |
|
|
|
|
assert(task); |
|
|
|
|
assert(task->db); |
|
|
|
|
Status s = task->db->CompactFiles( |
|
|
|
|
task->compact_options, |
|
|
|
|
task->input_file_names, |
|
|
|
|
task->output_level); |
|
|
|
|
task->compact_options, task->input_file_names, task->output_level); |
|
|
|
|
printf("CompactFiles() finished with status %s\n", s.ToString().c_str()); |
|
|
|
|
if (!s.ok() && !s.IsIOError() && task->retry_on_fail) { |
|
|
|
|
// If a compaction task with its retry_on_fail=true failed,
|
|
|
|
|
// try to schedule another compaction in case the reason
|
|
|
|
|
// is not an IO error.
|
|
|
|
|
CompactionTask* new_task = task->compactor->PickCompaction( |
|
|
|
|
task->db, task->column_family_name); |
|
|
|
|
CompactionTask* new_task = |
|
|
|
|
task->compactor->PickCompaction(task->db, task->column_family_name); |
|
|
|
|
task->compactor->ScheduleCompaction(new_task); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -173,8 +166,7 @@ int main() { |
|
|
|
|
// verify the values are still there
|
|
|
|
|
std::string value; |
|
|
|
|
for (int i = 1000; i < 99999; ++i) { |
|
|
|
|
db->Get(ReadOptions(), std::to_string(i), |
|
|
|
|
&value); |
|
|
|
|
db->Get(ReadOptions(), std::to_string(i), &value); |
|
|
|
|
assert(value == std::string(500, 'a' + (i % 26))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|