[CF] WriteBatch to take in ColumnFamilyHandle

Summary: Client doesn't need to know anything about ColumnFamily ID. By making WriteBatch take ColumnFamilyHandle as a parameter, we can eliminate method GetID() from ColumnFamilyHandle

Test Plan: column_family_test

Reviewers: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16887
main
Igor Canadi 11 years ago
parent f0e1e3ebf1
commit db234133a9
  1. 2
      db/column_family.h
  2. 13
      db/column_family_test.cc
  3. 6
      db/db_impl.cc
  4. 32
      db/write_batch.cc
  5. 2
      include/rocksdb/db.h
  6. 19
      include/rocksdb/write_batch.h

@ -47,7 +47,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
virtual ~ColumnFamilyHandleImpl(); virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; } virtual ColumnFamilyData* cfd() const { return cfd_; }
virtual uint32_t GetID() const override; virtual uint32_t GetID() const;
private: private:
ColumnFamilyData* cfd_; ColumnFamilyData* cfd_;

@ -289,7 +289,8 @@ TEST(ColumnFamilyTest, DontReuseColumnFamilyID) {
Open(); Open();
CreateColumnFamilies({"one", "two", "three"}); CreateColumnFamilies({"one", "two", "three"});
for (size_t i = 0; i < handles_.size(); ++i) { for (size_t i = 0; i < handles_.size(); ++i) {
ASSERT_EQ(i, handles_[i]->GetID()); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(i, cfh->GetID());
} }
if (iter == 1) { if (iter == 1) {
Reopen(); Reopen();
@ -303,7 +304,8 @@ TEST(ColumnFamilyTest, DontReuseColumnFamilyID) {
} }
CreateColumnFamilies({"three2"}); CreateColumnFamilies({"three2"});
// ID 3 that was used for dropped column family "three" should not be reused // ID 3 that was used for dropped column family "three" should not be reused
ASSERT_EQ(4, handles_[3]->GetID()); auto cfh3 = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[3]);
ASSERT_EQ(4, cfh3->GetID());
Close(); Close();
Destroy(); Destroy();
} }
@ -362,12 +364,13 @@ TEST(ColumnFamilyTest, DropTest) {
TEST(ColumnFamilyTest, WriteBatchFailure) { TEST(ColumnFamilyTest, WriteBatchFailure) {
Open(); Open();
CreateColumnFamiliesAndReopen({"one", "two"});
WriteBatch batch; WriteBatch batch;
batch.Put(1, Slice("non-existing"), Slice("column-family")); batch.Put(handles_[1], Slice("non-existing"), Slice("column-family"));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
DropColumnFamilies({1});
Status s = db_->Write(WriteOptions(), &batch); Status s = db_->Write(WriteOptions(), &batch);
ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_TRUE(s.IsInvalidArgument());
CreateColumnFamilies({"one"});
ASSERT_OK(db_->Write(WriteOptions(), &batch));
Close(); Close();
} }

@ -3992,21 +3992,21 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
// 8 bytes are taken by header, 4 bytes for count, 1 byte for type, // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
// and we allocate 11 extra bytes for key length, as well as value length. // and we allocate 11 extra bytes for key length, as well as value length.
WriteBatch batch(key.size() + value.size() + 24); WriteBatch batch(key.size() + value.size() + 24);
batch.Put(column_family->GetID(), key, value); batch.Put(column_family, key, value);
return Write(opt, &batch); return Write(opt, &batch);
} }
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
WriteBatch batch; WriteBatch batch;
batch.Delete(column_family->GetID(), key); batch.Delete(column_family, key);
return Write(opt, &batch); return Write(opt, &batch);
} }
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) { const Slice& key, const Slice& value) {
WriteBatch batch; WriteBatch batch;
batch.Merge(column_family->GetID(), key, value); batch.Merge(column_family, key, value);
return Write(opt, &batch); return Write(opt, &batch);
} }

