Port fix for Leveldb manifest writing bug from Open-Source

Summary:
Pretty much a blind copy of the patch in open source.
Hope to get this in before we make a release

Test Plan: make clean check

Reviewers: dhruba, heyongqiang

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D7809
main
Abhishek Kona 12 years ago
parent 41d7809212
commit 85ad13be1a
  1. 75
      db/db_test.cc
  2. 36
      db/version_set.cc
  3. 2
      db/version_set.h

@ -80,6 +80,12 @@ class SpecialEnv : public EnvWrapper {
// Simulate non-writable file system while this pointer is non-NULL
port::AtomicPointer non_writable_;
// Force sync of manifest files to fail while this pointer is non-NULL
port::AtomicPointer manifest_sync_error_;
// Force write to manifest files to fail while this pointer is non-NULL
port::AtomicPointer manifest_write_error_;
bool count_random_reads_;
anon::AtomicCounter random_read_counter_;
@ -90,6 +96,8 @@ class SpecialEnv : public EnvWrapper {
no_space_.Release_Store(NULL);
non_writable_.Release_Store(NULL);
count_random_reads_ = false;
manifest_sync_error_.Release_Store(NULL);
manifest_write_error_.Release_Store(NULL);
}
Status NewWritableFile(const std::string& f, WritableFile** r) {
@ -121,6 +129,30 @@ class SpecialEnv : public EnvWrapper {
return base_->Sync();
}
};
class ManifestFile : public WritableFile {
private:
SpecialEnv* env_;
WritableFile* base_;
public:
ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
~ManifestFile() { delete base_; }
Status Append(const Slice& data) {
if (env_->manifest_write_error_.Acquire_Load() != NULL) {
return Status::IOError("simulated writer error");
} else {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
if (env_->manifest_sync_error_.Acquire_Load() != NULL) {
return Status::IOError("simulated sync error");
} else {
return base_->Sync();
}
}
};
if (non_writable_.Acquire_Load() != NULL) {
return Status::IOError("simulated write error");
@ -130,6 +162,8 @@ class SpecialEnv : public EnvWrapper {
if (s.ok()) {
if (strstr(f.c_str(), ".sst") != NULL) {
*r = new SSTableFile(this, *r);
} else if (strstr(f.c_str(), "MANIFEST") != NULL) {
*r = new ManifestFile(this, *r);
}
}
return s;
@ -2042,6 +2076,47 @@ TEST(DBTest, NonWritableFileSystem)
env_->non_writable_.Release_Store(NULL);
}
TEST(DBTest, ManifestWriteError) {
// Test for the following problem:
// (a) Compaction produces file F
// (b) Log record containing F is written to MANIFEST file, but Sync() fails
// (c) GC deletes F
// (d) After reopening DB, reads fail since deleted F is named in log record
// We iterate twice. In the second iteration, everything is the
// same except the log record never makes it to the MANIFEST file.
for (int iter = 0; iter < 2; iter++) {
port::AtomicPointer* error_type = (iter == 0)
? &env_->manifest_sync_error_
: &env_->manifest_write_error_;
// Insert foo=>bar mapping
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.error_if_exists = false;
DestroyAndReopen(&options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_EQ("bar", Get("foo"));
// Memtable compaction (will succeed)
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("bar", Get("foo"));
const int last = dbfull()->MaxMemCompactionLevel();
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
// Merging compaction (will fail)
error_type->Release_Store(env_);
dbfull()->TEST_CompactRange(last, NULL, NULL); // Should fail
ASSERT_EQ("bar", Get("foo"));
// Recovery: should not lose data
error_type->Release_Store(NULL);
Reopen(&options);
ASSERT_EQ("bar", Get("foo"));
}
}
TEST(DBTest, FilesDeletedAfterCompaction) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");

@ -1040,6 +1040,15 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
s = descriptor_file_->Sync();
}
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
if (ManifestContains(record)) {
Log(options_->info_log,
"MANIFEST contains log record despite error; advancing to new "
"version to prevent mismatch between in-memory and logged state");
s = Status::OK();
}
}
}
// If we just created a new descriptor file, install it by writing a
@ -1550,6 +1559,33 @@ const char* VersionSet::LevelDataSizeSummary(
return scratch->buffer;
}
// Opens the mainfest file and reads all records
// till it finds the record we are looking for.
bool VersionSet::ManifestContains(const std::string& record) const {
std::string fname = DescriptorFileName(dbname_, manifest_file_number_);
Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
SequentialFile* file = NULL;
Status s = env_->NewSequentialFile(fname, &file);
if (!s.ok()) {
Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str());
return false;
}
log::Reader reader(file, NULL, true/*checksum*/, 0);
Slice r;
std::string scratch;
bool result = false;
while (reader.ReadRecord(&r, &scratch)) {
if (r == Slice(record)) {
result = true;
break;
}
}
delete file;
Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0);
return result;
}
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < NumberLevels(); level++) {

@ -393,6 +393,8 @@ class VersionSet {
void AppendVersion(Version* v);
bool ManifestContains(const std::string& record) const;
double MaxBytesForLevel(int level);
uint64_t MaxFileSizeForLevel(int level);

Loading…
Cancel
Save