Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	db/version_edit.cc
	db/version_edit.h
	db/version_set.cc
main
Igor Canadi 11 years ago
commit 3055a15b29
  1. 2
      INSTALL.md
  2. 38
      db/db_impl.cc
  3. 4
      db/db_impl.h
  4. 4
      db/repair.cc
  5. 15
      db/version_edit.cc
  6. 10
      db/version_edit.h
  7. 89
      db/version_set.cc
  8. 8
      db/version_set.h
  9. 22
      include/rocksdb/env.h
  10. 10
      util/env.cc
  11. 76
      util/env_posix.cc

@ -68,7 +68,7 @@ libraries. You are on your own.
We did not run any production workloads on it. We did not run any production workloads on it.
## Compilation ## Compilation
`make clean; make` will compile librocksdb.a (RocskDB static library) and all `make clean; make` will compile librocksdb.a (RocksDB static library) and all
the unit tests. You can run all unit tests with `make check`. the unit tests. You can run all unit tests with `make check`.
For shared library builds, exec `make shared_lib` instead. For shared library builds, exec `make shared_lib` instead.

@ -349,14 +349,15 @@ uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
Status DBImpl::NewDB() { Status DBImpl::NewDB() {
VersionEdit new_db; VersionEdit new_db;
new_db.SetVersionNumber();
new_db.SetLogNumber(0); new_db.SetLogNumber(0);
new_db.SetNextFile(2); new_db.SetNextFile(2);
new_db.SetLastSequence(0); new_db.SetLastSequence(0);
const std::string manifest = DescriptorFileName(dbname_, 1); const std::string manifest = DescriptorFileName(dbname_, 1);
unique_ptr<WritableFile> file; unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(manifest, &file, Status s = env_->NewWritableFile(
storage_options_.AdaptForLogWrite()); manifest, &file, env_->OptimizeForManifestWrite(storage_options_));
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -459,6 +460,8 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// store the current filenum, lognum, etc // store the current filenum, lognum, etc
deletion_state.manifest_file_number = versions_->ManifestFileNumber(); deletion_state.manifest_file_number = versions_->ManifestFileNumber();
deletion_state.pending_manifest_file_number =
versions_->PendingManifestFileNumber();
deletion_state.log_number = versions_->MinLogNumber(); deletion_state.log_number = versions_->MinLogNumber();
deletion_state.prev_log_number = versions_->PrevLogNumber(); deletion_state.prev_log_number = versions_->PrevLogNumber();
@ -509,12 +512,10 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
return; return;
} }
// Now, convert live list to an unordered set, WITHOUT mutex held; // Now, convert live list to an unordered set, WITHOUT mutex held;
// set is slow. // set is slow.
std::unordered_set<uint64_t> sst_live( std::unordered_set<uint64_t> sst_live(state.sst_live.begin(),
state.sst_live.begin(), state.sst_live.end() state.sst_live.end());
);
auto& candidate_files = state.candidate_files; auto& candidate_files = state.candidate_files;
candidate_files.reserve( candidate_files.reserve(
@ -532,19 +533,15 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
for (auto file_num : state.log_delete_files) { for (auto file_num : state.log_delete_files) {
if (file_num > 0) { if (file_num > 0) {
candidate_files.push_back( candidate_files.push_back(LogFileName(kDumbDbName, file_num).substr(1));
LogFileName(kDumbDbName, file_num).substr(1)
);
} }
} }
// dedup state.candidate_files so we don't try to delete the same // dedup state.candidate_files so we don't try to delete the same
// file twice // file twice
sort(candidate_files.begin(), candidate_files.end()); sort(candidate_files.begin(), candidate_files.end());
candidate_files.erase( candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()),
unique(candidate_files.begin(), candidate_files.end()), candidate_files.end());
candidate_files.end()
);
std::vector<std::string> old_info_log_files; std::vector<std::string> old_info_log_files;
@ -564,7 +561,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
break; break;
case kDescriptorFile: case kDescriptorFile:
// Keep my manifest file, and any newer incarnations' // Keep my manifest file, and any newer incarnations'
// (in case there is a race that allows other incarnations) // (can happen during manifest roll)
keep = (number >= state.manifest_file_number); keep = (number >= state.manifest_file_number);
break; break;
case kTableFile: case kTableFile:
@ -572,8 +569,12 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
break; break;
case kTempFile: case kTempFile:
// Any temp files that are currently being written to must // Any temp files that are currently being written to must
// be recorded in pending_outputs_, which is inserted into "live" // be recorded in pending_outputs_, which is inserted into "live".
keep = (sst_live.find(number) != sst_live.end()); // Also, SetCurrentFile creates a temp file when writing out new
// manifest, which is equal to state.pending_manifest_file_number. We
// should not delete that file
keep = (sst_live.find(number) != sst_live.end()) ||
(number == state.pending_manifest_file_number);
break; break;
case kInfoLogFile: case kInfoLogFile:
keep = true; keep = true;
@ -3731,7 +3732,8 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
{ {
DelayLoggingAndReset(); DelayLoggingAndReset();
s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number), s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
&lfile, storage_options_.AdaptForLogWrite()); &lfile,
env_->OptimizeForLogWrite(storage_options_));
if (s.ok()) { if (s.ok()) {
// Our final size should be less than write_buffer_size // Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution. // (compression, etc) but err on the side of caution.
@ -4076,7 +4078,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
EnvOptions soptions(db_options); EnvOptions soptions(db_options);
s = impl->options_.env->NewWritableFile( s = impl->options_.env->NewWritableFile(
LogFileName(impl->options_.wal_dir, new_log_number), &lfile, LogFileName(impl->options_.wal_dir, new_log_number), &lfile,
soptions.AdaptForLogWrite()); impl->options_.env->OptimizeForLogWrite(soptions));
if (s.ok()) { if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * max_write_buffer_size); lfile->SetPreallocationBlockSize(1.1 * max_write_buffer_size);
impl->logfile_number_ = new_log_number; impl->logfile_number_ = new_log_number;

@ -210,10 +210,12 @@ class DBImpl : public DB {
// the current manifest_file_number, log_number and prev_log_number // the current manifest_file_number, log_number and prev_log_number
// that corresponds to the set of files in 'live'. // that corresponds to the set of files in 'live'.
uint64_t manifest_file_number, log_number, prev_log_number; uint64_t manifest_file_number, pending_manifest_file_number, log_number,
prev_log_number;
explicit DeletionState(bool create_superversion = false) { explicit DeletionState(bool create_superversion = false) {
manifest_file_number = 0; manifest_file_number = 0;
pending_manifest_file_number = 0;
log_number = 0; log_number = 0;
prev_log_number = 0; prev_log_number = 0;
new_superversion = create_superversion ? new SuperVersion() : nullptr; new_superversion = create_superversion ? new SuperVersion() : nullptr;

@ -315,8 +315,8 @@ class Repairer {
Status WriteDescriptor() { Status WriteDescriptor() {
std::string tmp = TempFileName(dbname_, 1); std::string tmp = TempFileName(dbname_, 1);
unique_ptr<WritableFile> file; unique_ptr<WritableFile> file;
Status status = Status status = env_->NewWritableFile(
env_->NewWritableFile(tmp, &file, storage_options_.AdaptForLogWrite()); tmp, &file, env_->OptimizeForManifestWrite(storage_options_));
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }

@ -30,6 +30,7 @@ enum Tag {
// these are new formats divergent from open source leveldb // these are new formats divergent from open source leveldb
kNewFile2 = 100, // store smallest & largest seqno kNewFile2 = 100, // store smallest & largest seqno
kVersionNumber = 101, // manifest version number, available after 2.8
kColumnFamily = 200, // specify column family for version edit kColumnFamily = 200, // specify column family for version edit
kColumnFamilyAdd = 201, kColumnFamilyAdd = 201,
@ -38,6 +39,7 @@ enum Tag {
}; };
void VersionEdit::Clear() { void VersionEdit::Clear() {
version_number_ = 0;
comparator_.clear(); comparator_.clear();
max_level_ = 0; max_level_ = 0;
log_number_ = 0; log_number_ = 0;
@ -45,6 +47,7 @@ void VersionEdit::Clear() {
last_sequence_ = 0; last_sequence_ = 0;
next_file_number_ = 0; next_file_number_ = 0;
max_column_family_ = 0; max_column_family_ = 0;
has_version_number_ = false;
has_comparator_ = false; has_comparator_ = false;
has_log_number_ = false; has_log_number_ = false;
has_prev_log_number_ = false; has_prev_log_number_ = false;
@ -60,6 +63,10 @@ void VersionEdit::Clear() {
} }
void VersionEdit::EncodeTo(std::string* dst) const { void VersionEdit::EncodeTo(std::string* dst) const {
if (has_version_number_) {
PutVarint32(dst, kVersionNumber);
PutVarint32(dst, version_number_);
}
if (has_comparator_) { if (has_comparator_) {
PutVarint32(dst, kComparator); PutVarint32(dst, kComparator);
PutLengthPrefixedSlice(dst, comparator_); PutLengthPrefixedSlice(dst, comparator_);
@ -157,6 +164,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
while (msg == nullptr && GetVarint32(&input, &tag)) { while (msg == nullptr && GetVarint32(&input, &tag)) {
switch (tag) { switch (tag) {
case kVersionNumber:
if (GetVarint32(&input, &version_number_)) {
has_version_number_ = true;
} else {
msg = "version number";
}
break;
case kComparator: case kComparator:
if (GetLengthPrefixedSlice(&input, &str)) { if (GetLengthPrefixedSlice(&input, &str)) {
comparator_ = str.ToString(); comparator_ = str.ToString();

@ -50,6 +50,10 @@ class VersionEdit {
void Clear(); void Clear();
void SetVersionNumber() {
has_version_number_ = true;
version_number_ = kManifestVersion;
}
void SetComparatorName(const Slice& name) { void SetComparatorName(const Slice& name) {
has_comparator_ = true; has_comparator_ = true;
comparator_ = name.ToString(); comparator_ = name.ToString();
@ -143,12 +147,14 @@ class VersionEdit {
bool GetLevel(Slice* input, int* level, const char** msg); bool GetLevel(Slice* input, int* level, const char** msg);
int max_level_; int max_level_;
uint32_t version_number_;
std::string comparator_; std::string comparator_;
uint64_t log_number_; uint64_t log_number_;
uint64_t prev_log_number_; uint64_t prev_log_number_;
uint64_t next_file_number_; uint64_t next_file_number_;
uint32_t max_column_family_; uint32_t max_column_family_;
SequenceNumber last_sequence_; SequenceNumber last_sequence_;
bool has_version_number_;
bool has_comparator_; bool has_comparator_;
bool has_log_number_; bool has_log_number_;
bool has_prev_log_number_; bool has_prev_log_number_;
@ -168,6 +174,10 @@ class VersionEdit {
bool is_column_family_drop_; bool is_column_family_drop_;
bool is_column_family_add_; bool is_column_family_add_;
std::string column_family_name_; std::string column_family_name_;
enum {
kManifestVersion = 1
};
}; };
} // namespace rocksdb } // namespace rocksdb

@ -7,9 +7,8 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/version_set.h"
#define __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS
#include "db/version_set.h"
#include <inttypes.h> #include <inttypes.h>
#include <algorithm> #include <algorithm>
@ -1446,6 +1445,7 @@ VersionSet::VersionSet(const std::string& dbname, const DBOptions* options,
options_(options), options_(options),
next_file_number_(2), next_file_number_(2),
manifest_file_number_(0), // Filled by Recover() manifest_file_number_(0), // Filled by Recover()
pending_manifest_file_number_(0),
last_sequence_(0), last_sequence_(0),
prev_log_number_(0), prev_log_number_(0),
current_version_number_(0), current_version_number_(0),
@ -1548,24 +1548,21 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// Initialize new descriptor log file if necessary by creating // Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version. // a temporary file that contains a snapshot of the current version.
std::string new_manifest_filename;
uint64_t new_manifest_file_size = 0; uint64_t new_manifest_file_size = 0;
Status s; Status s;
// we will need this if we are creating new manifest
uint64_t old_manifest_file_number = manifest_file_number_;
// No need to perform this check if a new Manifest is being created anyways. assert(pending_manifest_file_number_ == 0);
if (!descriptor_log_ || if (!descriptor_log_ ||
manifest_file_size_ > options_->max_manifest_file_size) { manifest_file_size_ > options_->max_manifest_file_size) {
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_);
new_descriptor_log = true; new_descriptor_log = true;
manifest_file_number_ = NewFileNumber(); // Change manifest file no. } else {
pending_manifest_file_number_ = manifest_file_number_;
} }
if (new_descriptor_log) { if (new_descriptor_log) {
new_manifest_filename = DescriptorFileName(dbname_, manifest_file_number_); // if we're writing out new snapshot make sure to persist max column family
edit->SetNextFile(next_file_number_);
// if we're writing out new snapshot make sure to persist max column
// family
if (column_family_set_->GetMaxColumnFamily() > 0) { if (column_family_set_->GetMaxColumnFamily() > 0) {
edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
} }
@ -1594,8 +1591,9 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// only one thread can be here at the same time // only one thread can be here at the same time
if (new_descriptor_log) { if (new_descriptor_log) {
unique_ptr<WritableFile> descriptor_file; unique_ptr<WritableFile> descriptor_file;
s = env_->NewWritableFile(new_manifest_filename, &descriptor_file, s = env_->NewWritableFile(
storage_options_.AdaptForLogWrite()); DescriptorFileName(dbname_, pending_manifest_file_number_),
&descriptor_file, env_->OptimizeForManifestWrite(storage_options_));
if (s.ok()) { if (s.ok()) {
descriptor_log_.reset(new log::Writer(std::move(descriptor_file))); descriptor_log_.reset(new log::Writer(std::move(descriptor_file)));
s = WriteSnapshot(descriptor_log_.get()); s = WriteSnapshot(descriptor_log_.get());
@ -1636,7 +1634,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
for (auto& e : batch_edits) { for (auto& e : batch_edits) {
std::string record; std::string record;
e->EncodeTo(&record); e->EncodeTo(&record);
if (!ManifestContains(record)) { if (!ManifestContains(pending_manifest_file_number_, record)) {
all_records_in = false; all_records_in = false;
break; break;
} }
@ -1653,17 +1651,16 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// If we just created a new descriptor file, install it by writing a // If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it. // new CURRENT file that points to it.
if (s.ok() && !new_manifest_filename.empty()) { if (s.ok() && new_descriptor_log) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_); s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_);
if (s.ok() && old_manifest_file_number < manifest_file_number_) { if (s.ok() && pending_manifest_file_number_ > manifest_file_number_) {
// delete old manifest file // delete old manifest file
Log(options_->info_log, Log(options_->info_log,
"Deleting manifest %lu current manifest %lu\n", "Deleting manifest %" PRIu64 " current manifest %" PRIu64 "\n",
(unsigned long)old_manifest_file_number, manifest_file_number_, pending_manifest_file_number_);
(unsigned long)manifest_file_number_);
// we don't care about an error here, PurgeObsoleteFiles will take care // we don't care about an error here, PurgeObsoleteFiles will take care
// of it later // of it later
env_->DeleteFile(DescriptorFileName(dbname_, old_manifest_file_number)); env_->DeleteFile(DescriptorFileName(dbname_, manifest_file_number_));
} }
if (!options_->disableDataSync && db_directory != nullptr) { if (!options_->disableDataSync && db_directory != nullptr) {
db_directory->Fsync(); db_directory->Fsync();
@ -1707,17 +1704,20 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
AppendVersion(column_family_data, v); AppendVersion(column_family_data, v);
} }
manifest_file_number_ = pending_manifest_file_number_;
manifest_file_size_ = new_manifest_file_size; manifest_file_size_ = new_manifest_file_size;
prev_log_number_ = edit->prev_log_number_; prev_log_number_ = edit->prev_log_number_;
} else { } else {
Log(options_->info_log, "Error in committing version %lu", Log(options_->info_log, "Error in committing version %lu",
(unsigned long)v->GetVersionNumber()); (unsigned long)v->GetVersionNumber());
delete v; delete v;
if (!new_manifest_filename.empty()) { if (new_descriptor_log) {
descriptor_log_.reset(); descriptor_log_.reset();
env_->DeleteFile(new_manifest_filename); env_->DeleteFile(
DescriptorFileName(dbname_, pending_manifest_file_number_));
} }
} }
pending_manifest_file_number_ = 0;
// wake up all the waiting writers // wake up all the waiting writers
while (true) { while (true) {
@ -1816,6 +1816,8 @@ Status VersionSet::Recover(
return s; return s;
} }
bool have_version_number = false;
bool log_number_decrease = false;
bool have_log_number = false; bool have_log_number = false;
bool have_prev_log_number = false; bool have_prev_log_number = false;
bool have_next_file = false; bool have_next_file = false;
@ -1932,11 +1934,11 @@ Status VersionSet::Recover(
if (cfd != nullptr) { if (cfd != nullptr) {
if (edit.has_log_number_) { if (edit.has_log_number_) {
if (cfd->GetLogNumber() > edit.log_number_) { if (cfd->GetLogNumber() > edit.log_number_) {
s = Status::Corruption( log_number_decrease = true;
"Log Numbers in MANIFEST are not always increasing"); } else {
cfd->SetLogNumber(edit.log_number_);
have_log_number = true;
} }
cfd->SetLogNumber(edit.log_number_);
have_log_number = true;
} }
if (edit.has_comparator_ && if (edit.has_comparator_ &&
edit.comparator_ != cfd->user_comparator()->Name()) { edit.comparator_ != cfd->user_comparator()->Name()) {
@ -1947,6 +1949,10 @@ Status VersionSet::Recover(
} }
} }
if (edit.has_version_number_) {
have_version_number = true;
}
if (edit.has_prev_log_number_) { if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_; prev_log_number = edit.prev_log_number_;
have_prev_log_number = true; have_prev_log_number = true;
@ -1966,6 +1972,23 @@ Status VersionSet::Recover(
have_last_sequence = true; have_last_sequence = true;
} }
} }
if (s.ok() && log_number_decrease) {
// Since release 2.8, version number is added into MANIFEST file.
// Prior release 2.8, a bug in LogAndApply() can cause log_number
// to be smaller than the one from previous edit. To ensure backward
// compatibility, only fail for MANIFEST genearated by release 2.8
// and after.
if (have_version_number) {
s = Status::Corruption(
"MANIFEST corruption - Log numbers in records NOT "
"monotonically increasing");
} else {
Log(options_->info_log,
"MANIFEST corruption detected, but ignored - Log numbers in "
"records NOT monotonically increasing");
}
}
} }
if (s.ok()) { if (s.ok()) {
@ -2389,6 +2412,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// WARNING: This method doesn't hold a mutex!! // WARNING: This method doesn't hold a mutex!!
bool first_record = false;
// This is done without DB mutex lock held, but only within single-threaded // This is done without DB mutex lock held, but only within single-threaded
// LogAndApply. Column family manipulations can only happen within LogAndApply // LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate. // (the same single thread), so we're safe to iterate.
@ -2396,6 +2421,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
{ {
// Store column family info // Store column family info
VersionEdit edit; VersionEdit edit;
if (first_record) {
edit.SetVersionNumber();
first_record = false;
}
if (cfd->GetID() != 0) { if (cfd->GetID() != 0) {
// default column family is always there, // default column family is always there,
// no need to explicitly write it // no need to explicitly write it
@ -2443,8 +2472,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// Opens the mainfest file and reads all records // Opens the mainfest file and reads all records
// till it finds the record we are looking for. // till it finds the record we are looking for.
bool VersionSet::ManifestContains(const std::string& record) const { bool VersionSet::ManifestContains(uint64_t manifest_file_number,
std::string fname = DescriptorFileName(dbname_, manifest_file_number_); const std::string& record) const {
std::string fname =
DescriptorFileName(dbname_, manifest_file_number);
Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str()); Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
Status s = env_->NewSequentialFile(fname, &file, storage_options_); Status s = env_->NewSequentialFile(fname, &file, storage_options_);

@ -332,6 +332,10 @@ class VersionSet {
// Return the current manifest file number // Return the current manifest file number
uint64_t ManifestFileNumber() const { return manifest_file_number_; } uint64_t ManifestFileNumber() const { return manifest_file_number_; }
uint64_t PendingManifestFileNumber() const {
return pending_manifest_file_number_;
}
// Allocate and return a new file number // Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_++; } uint64_t NewFileNumber() { return next_file_number_++; }
@ -426,7 +430,8 @@ class VersionSet {
void AppendVersion(ColumnFamilyData* column_family_data, Version* v); void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
bool ManifestContains(const std::string& record) const; bool ManifestContains(uint64_t manifest_file_number,
const std::string& record) const;
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options, ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options,
VersionEdit* edit); VersionEdit* edit);
@ -438,6 +443,7 @@ class VersionSet {
const DBOptions* const options_; const DBOptions* const options_;
uint64_t next_file_number_; uint64_t next_file_number_;
uint64_t manifest_file_number_; uint64_t manifest_file_number_;
uint64_t pending_manifest_file_number_;
std::atomic<uint64_t> last_sequence_; std::atomic<uint64_t> last_sequence_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted uint64_t prev_log_number_; // 0 or backing store for memtable being compacted

@ -49,8 +49,6 @@ struct EnvOptions {
// construct from Options // construct from Options
explicit EnvOptions(const DBOptions& options); explicit EnvOptions(const DBOptions& options);
EnvOptions AdaptForLogWrite() const;
// If true, then allow caching of data in environment buffers // If true, then allow caching of data in environment buffers
bool use_os_buffer = true; bool use_os_buffer = true;
@ -61,13 +59,21 @@ struct EnvOptions {
bool use_mmap_writes = true; bool use_mmap_writes = true;
// If true, set the FD_CLOEXEC on open fd. // If true, set the FD_CLOEXEC on open fd.
bool set_fd_cloexec= true; bool set_fd_cloexec = true;
// Allows OS to incrementally sync files to disk while they are being // Allows OS to incrementally sync files to disk while they are being
// written, in the background. Issue one request for every bytes_per_sync // written, in the background. Issue one request for every bytes_per_sync
// written. 0 turns it off. // written. 0 turns it off.
// Default: 0 // Default: 0
uint64_t bytes_per_sync = 0; uint64_t bytes_per_sync = 0;
// If true, we will preallocate the file with FALLOC_FL_KEEP_SIZE flag, which
// means that file size won't change as part of preallocation.
// If false, preallocation will also change the file size. This option will
// improve the performance in workloads where you sync the data on every
// write. By default, we set it to true for MANIFEST writes and false for
// WAL writes
bool fallocate_with_keep_size = true;
}; };
class Env { class Env {
@ -260,6 +266,16 @@ class Env {
// Generates a unique id that can be used to identify a db // Generates a unique id that can be used to identify a db
virtual std::string GenerateUniqueId(); virtual std::string GenerateUniqueId();
// OptimizeForLogWrite will create a new EnvOptions object that is a copy of
// the EnvOptions in the parameters, but is optimized for writing log files.
// Default implementation returns the copy of the same object.
virtual EnvOptions OptimizeForLogWrite(const EnvOptions& env_options) const;
// OptimizeForManifestWrite will create a new EnvOptions object that is a copy
// of the EnvOptions in the parameters, but is optimized for writing manifest
// files. Default implementation returns the copy of the same object.
virtual EnvOptions OptimizeForManifestWrite(const EnvOptions& env_options)
const;
private: private:
// No copying allowed // No copying allowed
Env(const Env&); Env(const Env&);

@ -241,10 +241,12 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
} }
EnvOptions EnvOptions::AdaptForLogWrite() const { EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options) const {
EnvOptions adapted = *this; return env_options;
adapted.use_mmap_writes = false; }
return adapted;
EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
return env_options;
} }
EnvOptions::EnvOptions(const DBOptions& options) { EnvOptions::EnvOptions(const DBOptions& options) {

@ -354,9 +354,9 @@ class PosixMmapFile : public WritableFile {
char* dst_; // Where to write next (in range [base_,limit_]) char* dst_; // Where to write next (in range [base_,limit_])
char* last_sync_; // Where have we synced up to char* last_sync_; // Where have we synced up to
uint64_t file_offset_; // Offset of base_ in file uint64_t file_offset_; // Offset of base_ in file
// Have we done an munmap of unsynced data? // Have we done an munmap of unsynced data?
bool pending_sync_; bool pending_sync_;
bool fallocate_with_keep_size_;
// Roundup x to a multiple of y // Roundup x to a multiple of y
static size_t Roundup(size_t x, size_t y) { static size_t Roundup(size_t x, size_t y) {
@ -399,7 +399,12 @@ class PosixMmapFile : public WritableFile {
assert(base_ == nullptr); assert(base_ == nullptr);
TEST_KILL_RANDOM(rocksdb_kill_odds); TEST_KILL_RANDOM(rocksdb_kill_odds);
int alloc_status = posix_fallocate(fd_, file_offset_, map_size_); // we can't fallocate with FALLOC_FL_KEEP_SIZE here
int alloc_status = fallocate(fd_, 0, file_offset_, map_size_);
if (alloc_status != 0) {
// fallback to posix_fallocate
alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
}
if (alloc_status != 0) { if (alloc_status != 0) {
return Status::IOError("Error allocating space to file : " + filename_ + return Status::IOError("Error allocating space to file : " + filename_ +
"Error : " + strerror(alloc_status)); "Error : " + strerror(alloc_status));
@ -436,7 +441,8 @@ class PosixMmapFile : public WritableFile {
dst_(nullptr), dst_(nullptr),
last_sync_(nullptr), last_sync_(nullptr),
file_offset_(0), file_offset_(0),
pending_sync_(false) { pending_sync_(false),
fallocate_with_keep_size_(options.fallocate_with_keep_size) {
assert((page_size & (page_size - 1)) == 0); assert((page_size & (page_size - 1)) == 0);
assert(options.use_mmap_writes); assert(options.use_mmap_writes);
} }
@ -584,7 +590,9 @@ class PosixMmapFile : public WritableFile {
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(off_t offset, off_t len) { virtual Status Allocate(off_t offset, off_t len) {
TEST_KILL_RANDOM(rocksdb_kill_odds); TEST_KILL_RANDOM(rocksdb_kill_odds);
if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) { int alloc_status = fallocate(
fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len);
if (alloc_status == 0) {
return Status::OK(); return Status::OK();
} else { } else {
return IOError(filename_, errno); return IOError(filename_, errno);
@ -606,20 +614,22 @@ class PosixWritableFile : public WritableFile {
bool pending_fsync_; bool pending_fsync_;
uint64_t last_sync_size_; uint64_t last_sync_size_;
uint64_t bytes_per_sync_; uint64_t bytes_per_sync_;
bool fallocate_with_keep_size_;
public: public:
PosixWritableFile(const std::string& fname, int fd, size_t capacity, PosixWritableFile(const std::string& fname, int fd, size_t capacity,
const EnvOptions& options) : const EnvOptions& options)
filename_(fname), : filename_(fname),
fd_(fd), fd_(fd),
cursize_(0), cursize_(0),
capacity_(capacity), capacity_(capacity),
buf_(new char[capacity]), buf_(new char[capacity]),
filesize_(0), filesize_(0),
pending_sync_(false), pending_sync_(false),
pending_fsync_(false), pending_fsync_(false),
last_sync_size_(0), last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync) { bytes_per_sync_(options.bytes_per_sync),
fallocate_with_keep_size_(options.fallocate_with_keep_size) {
assert(!options.use_mmap_writes); assert(!options.use_mmap_writes);
} }
@ -771,7 +781,9 @@ class PosixWritableFile : public WritableFile {
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(off_t offset, off_t len) { virtual Status Allocate(off_t offset, off_t len) {
TEST_KILL_RANDOM(rocksdb_kill_odds); TEST_KILL_RANDOM(rocksdb_kill_odds);
if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) { int alloc_status = fallocate(
fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len);
if (alloc_status == 0) {
return Status::OK(); return Status::OK();
} else { } else {
return IOError(filename_, errno); return IOError(filename_, errno);
@ -797,14 +809,15 @@ class PosixRandomRWFile : public RandomRWFile {
int fd_; int fd_;
bool pending_sync_; bool pending_sync_;
bool pending_fsync_; bool pending_fsync_;
bool fallocate_with_keep_size_;
public: public:
PosixRandomRWFile(const std::string& fname, int fd, PosixRandomRWFile(const std::string& fname, int fd, const EnvOptions& options)
const EnvOptions& options) : : filename_(fname),
filename_(fname), fd_(fd),
fd_(fd), pending_sync_(false),
pending_sync_(false), pending_fsync_(false),
pending_fsync_(false) { fallocate_with_keep_size_(options.fallocate_with_keep_size) {
assert(!options.use_mmap_writes && !options.use_mmap_reads); assert(!options.use_mmap_writes && !options.use_mmap_reads);
} }
@ -874,7 +887,10 @@ class PosixRandomRWFile : public RandomRWFile {
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(off_t offset, off_t len) { virtual Status Allocate(off_t offset, off_t len) {
if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) { TEST_KILL_RANDOM(rocksdb_kill_odds);
int alloc_status = fallocate(
fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len);
if (alloc_status == 0) {
return Status::OK(); return Status::OK();
} else { } else {
return IOError(filename_, errno); return IOError(filename_, errno);
@ -1332,6 +1348,20 @@ class PosixEnv : public Env {
return dummy; return dummy;
} }
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options) const {
EnvOptions optimized = env_options;
optimized.use_mmap_writes = false;
optimized.fallocate_with_keep_size = true;
return optimized;
}
EnvOptions OptimizeForManifestWrite(const EnvOptions& env_options) const {
EnvOptions optimized = env_options;
optimized.use_mmap_writes = false;
optimized.fallocate_with_keep_size = true;
return optimized;
}
private: private:
bool checkedDiskForMmap_; bool checkedDiskForMmap_;
bool forceMmapOff; // do we override Env options? bool forceMmapOff; // do we override Env options?

Loading…
Cancel
Save