@ -173,8 +173,14 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq); EncodeFixed64(&b->rep_[0], seq);
} }
void WriteBatch::Put(uint32_t column_family_id, const Slice& key, void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) { const Slice& value) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
column_family_id = cfh->GetID();
}
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeValue)); rep_.push_back(static_cast<char>(kTypeValue));
@ -186,8 +192,14 @@ void WriteBatch::Put(uint32_t column_family_id, const Slice& key,
PutLengthPrefixedSlice(&rep_, value); PutLengthPrefixedSlice(&rep_, value);
} }
void WriteBatch::Put(uint32_t column_family_id, const SliceParts& key, void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) { const SliceParts& value) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
column_family_id = cfh->GetID();
}
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeValue)); rep_.push_back(static_cast<char>(kTypeValue));
@ -199,7 +211,13 @@ void WriteBatch::Put(uint32_t column_family_id, const SliceParts& key,
PutLengthPrefixedSliceParts(&rep_, value); PutLengthPrefixedSliceParts(&rep_, value);
} }
void WriteBatch::Delete(uint32_t column_family_id, const Slice& key) { void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
column_family_id = cfh->GetID();
}
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeDeletion)); rep_.push_back(static_cast<char>(kTypeDeletion));
@ -210,8 +228,14 @@ void WriteBatch::Delete(uint32_t column_family_id, const Slice& key) {
PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, key);
} }
void WriteBatch::Merge(uint32_t column_family_id, const Slice& key, void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) { const Slice& value) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
column_family_id = cfh->GetID();
}
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeMerge)); rep_.push_back(static_cast<char>(kTypeMerge));

@ -27,8 +27,6 @@ using std::unique_ptr;
class ColumnFamilyHandle { class ColumnFamilyHandle {
public: public:
virtual ~ColumnFamilyHandle() {} virtual ~ColumnFamilyHandle() {}
virtual uint32_t GetID() const = 0;
}; };
extern const std::string default_column_family_name; extern const std::string default_column_family_name;

@ -31,6 +31,7 @@
namespace rocksdb { namespace rocksdb {
class Slice; class Slice;
class ColumnFamilyHandle;
struct SliceParts; struct SliceParts;
class WriteBatch { class WriteBatch {
@ -39,31 +40,33 @@ class WriteBatch {
~WriteBatch(); ~WriteBatch();
// Store the mapping "key->value" in the database. // Store the mapping "key->value" in the database.
void Put(uint32_t column_family_id, const Slice& key, const Slice& value); void Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
void Put(const Slice& key, const Slice& value) { void Put(const Slice& key, const Slice& value) {
Put(0, key, value); Put(nullptr, key, value);
} }
// Variant of Put() that gathers output like writev(2). The key and value // Variant of Put() that gathers output like writev(2). The key and value
// that will be written to the database are concatentations of arrays of // that will be written to the database are concatentations of arrays of
// slices. // slices.
void Put(uint32_t column_family_id, const SliceParts& key, void Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value); const SliceParts& value);
void Put(const SliceParts& key, const SliceParts& value) { void Put(const SliceParts& key, const SliceParts& value) {
Put(0, key, value); Put(nullptr, key, value);
} }
// Merge "value" with the existing value of "key" in the database. // Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)" // "key->merge(existing, value)"
void Merge(uint32_t column_family_id, const Slice& key, const Slice& value); void Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
void Merge(const Slice& key, const Slice& value) { void Merge(const Slice& key, const Slice& value) {
Merge(0, key, value); Merge(nullptr, key, value);
} }
// If the database contains a mapping for "key", erase it. Else do nothing. // If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(uint32_t column_family_id, const Slice& key); void Delete(ColumnFamilyHandle* column_family, const Slice& key);
void Delete(const Slice& key) { void Delete(const Slice& key) {
Delete(0, key); Delete(nullptr, key);
} }
// Append a blob of arbitrary size to the records in this batch. The blob will // Append a blob of arbitrary size to the records in this batch. The blob will

Loading…
Cancel
Save