Ignore missing column families

Summary:
Before this diff, whenever we Write to non-existing column family, Write() would fail.

This diff adds an option to not fail a Write() when WriteBatch points to non-existing column family. MongoDB said this would be useful for them, since they might have a transaction updating an index that was dropped by another thread. This way, they don't have to worry about checking if all indexes are alive on every write. They don't care if they lose writes to dropped index.

Test Plan: added a small unit test

Reviewers: sdong, yhchiang, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D22143
main
Igor Canadi 10 years ago
parent 8ed70fc209
commit a84234a61b
  1. 6
      db/column_family_test.cc
  2. 15
      db/db_impl.cc
  3. 44
      db/write_batch.cc
  4. 18
      db/write_batch_internal.h
  5. 12
      include/rocksdb/options.h

@ -408,9 +408,15 @@ TEST(ColumnFamilyTest, WriteBatchFailure) {
Open(); Open();
CreateColumnFamiliesAndReopen({"one", "two"}); CreateColumnFamiliesAndReopen({"one", "two"});
WriteBatch batch; WriteBatch batch;
batch.Put(handles_[0], Slice("existing"), Slice("column-family"));
batch.Put(handles_[1], Slice("non-existing"), Slice("column-family")); batch.Put(handles_[1], Slice("non-existing"), Slice("column-family"));
ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(db_->Write(WriteOptions(), &batch));
DropColumnFamilies({1}); DropColumnFamilies({1});
WriteOptions woptions_ignore_missing_cf;
woptions_ignore_missing_cf.ignore_missing_column_families = true;
batch.Put(handles_[0], Slice("still here"), Slice("column-family"));
ASSERT_OK(db_->Write(woptions_ignore_missing_cf, &batch));
ASSERT_EQ("column-family", Get(0, "still here"));
Status s = db_->Write(WriteOptions(), &batch); Status s = db_->Write(WriteOptions(), &batch);
ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_TRUE(s.IsInvalidArgument());
Close(); Close();

@ -1301,14 +1301,20 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
WriteBatch batch; WriteBatch batch;
while (reader.ReadRecord(&record, &scratch)) { while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) { if (record.size() < 12) {
reporter.Corruption( reporter.Corruption(record.size(),
record.size(), Status::Corruption("log record too small")); Status::Corruption("log record too small"));
continue; continue;
} }
WriteBatchInternal::SetContents(&batch, record); WriteBatchInternal::SetContents(&batch, record);
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case -- we
// just ignore the update. That's why we set ignore missing column families
// to true
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), true, log_number); &batch, column_family_memtables_.get(),
true /* ignore missing column families */, log_number);
MaybeIgnoreError(&status); MaybeIgnoreError(&status);
if (!status.ok()) { if (!status.ok()) {
@ -4066,7 +4072,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
PERF_TIMER_GUARD(write_memtable_time); PERF_TIMER_GUARD(write_memtable_time);
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
updates, column_family_memtables_.get(), false, 0, this, false); updates, column_family_memtables_.get(),
options.ignore_missing_column_families, 0, this, false);
// A non-OK status here indicates iteration failure (either in-memory // A non-OK status here indicates iteration failure (either in-memory
// writebatch corruption (very bad), or the client specified invalid // writebatch corruption (very bad), or the client specified invalid
// column family). This will later on trigger bg_error_. // column family). This will later on trigger bg_error_.

