[RocksDB] Support for column families in manifest

Summary:
<This diff is for Column Family branch>

Added fields in manifest file to support adding and deleting column families.

Pretty simple change, each version edit record can be:
1. add column family
2. drop column family
3. add and delete N files from a single column family (compactions and flushes will generate such records)

Test Plan: make check works, the code is backward compatible

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14733
main
Igor Canadi 11 years ago
parent 6de1b5b83e
commit 7535443083
  1. 4
      db/db_test.cc
  2. 60
      db/version_edit.cc
  3. 36
      db/version_edit.h
  4. 12
      db/version_edit_test.cc
  5. 4
      db/version_set.cc
  6. 4
      db/version_set.h
  7. 4
      include/rocksdb/column_family.h

@ -767,8 +767,8 @@ TEST(DBTest, LevelLimitReopen) {
Status s = TryReopen(&options);
ASSERT_EQ(s.IsCorruption(), true);
ASSERT_EQ(s.ToString(),
"Corruption: VersionEdit: db already has "
"more levels than options.num_levels");
"Corruption: VersionEdit: column family already has "
"more levels than specified");
options.num_levels = 10;
options.max_bytes_for_level_multiplier_additional.resize(10, 1);

@ -11,6 +11,7 @@
#include "db/version_set.h"
#include "util/coding.h"
#include "rocksdb/slice.h"
namespace rocksdb {
@ -28,7 +29,11 @@ enum Tag {
kPrevLogNumber = 9,
// these are new formats divergent from open source leveldb
kNewFile2 = 100 // store smallest & largest seqno
kNewFile2 = 100, // store smallest & largest seqno
kColumnFamily = 200, // specify column family for version edit
kColumnFamilyAdd = 201,
kColumnFamilyDrop = 202,
};
void VersionEdit::Clear() {
@ -44,6 +49,10 @@ void VersionEdit::Clear() {
has_last_sequence_ = false;
deleted_files_.clear();
new_files_.clear();
column_family_ = 0;
is_column_family_add_ = 0;
is_column_family_drop_ = 0;
column_family_name_.clear();
}
void VersionEdit::EncodeTo(std::string* dst) const {
@ -93,6 +102,21 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint64(dst, f.smallest_seqno);
PutVarint64(dst, f.largest_seqno);
}
// 0 is default and does not need to be explicitly written
if (column_family_ != 0) {
PutVarint32(dst, kColumnFamily);
PutVarint32(dst, column_family_);
}
if (is_column_family_add_) {
PutVarint32(dst, kColumnFamilyAdd);
PutLengthPrefixedSlice(dst, Slice(column_family_name_));
}
if (is_column_family_drop_) {
PutVarint32(dst, kColumnFamilyDrop);
}
}
static bool GetInternalKey(Slice* input, InternalKey* dst) {
@ -113,7 +137,7 @@ bool VersionEdit::GetLevel(Slice* input, int* level, const char** msg) {
return true;
} else {
if ((int)v >= number_levels_) {
*msg = "db already has more levels than options.num_levels";
*msg = "column family already has more levels than specified";
}
return false;
}
@ -227,6 +251,29 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
case kColumnFamily:
if (!GetVarint32(&input, &column_family_)) {
if (!msg) {
msg = "set column family id";
}
}
break;
case kColumnFamilyAdd:
if (GetLengthPrefixedSlice(&input, &str)) {
is_column_family_add_ = true;
column_family_name_ = str.ToString();
} else {
if (!msg) {
msg = "column family add";
}
}
break;
case kColumnFamilyDrop:
is_column_family_drop_ = true;
break;
default:
msg = "unknown tag";
break;
@ -294,6 +341,15 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append(" .. ");
r.append(f.largest.DebugString(hex_key));
}
r.append("\n ColumnFamily: ");
AppendNumberTo(&r, column_family_);
if (is_column_family_add_) {
r.append("\n ColumnFamilyAdd: ");
r.append(column_family_name_);
}
if (is_column_family_drop_) {
r.append("\n ColumnFamilyDrop");
}
r.append("\n}\n");
return r;
}

