@ -7,8 +7,10 @@
// Use of this source code is governed by a BSD-style license that can be
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// found in the LICENSE file. See the AUTHORS file for names of contributors.
# define __STDC_FORMAT_MACROS
# include "db/version_set.h"
# include "db/version_set.h"
# include <inttypes.h>
# include <algorithm>
# include <algorithm>
# include <climits>
# include <climits>
# include <stdio.h>
# include <stdio.h>
@ -1435,6 +1437,7 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
icmp_ ( * cmp ) ,
icmp_ ( * cmp ) ,
next_file_number_ ( 2 ) ,
next_file_number_ ( 2 ) ,
manifest_file_number_ ( 0 ) , // Filled by Recover()
manifest_file_number_ ( 0 ) , // Filled by Recover()
pending_manifest_file_number_ ( 0 ) ,
last_sequence_ ( 0 ) ,
last_sequence_ ( 0 ) ,
log_number_ ( 0 ) ,
log_number_ ( 0 ) ,
prev_log_number_ ( 0 ) ,
prev_log_number_ ( 0 ) ,
@ -1527,22 +1530,17 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
// Initialize new descriptor log file if necessary by creating
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
// a temporary file that contains a snapshot of the current version.
std : : string new_manifest_filename ;
uint64_t new_manifest_file_size = 0 ;
uint64_t new_manifest_file_size = 0 ;
Status s ;
Status s ;
// we will need this if we are creating new manifest
uint64_t old_manifest_file_number = manifest_file_number_ ;
// No need to perform this check if a new Manifest is being created anyways.
assert ( pending_manifest_file_number_ = = 0 ) ;
if ( ! descriptor_log_ | |
if ( ! descriptor_log_ | |
manifest_file_size_ > options_ - > max_manifest_file_size ) {
manifest_file_size_ > options_ - > max_manifest_file_size ) {
pending_manifest_file_number_ = NewFileNumber ( ) ;
batch_edits . back ( ) - > SetNextFile ( next_file_number_ ) ;
new_descriptor_log = true ;
new_descriptor_log = true ;
manifest_file_number_ = NewFileNumber ( ) ; // Change manifest file no.
} else {
}
pending_manifest_file_number_ = manifest_file_number_ ;
if ( new_descriptor_log ) {
new_manifest_filename = DescriptorFileName ( dbname_ , manifest_file_number_ ) ;
edit - > SetNextFile ( next_file_number_ ) ;
}
}
// Unlock during expensive operations. New writes cannot get here
// Unlock during expensive operations. New writes cannot get here
@ -1562,10 +1560,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
// This is fine because everything inside of this block is serialized --
// This is fine because everything inside of this block is serialized --
// only one thread can be here at the same time
// only one thread can be here at the same time
if ( ! new_manifest_filename . empty ( ) ) {
if ( new_descriptor_log ) {
unique_ptr < WritableFile > descriptor_file ;
unique_ptr < WritableFile > descriptor_file ;
s = env_ - > NewWritableFile ( new_manifest_filename , & descriptor_file ,
s = env_ - > NewWritableFile (
storage_options_ . AdaptForLogWrite ( ) ) ;
DescriptorFileName ( dbname_ , pending_manifest_file_number_ ) ,
& descriptor_file , storage_options_ . AdaptForLogWrite ( ) ) ;
if ( s . ok ( ) ) {
if ( s . ok ( ) ) {
descriptor_log_ . reset ( new log : : Writer ( std : : move ( descriptor_file ) ) ) ;
descriptor_log_ . reset ( new log : : Writer ( std : : move ( descriptor_file ) ) ) ;
s = WriteSnapshot ( descriptor_log_ . get ( ) ) ;
s = WriteSnapshot ( descriptor_log_ . get ( ) ) ;
@ -1604,7 +1603,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
for ( auto & e : batch_edits ) {
for ( auto & e : batch_edits ) {
std : : string record ;
std : : string record ;
e - > EncodeTo ( & record ) ;
e - > EncodeTo ( & record ) ;
if ( ! ManifestContains ( record ) ) {
if ( ! ManifestContains ( pending_manifest_file_number_ , record ) ) {
all_records_in = false ;
all_records_in = false ;
break ;
break ;
}
}
@ -1621,17 +1620,16 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
// If we just created a new descriptor file, install it by writing a
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
// new CURRENT file that points to it.
if ( s . ok ( ) & & ! new_manifest_filename . empty ( ) ) {
if ( s . ok ( ) & & new_descriptor_log ) {
s = SetCurrentFile ( env_ , dbname_ , manifest_file_number_ ) ;
s = SetCurrentFile ( env_ , dbname_ , pending_ manifest_file_number_) ;
if ( s . ok ( ) & & old_manifest_file_number < manifest_file_number_ ) {
if ( s . ok ( ) & & pending_manifest_file_number_ > manifest_file_number_ ) {
// delete old manifest file
// delete old manifest file
Log ( options_ - > info_log ,
Log ( options_ - > info_log ,
" Deleting manifest %lu current manifest %lu \n " ,
" Deleting manifest % " PRIu64 " current manifest % " PRIu64 " \n " ,
( unsigned long ) old_manifest_file_number ,
manifest_file_number_ , pending_manifest_file_number_ ) ;
( unsigned long ) manifest_file_number_ ) ;
// we don't care about an error here, PurgeObsoleteFiles will take care
// we don't care about an error here, PurgeObsoleteFiles will take care
// of it later
// of it later
env_ - > DeleteFile ( DescriptorFileName ( dbname_ , old_ manifest_file_number) ) ;
env_ - > DeleteFile ( DescriptorFileName ( dbname_ , manifest_file_number_ ) ) ;
}
}
if ( ! options_ - > disableDataSync & & db_directory ! = nullptr ) {
if ( ! options_ - > disableDataSync & & db_directory ! = nullptr ) {
db_directory - > Fsync ( ) ;
db_directory - > Fsync ( ) ;
@ -1649,6 +1647,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
// Install the new version
// Install the new version
if ( s . ok ( ) ) {
if ( s . ok ( ) ) {
manifest_file_number_ = pending_manifest_file_number_ ;
manifest_file_size_ = new_manifest_file_size ;
manifest_file_size_ = new_manifest_file_size ;
AppendVersion ( v ) ;
AppendVersion ( v ) ;
if ( max_log_number_in_batch ! = 0 ) {
if ( max_log_number_in_batch ! = 0 ) {
@ -1656,16 +1655,17 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
log_number_ = max_log_number_in_batch ;
log_number_ = max_log_number_in_batch ;
}
}
prev_log_number_ = edit - > prev_log_number_ ;
prev_log_number_ = edit - > prev_log_number_ ;
} else {
} else {
Log ( options_ - > info_log , " Error in committing version %lu " ,
Log ( options_ - > info_log , " Error in committing version %lu " ,
( unsigned long ) v - > GetVersionNumber ( ) ) ;
( unsigned long ) v - > GetVersionNumber ( ) ) ;
delete v ;
delete v ;
if ( ! new_manifest_filename . empty ( ) ) {
if ( new_descriptor_log ) {
descriptor_log_ . reset ( ) ;
descriptor_log_ . reset ( ) ;
env_ - > DeleteFile ( new_manifest_filename ) ;
env_ - > DeleteFile (
DescriptorFileName ( dbname_ , pending_manifest_file_number_ ) ) ;
}
}
}
}
pending_manifest_file_number_ = 0 ;
// wake up all the waiting writers
// wake up all the waiting writers
while ( true ) {
while ( true ) {
@ -2103,8 +2103,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// Opens the mainfest file and reads all records
// Opens the mainfest file and reads all records
// till it finds the record we are looking for.
// till it finds the record we are looking for.
bool VersionSet : : ManifestContains ( const std : : string & record ) const {
bool VersionSet : : ManifestContains ( uint64_t manifest_file_number ,
std : : string fname = DescriptorFileName ( dbname_ , manifest_file_number_ ) ;
const std : : string & record ) const {
std : : string fname =
DescriptorFileName ( dbname_ , manifest_file_number ) ;
Log ( options_ - > info_log , " ManifestContains: checking %s \n " , fname . c_str ( ) ) ;
Log ( options_ - > info_log , " ManifestContains: checking %s \n " , fname . c_str ( ) ) ;
unique_ptr < SequentialFile > file ;
unique_ptr < SequentialFile > file ;
Status s = env_ - > NewSequentialFile ( fname , & file , storage_options_ ) ;
Status s = env_ - > NewSequentialFile ( fname , & file , storage_options_ ) ;