@ -11,6 +11,7 @@
# include "table/block_based_table_builder.h"
# include "table/block_based_table_builder.h"
# include "table/sst_file_writer_collectors.h"
# include "table/sst_file_writer_collectors.h"
# include "util/file_reader_writer.h"
# include "util/file_reader_writer.h"
# include "util/sync_point.h"
namespace rocksdb {
namespace rocksdb {
@ -18,15 +19,19 @@ const std::string ExternalSstFilePropertyNames::kVersion =
" rocksdb.external_sst_file.version " ;
" rocksdb.external_sst_file.version " ;
const std : : string ExternalSstFilePropertyNames : : kGlobalSeqno =
const std : : string ExternalSstFilePropertyNames : : kGlobalSeqno =
" rocksdb.external_sst_file.global_seqno " ;
" rocksdb.external_sst_file.global_seqno " ;
const size_t kFadviseTrigger = 1024 * 1024 ; // 1MB
struct SstFileWriter : : Rep {
struct SstFileWriter : : Rep {
Rep ( const EnvOptions & _env_options , const Options & options ,
Rep ( const EnvOptions & _env_options , const Options & options ,
const Comparator * _user_comparator , ColumnFamilyHandle * _cfh )
const Comparator * _user_comparator , ColumnFamilyHandle * _cfh ,
bool _invalidate_page_cache )
: env_options ( _env_options ) ,
: env_options ( _env_options ) ,
ioptions ( options ) ,
ioptions ( options ) ,
mutable_cf_options ( options ) ,
mutable_cf_options ( options ) ,
internal_comparator ( _user_comparator ) ,
internal_comparator ( _user_comparator ) ,
cfh ( _cfh ) { }
cfh ( _cfh ) ,
invalidate_page_cache ( _invalidate_page_cache ) ,
last_fadvise_size ( 0 ) { }
std : : unique_ptr < WritableFileWriter > file_writer ;
std : : unique_ptr < WritableFileWriter > file_writer ;
std : : unique_ptr < TableBuilder > builder ;
std : : unique_ptr < TableBuilder > builder ;
@ -38,15 +43,23 @@ struct SstFileWriter::Rep {
InternalKey ikey ;
InternalKey ikey ;
std : : string column_family_name ;
std : : string column_family_name ;
ColumnFamilyHandle * cfh ;
ColumnFamilyHandle * cfh ;
// If true, We will give the OS a hint that this file pages is not needed
// everytime we write 1MB to the file
bool invalidate_page_cache ;
// the size of the file during the last time we called Fadvise to remove
// cached pages from page cache.
uint64_t last_fadvise_size ;
} ;
} ;
SstFileWriter : : SstFileWriter ( const EnvOptions & env_options ,
SstFileWriter : : SstFileWriter ( const EnvOptions & env_options ,
const Options & options ,
const Options & options ,
const Comparator * user_comparator ,
const Comparator * user_comparator ,
ColumnFamilyHandle * column_family )
ColumnFamilyHandle * column_family ,
: rep_ ( new Rep ( env_options , options , user_comparator , column_family ) ) {
bool invalidate_page_cache )
: rep_ ( new Rep ( env_options , options , user_comparator , column_family ,
invalidate_page_cache ) ) {
rep_ - > file_info . file_size = 0 ;
rep_ - > file_info . file_size = 0 ;
}
}
SstFileWriter : : ~ SstFileWriter ( ) {
SstFileWriter : : ~ SstFileWriter ( ) {
if ( rep_ - > builder ) {
if ( rep_ - > builder ) {
@ -143,15 +156,17 @@ Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
}
}
}
}
// TODO(tec) : For external SST files we could omit the seqno and type.
r - > ikey . Set ( user_key , 0 /* Sequence Number */ ,
ValueType : : kTypeValue /* Put */ ) ;
r - > builder - > Add ( r - > ikey . Encode ( ) , value ) ;
// update file info
// update file info
r - > file_info . num_entries + + ;
r - > file_info . num_entries + + ;
r - > file_info . largest_key . assign ( user_key . data ( ) , user_key . size ( ) ) ;
r - > file_info . largest_key . assign ( user_key . data ( ) , user_key . size ( ) ) ;
r - > file_info . file_size = r - > builder - > FileSize ( ) ;
r - > file_info . file_size = r - > builder - > FileSize ( ) ;
// TODO(tec) : For external SST files we could omit the seqno and type.
InvalidatePageCache ( false /* closing */ ) ;
r - > ikey . Set ( user_key , 0 /* Sequence Number */ ,
ValueType : : kTypeValue /* Put */ ) ;
r - > builder - > Add ( r - > ikey . Encode ( ) , value ) ;
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
@ -166,10 +181,13 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
}
}
Status s = r - > builder - > Finish ( ) ;
Status s = r - > builder - > Finish ( ) ;
r - > file_info . file_size = r - > builder - > FileSize ( ) ;
if ( s . ok ( ) ) {
if ( s . ok ( ) ) {
if ( ! r - > ioptions . disable_data_sync ) {
if ( ! r - > ioptions . disable_data_sync ) {
s = r - > file_writer - > Sync ( r - > ioptions . use_fsync ) ;
s = r - > file_writer - > Sync ( r - > ioptions . use_fsync ) ;
}
}
InvalidatePageCache ( true /* closing */ ) ;
if ( s . ok ( ) ) {
if ( s . ok ( ) ) {
s = r - > file_writer - > Close ( ) ;
s = r - > file_writer - > Close ( ) ;
}
}
@ -181,8 +199,7 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
r - > ioptions . env - > DeleteFile ( r - > file_info . file_path ) ;
r - > ioptions . env - > DeleteFile ( r - > file_info . file_path ) ;
}
}
if ( s . ok ( ) & & file_info ! = nullptr ) {
if ( file_info ! = nullptr ) {
r - > file_info . file_size = r - > builder - > FileSize ( ) ;
* file_info = r - > file_info ;
* file_info = r - > file_info ;
}
}
@ -190,6 +207,24 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
return s ;
return s ;
}
}
void SstFileWriter : : InvalidatePageCache ( bool closing ) {
Rep * r = rep_ ;
if ( r - > invalidate_page_cache = = false ) {
// Fadvise disabled
return ;
}
uint64_t bytes_since_last_fadvise =
r - > builder - > FileSize ( ) - r - > last_fadvise_size ;
if ( bytes_since_last_fadvise > kFadviseTrigger | | closing ) {
TEST_SYNC_POINT_CALLBACK ( " SstFileWriter::InvalidatePageCache " ,
& ( bytes_since_last_fadvise ) ) ;
// Tell the OS that we dont need this file in page cache
r - > file_writer - > InvalidateCache ( 0 , 0 ) ;
r - > last_fadvise_size = r - > builder - > FileSize ( ) ;
}
}
uint64_t SstFileWriter : : FileSize ( ) {
uint64_t SstFileWriter : : FileSize ( ) {
return rep_ - > file_info . file_size ;
return rep_ - > file_info . file_size ;
}
}