|
|
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under the BSD-style license found in the
|
|
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
//
|
|
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
//
|
|
|
|
// We recover the contents of the descriptor from the other files we find.
|
|
|
|
// (1) Any log files are first converted to tables
|
|
|
|
// (2) We scan every table to compute
|
|
|
|
// (a) smallest/largest for the table
|
|
|
|
// (b) largest sequence number in the table
|
|
|
|
// (3) We generate descriptor contents:
|
|
|
|
// - log number is set to zero
|
|
|
|
// - next-file-number is set to 1 + largest file number we found
|
|
|
|
// - last-sequence-number is set to largest sequence# found across
|
|
|
|
// all tables (see 2c)
|
|
|
|
// - compaction pointers are cleared
|
|
|
|
// - every table file is added at level 0
|
|
|
|
//
|
|
|
|
// Possible optimization 1:
|
|
|
|
// (a) Compute total size and use to pick appropriate max-level M
|
|
|
|
// (b) Sort tables by largest sequence# in the table
|
|
|
|
// (c) For each table: if it overlaps earlier table, place in level-0,
|
|
|
|
// else place in level-M.
|
|
|
|
// Possible optimization 2:
|
|
|
|
// Store per-table metadata (smallest, largest, largest-seq#, ...)
|
|
|
|
// in the table's meta section to speed up ScanTable.
|
|
|
|
|
|
|
|
#include "db/builder.h"
|
|
|
|
#include "db/db_impl.h"
|
|
|
|
#include "db/dbformat.h"
|
|
|
|
#include "db/filename.h"
|
|
|
|
#include "db/log_reader.h"
|
|
|
|
#include "db/log_writer.h"
|
|
|
|
#include "db/memtable.h"
|
|
|
|
#include "db/table_cache.h"
|
|
|
|
#include "db/version_edit.h"
|
|
|
|
#include "db/write_batch_internal.h"
|
|
|
|
#include "rocksdb/comparator.h"
|
|
|
|
#include "rocksdb/db.h"
|
|
|
|
#include "rocksdb/env.h"
|
|
|
|
|
|
|
|
namespace rocksdb {
|
|
|
|
|
|
|
|
namespace {
|
|
|
|
|
|
|
|
class Repairer {
|
|
|
|
public:
|
|
|
|
Repairer(const std::string& dbname, const Options& options)
|
|
|
|
: dbname_(dbname),
|
|
|
|
env_(options.env),
|
|
|
|
icmp_(options.comparator),
|
|
|
|
ipolicy_(options.filter_policy),
|
|
|
|
options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)),
|
|
|
|
next_file_number_(1) {
|
|
|
|
// TableCache can be small since we expect each table to be opened once.
|
|
|
|
table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10);
|
|
|
|
edit_ = new VersionEdit(options.num_levels);
|
|
|
|
}
|
|
|
|
|
|
|
|
~Repairer() {
|
|
|
|
delete table_cache_;
|
|
|
|
delete edit_;
|
|
|
|
}
|
|
|
|
|
|
|
|
Status Run() {
|
|
|
|
Status status = FindFiles();
|
|
|
|
if (status.ok()) {
|
|
|
|
ConvertLogFilesToTables();
|
|
|
|
ExtractMetaData();
|
|
|
|
status = WriteDescriptor();
|
|
|
|
}
|
|
|
|
if (status.ok()) {
|
|
|
|
unsigned long long bytes = 0;
|
|
|
|
for (size_t i = 0; i < tables_.size(); i++) {
|
|
|
|
bytes += tables_[i].meta.file_size;
|
|
|
|
}
|
|
|
|
Log(options_.info_log,
|
|
|
|
"**** Repaired rocksdb %s; "
|
|
|
|
"recovered %d files; %llu bytes. "
|
|
|
|
"Some data may have been lost. "
|
|
|
|
"****",
|
|
|
|
dbname_.c_str(),
|
|
|
|
static_cast<int>(tables_.size()),
|
|
|
|
bytes);
|
|
|
|
}
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
struct TableInfo {
|
|
|
|
FileMetaData meta;
|
|
|
|
SequenceNumber min_sequence;
|
|
|
|
SequenceNumber max_sequence;
|
|
|
|
};
|
|
|
|
|
|
|
|
std::string const dbname_;
|
|
|
|
Env* const env_;
|
|
|
|
InternalKeyComparator const icmp_;
|
|
|
|
InternalFilterPolicy const ipolicy_;
|
|
|
|
Options const options_;
|
|
|
|
TableCache* table_cache_;
|
|
|
|
VersionEdit* edit_;
|
|
|
|
|
|
|
|
std::vector<std::string> manifests_;
|
|
|
|
std::vector<uint64_t> table_numbers_;
|
|
|
|
std::vector<uint64_t> logs_;
|
|
|
|
std::vector<TableInfo> tables_;
|
|
|
|
uint64_t next_file_number_;
|
|
|
|
const EnvOptions storage_options_;
|
|
|
|
|
|
|
|
Status FindFiles() {
|
|
|
|
std::vector<std::string> filenames;
|
|
|
|
Status status = env_->GetChildren(dbname_, &filenames);
|
|
|
|
if (!status.ok()) {
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
if (filenames.empty()) {
|
|
|
|
return Status::IOError(dbname_, "repair found no files");
|
|
|
|
}
|
|
|
|
|
|
|
|
uint64_t number;
|
|
|
|
FileType type;
|
|
|
|
for (size_t i = 0; i < filenames.size(); i++) {
|
|
|
|
if (ParseFileName(filenames[i], &number, &type)) {
|
|
|
|
if (type == kDescriptorFile) {
|
|
|
|
manifests_.push_back(filenames[i]);
|
|
|
|
} else {
|
|
|
|
if (number + 1 > next_file_number_) {
|
|
|
|
next_file_number_ = number + 1;
|
|
|
|
}
|
|
|
|
if (type == kLogFile) {
|
|
|
|
logs_.push_back(number);
|
|
|
|
} else if (type == kTableFile) {
|
|
|
|
table_numbers_.push_back(number);
|
|
|
|
} else {
|
|
|
|
// Ignore other files
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
|
|
|
void ConvertLogFilesToTables() {
|
|
|
|
for (size_t i = 0; i < logs_.size(); i++) {
|
|
|
|
std::string logname = LogFileName(dbname_, logs_[i]);
|
|
|
|
Status status = ConvertLogToTable(logs_[i]);
|
|
|
|
if (!status.ok()) {
|
|
|
|
Log(options_.info_log, "Log #%llu: ignoring conversion error: %s",
|
|
|
|
(unsigned long long) logs_[i],
|
|
|
|
status.ToString().c_str());
|
|
|
|
}
|
|
|
|
ArchiveFile(logname);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Status ConvertLogToTable(uint64_t log) {
|
|
|
|
struct LogReporter : public log::Reader::Reporter {
|
|
|
|
Env* env;
|
|
|
|
std::shared_ptr<Logger> info_log;
|
|
|
|
uint64_t lognum;
|
|
|
|
virtual void Corruption(size_t bytes, const Status& s) {
|
|
|
|
// We print error messages for corruption, but continue repairing.
|
|
|
|
Log(info_log, "Log #%llu: dropping %d bytes; %s",
|
|
|
|
(unsigned long long) lognum,
|
|
|
|
static_cast<int>(bytes),
|
|
|
|
s.ToString().c_str());
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
// Open the log file
|
|
|
|
std::string logname = LogFileName(dbname_, log);
|
|
|
|
unique_ptr<SequentialFile> lfile;
|
|
|
|
Status status = env_->NewSequentialFile(logname, &lfile, storage_options_);
|
|
|
|
if (!status.ok()) {
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create the log reader.
|
|
|
|
LogReporter reporter;
|
|
|
|
reporter.env = env_;
|
|
|
|
reporter.info_log = options_.info_log;
|
|
|
|
reporter.lognum = log;
|
|
|
|
// We intentially make log::Reader do checksumming so that
|
|
|
|
// corruptions cause entire commits to be skipped instead of
|
|
|
|
// propagating bad information (like overly large sequence
|
|
|
|
// numbers).
|
|
|
|
log::Reader reader(std::move(lfile), &reporter, false/*do not checksum*/,
|
|
|
|
0/*initial_offset*/);
|
|
|
|
|
|
|
|
// Read all the records and add to a memtable
|
|
|
|
std::string scratch;
|
|
|
|
Slice record;
|
|
|
|
WriteBatch batch;
|
|
|
|
MemTable* mem = new MemTable(icmp_, options_.memtable_factory.get(),
|
|
|
|
options_.num_levels);
|
|
|
|
mem->Ref();
|
|
|
|
int counter = 0;
|
|
|
|
while (reader.ReadRecord(&record, &scratch)) {
|
|
|
|
if (record.size() < 12) {
|
|
|
|
reporter.Corruption(
|
|
|
|
record.size(), Status::Corruption("log record too small"));
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
WriteBatchInternal::SetContents(&batch, record);
|
In-place updates for equal keys and similar sized values
Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory
TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.
Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.
Reviewers: xinyaohu, sumeet, haobo, dhruba
CC: leveldb
Differential Revision: https://reviews.facebook.net/D12423
Automatic commit by arc
12 years ago
|
|
|
status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
|
|
|
|
if (status.ok()) {
|
|
|
|
counter += WriteBatchInternal::Count(&batch);
|
|
|
|
} else {
|
|
|
|
Log(options_.info_log, "Log #%llu: ignoring %s",
|
|
|
|
(unsigned long long) log,
|
|
|
|
status.ToString().c_str());
|
|
|
|
status = Status::OK(); // Keep going with rest of file
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Do not record a version edit for this conversion to a Table
|
|
|
|
// since ExtractMetaData() will also generate edits.
|
|
|
|
FileMetaData meta;
|
|
|
|
meta.number = next_file_number_++;
|
|
|
|
Iterator* iter = mem->NewIterator();
|
|
|
|
status = BuildTable(dbname_, env_, options_, storage_options_,
|
|
|
|
table_cache_, iter, &meta,
|
|
|
|
icmp_.user_comparator(), 0, 0, true);
|
|
|
|
delete iter;
|
|
|
|
delete mem->Unref();
|
|
|
|
mem = nullptr;
|
|
|
|
if (status.ok()) {
|
|
|
|
if (meta.file_size > 0) {
|
|
|
|
table_numbers_.push_back(meta.number);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
|
|
|
|
(unsigned long long) log,
|
|
|
|
counter,
|
|
|
|
(unsigned long long) meta.number,
|
|
|
|
status.ToString().c_str());
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
|
|
|
void ExtractMetaData() {
|
|
|
|
std::vector<TableInfo> kept;
|
|
|
|
for (size_t i = 0; i < table_numbers_.size(); i++) {
|
|
|
|
TableInfo t;
|
|
|
|
t.meta.number = table_numbers_[i];
|
|
|
|
Status status = ScanTable(&t);
|
|
|
|
if (!status.ok()) {
|
|
|
|
std::string fname = TableFileName(dbname_, table_numbers_[i]);
|
|
|
|
Log(options_.info_log, "Table #%llu: ignoring %s",
|
|
|
|
(unsigned long long) table_numbers_[i],
|
|
|
|
status.ToString().c_str());
|
|
|
|
ArchiveFile(fname);
|
|
|
|
} else {
|
|
|
|
tables_.push_back(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Status ScanTable(TableInfo* t) {
|
|
|
|
std::string fname = TableFileName(dbname_, t->meta.number);
|
|
|
|
int counter = 0;
|
|
|
|
Status status = env_->GetFileSize(fname, &t->meta.file_size);
|
|
|
|
if (status.ok()) {
|
|
|
|
Iterator* iter = table_cache_->NewIterator(
|
|
|
|
ReadOptions(), storage_options_, t->meta.number, t->meta.file_size);
|
|
|
|
bool empty = true;
|
|
|
|
ParsedInternalKey parsed;
|
|
|
|
t->min_sequence = 0;
|
|
|
|
t->max_sequence = 0;
|
|
|
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
|
|
|
Slice key = iter->key();
|
|
|
|
if (!ParseInternalKey(key, &parsed)) {
|
|
|
|
Log(options_.info_log, "Table #%llu: unparsable key %s",
|
|
|
|
(unsigned long long) t->meta.number,
|
|
|
|
EscapeString(key).c_str());
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
counter++;
|
|
|
|
if (empty) {
|
|
|
|
empty = false;
|
|
|
|
t->meta.smallest.DecodeFrom(key);
|
|
|
|
}
|
|
|
|
t->meta.largest.DecodeFrom(key);
|
|
|
|
if (parsed.sequence < t->min_sequence) {
|
|
|
|
t->min_sequence = parsed.sequence;
|
|
|
|
}
|
|
|
|
if (parsed.sequence > t->max_sequence) {
|
|
|
|
t->max_sequence = parsed.sequence;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (!iter->status().ok()) {
|
|
|
|
status = iter->status();
|
|
|
|
}
|
|
|
|
delete iter;
|
|
|
|
}
|
|
|
|
Log(options_.info_log, "Table #%llu: %d entries %s",
|
|
|
|
(unsigned long long) t->meta.number,
|
|
|
|
counter,
|
|
|
|
status.ToString().c_str());
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
|
|
|
Status WriteDescriptor() {
|
|
|
|
std::string tmp = TempFileName(dbname_, 1);
|
|
|
|
unique_ptr<WritableFile> file;
|
|
|
|
Status status = env_->NewWritableFile(tmp, &file, storage_options_);
|
|
|
|
if (!status.ok()) {
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
|
|
|
SequenceNumber max_sequence = 0;
|
|
|
|
for (size_t i = 0; i < tables_.size(); i++) {
|
|
|
|
if (max_sequence < tables_[i].max_sequence) {
|
|
|
|
max_sequence = tables_[i].max_sequence;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
edit_->SetComparatorName(icmp_.user_comparator()->Name());
|
|
|
|
edit_->SetLogNumber(0);
|
|
|
|
edit_->SetNextFile(next_file_number_);
|
|
|
|
edit_->SetLastSequence(max_sequence);
|
|
|
|
|
|
|
|
for (size_t i = 0; i < tables_.size(); i++) {
|
|
|
|
// TODO(opt): separate out into multiple levels
|
|
|
|
const TableInfo& t = tables_[i];
|
|
|
|
edit_->AddFile(0, t.meta.number, t.meta.file_size,
|
|
|
|
t.meta.smallest, t.meta.largest,
|
|
|
|
t.min_sequence, t.max_sequence);
|
|
|
|
}
|
|
|
|
|
|
|
|
//fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
|
|
|
|
{
|
|
|
|
log::Writer log(std::move(file));
|
|
|
|
std::string record;
|
|
|
|
edit_->EncodeTo(&record);
|
|
|
|
status = log.AddRecord(record);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!status.ok()) {
|
|
|
|
env_->DeleteFile(tmp);
|
|
|
|
} else {
|
|
|
|
// Discard older manifests
|
|
|
|
for (size_t i = 0; i < manifests_.size(); i++) {
|
|
|
|
ArchiveFile(dbname_ + "/" + manifests_[i]);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Install new manifest
|
|
|
|
status = env_->RenameFile(tmp, DescriptorFileName(dbname_, 1));
|
|
|
|
if (status.ok()) {
|
|
|
|
status = SetCurrentFile(env_, dbname_, 1);
|
|
|
|
} else {
|
|
|
|
env_->DeleteFile(tmp);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
|
|
|
void ArchiveFile(const std::string& fname) {
|
|
|
|
// Move into another directory. E.g., for
|
|
|
|
// dir/foo
|
|
|
|
// rename to
|
|
|
|
// dir/lost/foo
|
|
|
|
const char* slash = strrchr(fname.c_str(), '/');
|
|
|
|
std::string new_dir;
|
|
|
|
if (slash != nullptr) {
|
|
|
|
new_dir.assign(fname.data(), slash - fname.data());
|
|
|
|
}
|
|
|
|
new_dir.append("/lost");
|
|
|
|
env_->CreateDir(new_dir); // Ignore error
|
|
|
|
std::string new_file = new_dir;
|
|
|
|
new_file.append("/");
|
|
|
|
new_file.append((slash == nullptr) ? fname.c_str() : slash + 1);
|
|
|
|
Status s = env_->RenameFile(fname, new_file);
|
|
|
|
Log(options_.info_log, "Archiving %s: %s\n",
|
|
|
|
fname.c_str(), s.ToString().c_str());
|
|
|
|
}
|
|
|
|
};
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
Status RepairDB(const std::string& dbname, const Options& options) {
|
|
|
|
Repairer repairer(dbname, options);
|
|
|
|
return repairer.Run();
|
|
|
|
}
|
|
|
|
|
|
|
|
} // namespace rocksdb
|