@ -29,13 +29,12 @@ struct FileMetaData {
SequenceNumber largest_seqno; // The largest seqno in this file
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0),
being_compacted(false) { }
being_compacted(false) {}
};
class VersionEdit {
public:
explicit VersionEdit(int number_levels) :
number_levels_(number_levels) {
explicit VersionEdit(int number_levels) : number_levels_(number_levels) {
Clear();
}
~VersionEdit() { }
@ -96,6 +95,27 @@ class VersionEdit {
return new_files_.size() + deleted_files_.size();
}
void SetColumnFamily(uint32_t column_family_id) {
column_family_ = column_family_id;
}
// set column family ID by calling SetColumnFamily()
void AddColumnFamily(const std::string& name) {
assert(!is_column_family_drop_);
assert(!is_column_family_add_);
assert(NumEntries() == 0);
is_column_family_add_ = true;
column_family_name_ = name;
}
// set column family ID by calling SetColumnFamily()
void DropColumnFamily() {
assert(!is_column_family_drop_);
assert(!is_column_family_add_);
assert(NumEntries() == 0);
is_column_family_drop_ = true;
}
void EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);
@ -123,6 +143,16 @@ class VersionEdit {
std::vector< std::pair<int, InternalKey> > compact_pointers_;
DeletedFileSet deleted_files_;
std::vector< std::pair<int, FileMetaData> > new_files_;
// Each version edit record should have column_family_id set
// If it's not set, it is default (0)
uint32_t column_family_;
// a version edit can be either column_family add or
// column_family drop. If it's column family add,
// it also includes column family name.
bool is_column_family_drop_;
bool is_column_family_add_;
std::string column_family_name_;
};
} // namespace rocksdb

@ -46,6 +46,18 @@ TEST(VersionEditTest, EncodeDecode) {
TestEncodeDecode(edit);
}
TEST(VersionEditTest, ColumnFamilyTest) {
VersionEdit edit(7);
edit.SetColumnFamily(2);
edit.AddColumnFamily("column_family");
TestEncodeDecode(edit);
edit.Clear();
edit.SetColumnFamily(3);
edit.DropColumnFamily();
TestEncodeDecode(edit);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -987,7 +987,7 @@ class VersionSet::Builder {
#ifndef NDEBUG
// a file to be deleted better exist in the previous version
bool found = false;
for (int l = 0; !found && l < edit->number_levels_; l++) {
for (int l = 0; !found && l < vset_->NumberLevels(); l++) {
const std::vector<FileMetaData*>& base_files = base_->files_[l];
for (unsigned int i = 0; i < base_files.size(); i++) {
FileMetaData* f = base_files[i];
@ -1000,7 +1000,7 @@ class VersionSet::Builder {
// if the file did not exist in the previous version, then it
// is possibly moved from lower level to higher level in current
// version
for (int l = level+1; !found && l < edit->number_levels_; l++) {
for (int l = level+1; !found && l < vset_->NumberLevels(); l++) {
const FileSet* added = levels_[l].added_files;
for (FileSet::const_iterator added_iter = added->begin();
added_iter != added->end(); ++added_iter) {

@ -505,6 +505,10 @@ class VersionSet {
// generates a increasing version number for every new version
uint64_t current_version_number_;
// column family metadata
std::unordered_map<std::string, ColumnFamilyHandle> column_families_;
uint32_t max_column_family_id_;
// Queue of writers to the manifest file
std::deque<ManifestWriter*> manifest_writers_;

@ -18,11 +18,11 @@ namespace rocksdb {
// should not be used by the clients
struct ColumnFamilyHandle {
int id;
uint32_t id;
// default
ColumnFamilyHandle() : id() {}
/* implicit */
ColumnFamilyHandle(int _id) : id(_id) {}
ColumnFamilyHandle(uint32_t _id) : id(_id) {}
};
const ColumnFamilyHandle default_column_family = ColumnFamilyHandle();

Loading…
Cancel
Save