@ -299,17 +299,17 @@ class MemTableInserter : public WriteBatch::Handler {
public: public:
SequenceNumber sequence_; SequenceNumber sequence_;
ColumnFamilyMemTables* cf_mems_; ColumnFamilyMemTables* cf_mems_;
bool recovery_; bool ignore_missing_column_families_;
uint64_t log_number_; uint64_t log_number_;
DBImpl* db_; DBImpl* db_;
const bool dont_filter_deletes_; const bool dont_filter_deletes_;
MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
bool recovery, uint64_t log_number, DB* db, bool ignore_missing_column_families, uint64_t log_number,
const bool dont_filter_deletes) DB* db, const bool dont_filter_deletes)
: sequence_(sequence), : sequence_(sequence),
cf_mems_(cf_mems), cf_mems_(cf_mems),
recovery_(recovery), ignore_missing_column_families_(ignore_missing_column_families),
log_number_(log_number), log_number_(log_number),
db_(reinterpret_cast<DBImpl*>(db)), db_(reinterpret_cast<DBImpl*>(db)),
dont_filter_deletes_(dont_filter_deletes) { dont_filter_deletes_(dont_filter_deletes) {
@ -321,12 +321,18 @@ class MemTableInserter : public WriteBatch::Handler {
bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
bool found = cf_mems_->Seek(column_family_id); bool found = cf_mems_->Seek(column_family_id);
if (recovery_ && (!found || log_number_ < cf_mems_->GetLogNumber())) { if (!found) {
// if in recovery envoronment: if (ignore_missing_column_families_) {
// * If column family was not found, it might mean that the WAL write *s = Status::OK();
// batch references to the column family that was dropped after the } else {
// insert. We don't want to fail the whole write batch in that case -- we *s = Status::InvalidArgument(
// just ignore the update. "Invalid column family specified in write batch");
}
return false;
}
if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) {
// This is true only in recovery environment (log_number_ is always 0 in
// non-recovery, regular write code-path)
// * If log_number_ < cf_mems_->GetLogNumber(), this means that column // * If log_number_ < cf_mems_->GetLogNumber(), this means that column
// family already contains updates from this log. We can't apply updates // family already contains updates from this log. We can't apply updates
// twice because of update-in-place or merge workloads -- ignore the // twice because of update-in-place or merge workloads -- ignore the
@ -334,18 +340,8 @@ class MemTableInserter : public WriteBatch::Handler {
*s = Status::OK(); *s = Status::OK();
return false; return false;
} }
if (!found) {
assert(!recovery_);
// If the column family was not found in non-recovery enviornment
// (client's write code-path), we have to fail the write and return
// the failure status to the client.
*s = Status::InvalidArgument(
"Invalid column family specified in write batch");
return false;
}
return true; return true;
} }
virtual Status PutCF(uint32_t column_family_id, const Slice& key, virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) { const Slice& value) {
Status seek_status; Status seek_status;
@ -503,10 +499,12 @@ class MemTableInserter : public WriteBatch::Handler {
Status WriteBatchInternal::InsertInto(const WriteBatch* b, Status WriteBatchInternal::InsertInto(const WriteBatch* b,
ColumnFamilyMemTables* memtables, ColumnFamilyMemTables* memtables,
bool recovery, uint64_t log_number, bool ignore_missing_column_families,
DB* db, const bool dont_filter_deletes) { uint64_t log_number, DB* db,
const bool dont_filter_deletes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables,
recovery, log_number, db, dont_filter_deletes); ignore_missing_column_families, log_number, db,
dont_filter_deletes);
return b->Iterate(&inserter); return b->Iterate(&inserter);
} }

@ -106,18 +106,18 @@ class WriteBatchInternal {
// Inserts batch entries into memtable // Inserts batch entries into memtable
// If dont_filter_deletes is false AND options.filter_deletes is true, // If dont_filter_deletes is false AND options.filter_deletes is true,
// then --> Drops deletes in batch if db->KeyMayExist returns false // then --> Drops deletes in batch if db->KeyMayExist returns false
// If recovery == true, this means InsertInto is executed on a recovery // If ignore_missing_column_families == true. WriteBatch referencing
// code-path. WriteBatch referencing a dropped column family can be // non-existing column family should be ignored.
// found on a recovery code-path and should be ignored (recovery should not // However, if ignore_missing_column_families == false, any WriteBatch
// fail). Additionally, the memtable will be updated only if // referencing non-existing column family will return a InvalidArgument()
// failure.
//
// If log_number is non-zero, the memtable will be updated only if
// memtables->GetLogNumber() >= log_number // memtables->GetLogNumber() >= log_number
// However, if recovery == false, any WriteBatch referencing
// non-existing column family will return a failure. Also, log_number is
// ignored in that case
static Status InsertInto(const WriteBatch* batch, static Status InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables, ColumnFamilyMemTables* memtables,
bool recovery = false, uint64_t log_number = 0, bool ignore_missing_column_families = false,
DB* db = nullptr, uint64_t log_number = 0, DB* db = nullptr,
const bool dont_filter_deletes = true); const bool dont_filter_deletes = true);
static void Append(WriteBatch* dst, const WriteBatch* src); static void Append(WriteBatch* dst, const WriteBatch* src);

@ -959,7 +959,17 @@ struct WriteOptions {
// Default: 0 // Default: 0
uint64_t timeout_hint_us; uint64_t timeout_hint_us;
WriteOptions() : sync(false), disableWAL(false), timeout_hint_us(0) {} // If true and if user is trying to write to column families that don't exist
// (they were dropped), ignore the write (don't return an error). If there
// are multiple writes in a WriteBatch, other writes will succeed.
// Default: false
bool ignore_missing_column_families;
WriteOptions()
: sync(false),
disableWAL(false),
timeout_hint_us(0),
ignore_missing_column_families(false) {}
}; };
// Options that control flush operations // Options that control flush operations

Loading…
Cancel
Save