From 8b7ab9951c7dd3eb26ff5170702bc81b045293e2 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 25 Feb 2014 17:30:54 -0800 Subject: [PATCH] [CF] Handle failure in WriteBatch::Handler Summary: * Add ColumnFamilyHandle::GetID() function. A client needs to know a column family's ID to be able to construct a WriteBatch. * Handle WriteBatch::Handler failure gracefully. Since WriteBatch is not very smart (it takes a raw column family ID), a client can add data to a WriteBatch for a column family that doesn't exist. In that case, we need to gracefully return a failure status from DB::Write(). To do that, I added a Status return value to the WriteBatch::Handler functions PutCF, DeleteCF and MergeCF. Test Plan: Added test to column_family_test Reviewers: dhruba, haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16323 --- HISTORY.md | 1 - db/column_family.cc | 2 ++ db/column_family.h | 2 ++ db/column_family_test.cc | 11 +++++++ db/db_impl.cc | 15 ++++----- db/db_test.cc | 9 ++++-- db/write_batch.cc | 61 ++++++++++++++++++++++------------- db/write_batch_test.cc | 21 ++++++------ include/rocksdb/db.h | 2 ++ include/rocksdb/write_batch.h | 22 ++++++++++--- 10 files changed, 98 insertions(+), 48 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index c14094351..a808682d9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,7 +6,6 @@ executed in high priority thread pool.
## Unreleased (will be relased in 2.8) -## Unreleased ### Public API changes diff --git a/db/column_family.cc b/db/column_family.cc index 3204a95d4..91f222cee 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -44,6 +44,8 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { } } +uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); } + namespace { // Fix user-supplied options to be reasonable template diff --git a/db/column_family.h b/db/column_family.h index 6b9f636d7..eff4be4bb 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -41,6 +41,8 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle { virtual ~ColumnFamilyHandleImpl(); virtual ColumnFamilyData* cfd() const { return cfd_; } + virtual uint32_t GetID() const override; + private: ColumnFamilyData* cfd_; DBImpl* db_; diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 9917ca0c1..47797eecc 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -259,6 +259,17 @@ TEST(ColumnFamilyTest, DropTest) { } } +TEST(ColumnFamilyTest, WriteBatchFailure) { + Open(); + WriteBatch batch; + batch.Put(1, Slice("non-existing"), Slice("column-family")); + Status s = db_->Write(WriteOptions(), &batch); + ASSERT_TRUE(s.IsInvalidArgument()); + CreateColumnFamilies({"one"}); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + Close(); +} + TEST(ColumnFamilyTest, ReadWrite) { ASSERT_OK(Open({"default"})); CreateColumnFamilies({"one", "two"}); diff --git a/db/db_impl.cc b/db/db_impl.cc index 1379acbb4..66d203737 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3304,11 +3304,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer); if (!status.ok()) { - // Panic for in-memory corruptions + // Iteration failed (either in-memory writebatch corruption (very + // bad), or the client specified invalid column family). Return + // failure. 
// Note that existing logic was not sound. Any partial failure writing // into the memtable would result in a state that some write ops might // have succeeded in memtable but Status reports error for all writes. - throw std::runtime_error("In memory WriteBatch corruption!"); + return status; } SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER, last_sequence); @@ -3822,24 +3824,21 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, // 8 bytes are taken by header, 4 bytes for count, 1 byte for type, // and we allocate 11 extra bytes for key length, as well as value length. WriteBatch batch(key.size() + value.size() + 24); - auto cfh = reinterpret_cast(column_family); - batch.Put(cfh->cfd()->GetID(), key, value); + batch.Put(column_family->GetID(), key, value); return Write(opt, &batch); } Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key) { WriteBatch batch; - auto cfh = reinterpret_cast(column_family); - batch.Delete(cfh->cfd()->GetID(), key); + batch.Delete(column_family->GetID(), key); return Write(opt, &batch); } Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { WriteBatch batch; - auto cfh = reinterpret_cast(column_family); - batch.Merge(cfh->cfd()->GetID(), key, value); + batch.Merge(column_family->GetID(), key, value); return Write(opt, &batch); } diff --git a/db/db_test.cc b/db/db_test.cc index 84f6e5e52..70b8439a1 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4804,19 +4804,22 @@ TEST(DBTest, TransactionLogIteratorBlobs) { auto res = OpenTransactionLogIter(0)->GetBatch(); struct Handler : public WriteBatch::Handler { std::string seen; - virtual void PutCF(uint32_t cf, const Slice& key, const Slice& value) { + virtual Status PutCF(uint32_t cf, const Slice& key, const Slice& value) { seen += "Put(" + std::to_string(cf) + ", " + key.ToString() + ", " + std::to_string(value.size()) + ")"; + return Status::OK(); } 
- virtual void MergeCF(uint32_t cf, const Slice& key, const Slice& value) { + virtual Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) { seen += "Merge(" + std::to_string(cf) + ", " + key.ToString() + ", " + std::to_string(value.size()) + ")"; + return Status::OK(); } virtual void LogData(const Slice& blob) { seen += "LogData(" + blob.ToString() + ")"; } - virtual void DeleteCF(uint32_t cf, const Slice& key) { + virtual Status DeleteCF(uint32_t cf, const Slice& key) { seen += "Delete(" + std::to_string(cf) + ", " + key.ToString() + ")"; + return Status::OK(); } } handler; res.writeBatchPtr->Iterate(&handler); diff --git a/db/write_batch.cc b/db/write_batch.cc index 827f60aa7..680bd1ccd 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -89,7 +89,8 @@ Status WriteBatch::Iterate(Handler* handler) const { input.remove_prefix(kHeader); Slice key, value, blob; int found = 0; - while (!input.empty() && handler->Continue()) { + Status s; + while (s.ok() && !input.empty() && handler->Continue()) { char tag = input[0]; input.remove_prefix(1); uint32_t column_family = 0; // default @@ -98,11 +99,11 @@ Status WriteBatch::Iterate(Handler* handler) const { if (!GetVarint32(&input, &column_family)) { return Status::Corruption("bad WriteBatch Put"); } - // intentional fallthrough + // intentional fallthrough case kTypeValue: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->PutCF(column_family, key, value); + s = handler->PutCF(column_family, key, value); found++; } else { return Status::Corruption("bad WriteBatch Put"); @@ -112,10 +113,10 @@ Status WriteBatch::Iterate(Handler* handler) const { if (!GetVarint32(&input, &column_family)) { return Status::Corruption("bad WriteBatch Delete"); } - // intentional fallthrough + // intentional fallthrough case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { - handler->DeleteCF(column_family, key); + s = handler->DeleteCF(column_family, key); found++; } else 
{ return Status::Corruption("bad WriteBatch Delete"); @@ -125,11 +126,11 @@ Status WriteBatch::Iterate(Handler* handler) const { if (!GetVarint32(&input, &column_family)) { return Status::Corruption("bad WriteBatch Merge"); } - // intentional fallthrough + // intentional fallthrough case kTypeMerge: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->MergeCF(column_family, key, value); + s = handler->MergeCF(column_family, key, value); found++; } else { return Status::Corruption("bad WriteBatch Merge"); @@ -146,7 +147,10 @@ Status WriteBatch::Iterate(Handler* handler) const { return Status::Corruption("unknown WriteBatch tag"); } } - if (found != WriteBatchInternal::Count(this)) { + if (!s.ok()) { + return s; + } + if (found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); } else { return Status::OK(); @@ -251,13 +255,15 @@ class MemTableInserter : public WriteBatch::Handler { return log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber(); } - virtual void PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value) { + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { bool found = cf_mems_->Seek(column_family_id); - // TODO(icanadi) if found = false somehow return the error to caller - // Will need to change public API to do this - if (!found || IgnoreUpdate()) { - return; + if (!found) { + return Status::InvalidArgument( + "Invalid column family specified in write batch"); + } + if (IgnoreUpdate()) { + return Status::OK(); } MemTable* mem = cf_mems_->GetMemTable(); const Options* options = cf_mems_->GetFullOptions(); @@ -304,13 +310,18 @@ class MemTableInserter : public WriteBatch::Handler { // sequence number. Even if the update eventually fails and does not result // in memtable add/update. 
sequence_++; + return Status::OK(); } - virtual void MergeCF(uint32_t column_family_id, const Slice& key, - const Slice& value) { + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { bool found = cf_mems_->Seek(column_family_id); - if (!found || IgnoreUpdate()) { - return; + if (!found) { + return Status::InvalidArgument( + "Invalid column family specified in write batch"); + } + if (IgnoreUpdate()) { + return Status::OK(); } MemTable* mem = cf_mems_->GetMemTable(); const Options* options = cf_mems_->GetFullOptions(); @@ -372,12 +383,17 @@ class MemTableInserter : public WriteBatch::Handler { } sequence_++; + return Status::OK(); } - virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { bool found = cf_mems_->Seek(column_family_id); - if (!found || IgnoreUpdate()) { - return; + if (!found) { + return Status::InvalidArgument( + "Invalid column family specified in write batch"); + } + if (IgnoreUpdate()) { + return Status::OK(); } MemTable* mem = cf_mems_->GetMemTable(); const Options* options = cf_mems_->GetFullOptions(); @@ -393,11 +409,12 @@ class MemTableInserter : public WriteBatch::Handler { } if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) { RecordTick(options->statistics.get(), NUMBER_FILTERED_DELETES); - return; + return Status::OK(); } } mem->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; + return Status::OK(); } }; } // namespace diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index d56d7107a..773823216 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -145,7 +145,7 @@ TEST(WriteBatchTest, Append) { namespace { struct TestHandler : public WriteBatch::Handler { std::string seen; - virtual void PutCF(uint32_t column_family_id, const Slice& key, + virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { if (column_family_id == 0) { seen += "Put(" + 
key.ToString() + ", " + value.ToString() + ")"; @@ -153,8 +153,9 @@ namespace { seen += "PutCF(" + std::to_string(column_family_id) + ", " + key.ToString() + ", " + value.ToString() + ")"; } + return Status::OK(); } - virtual void MergeCF(uint32_t column_family_id, const Slice& key, + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { if (column_family_id == 0) { seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")"; @@ -162,17 +163,19 @@ namespace { seen += "MergeCF(" + std::to_string(column_family_id) + ", " + key.ToString() + ", " + value.ToString() + ")"; } + return Status::OK(); } virtual void LogData(const Slice& blob) { seen += "LogData(" + blob.ToString() + ")"; } - virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { if (column_family_id == 0) { seen += "Delete(" + key.ToString() + ")"; } else { seen += "DeleteCF(" + std::to_string(column_family_id) + ", " + key.ToString() + ")"; } + return Status::OK(); } }; } @@ -212,23 +215,23 @@ TEST(WriteBatchTest, Continue) { struct Handler : public TestHandler { int num_seen = 0; - virtual void PutCF(uint32_t column_family_id, const Slice& key, + virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { ++num_seen; - TestHandler::PutCF(column_family_id, key, value); + return TestHandler::PutCF(column_family_id, key, value); } - virtual void MergeCF(uint32_t column_family_id, const Slice& key, + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { ++num_seen; - TestHandler::MergeCF(column_family_id, key, value); + return TestHandler::MergeCF(column_family_id, key, value); } virtual void LogData(const Slice& blob) { ++num_seen; TestHandler::LogData(blob); } - virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { ++num_seen; - 
TestHandler::DeleteCF(column_family_id, key); + return TestHandler::DeleteCF(column_family_id, key); } virtual bool Continue() override { return num_seen < 3; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 9773fe3f3..7bd0c7c8a 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -27,6 +27,8 @@ using std::unique_ptr; class ColumnFamilyHandle { public: virtual ~ColumnFamilyHandle() {} + + virtual uint32_t GetID() const = 0; }; extern const std::string default_column_family_name; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 238d6d5f5..2f8112375 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -88,29 +88,41 @@ class WriteBatch { // default implementation will just call Put without column family for // backwards compatibility. If the column family is not default, // the function is noop - virtual void PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value) { + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { if (column_family_id == 0) { + // Put() historically doesn't return status. We didn't want to be + // backwards incompatible so we didn't change the return status + // (this is a public API). We call the ordinary Put() and return Status::OK() Put(key, value); + return Status::OK(); } + return Status::InvalidArgument( + "non-default column family and PutCF not implemented"); } virtual void Put(const Slice& key, const Slice& value); // Merge and LogData are not pure virtual. Otherwise, we would break // existing clients of Handler on a source code level. The default // implementation of Merge simply throws a runtime exception.
- virtual void MergeCF(uint32_t column_family_id, const Slice& key, - const Slice& value) { + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { if (column_family_id == 0) { Merge(key, value); + return Status::OK(); } + return Status::InvalidArgument( + "non-default column family and MergeCF not implemented"); } virtual void Merge(const Slice& key, const Slice& value); // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); - virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { if (column_family_id == 0) { Delete(key); + return Status::OK(); } + return Status::InvalidArgument( + "non-default column family and DeleteCF not implemented"); } virtual void Delete(const Slice& key); // Continue is called by WriteBatch::Iterate. If it returns false,