@@ -81,8 +81,8 @@ class NullWritableFile : public WritableFile {
 // Fix user-supplied options to be reasonable
 template <class T, class V>
 static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
-  if (*ptr > maxvalue) *ptr = maxvalue;
-  if (*ptr < minvalue) *ptr = minvalue;
+  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
+  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
 }
 Options SanitizeOptions(const std::string& dbname,
                         const InternalKeyComparator* icmp,
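The added static_cast keeps both comparisons in the bound type V, so clipping
an unsigned field against int bounds no longer mixes signed and unsigned
operands. A minimal standalone sketch of the clamping behavior, assuming a
size_t field like write_buffer_size:

    #include <cassert>
    #include <cstddef>

    // Same shape as ClipToRange above; T is the field type, V the bound type.
    template <class T, class V>
    static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
      if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
      if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
    }

    int main() {
      size_t write_buffer_size = 1;  // below the 64<<10 floor
      ClipToRange(&write_buffer_size, 64 << 10, 1 << 30);
      assert(write_buffer_size == static_cast<size_t>(64 << 10));  // clamped up
      return 0;
    }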
@@ -91,7 +91,6 @@ Options SanitizeOptions(const std::string& dbname,
   result.comparator = icmp;
   ClipToRange(&result.max_open_files,    20,     50000);
   ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
-  ClipToRange(&result.large_value_threshold, 16<<10, 1<<30);
   ClipToRange(&result.block_size,        1<<10,  4<<20);
   if (result.info_log == NULL) {
     // Open a log file in the same directory as the db
@@ -213,15 +212,12 @@ void DBImpl::DeleteObsoleteFiles() {
   std::set<uint64_t> live = pending_outputs_;
   versions_->AddLiveFiles(&live);
-  versions_->CleanupLargeValueRefs(live);

   std::vector<std::string> filenames;
   env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
   uint64_t number;
-  LargeValueRef large_ref;
   FileType type;
-  for (int i = 0; i < filenames.size(); i++) {
-    if (ParseFileName(filenames[i], &number, &large_ref, &type)) {
+  for (size_t i = 0; i < filenames.size(); i++) {
+    if (ParseFileName(filenames[i], &number, &type)) {
       bool keep = true;
       switch (type) {
         case kLogFile:
@ -241,9 +237,6 @@ void DBImpl::DeleteObsoleteFiles() {
// be recorded in pending_outputs_, which is inserted into "live"
keep = ( live . find ( number ) ! = live . end ( ) ) ;
break ;
case kLargeValueFile :
keep = versions_ - > LargeValueIsLive ( large_ref ) ;
break ;
case kCurrentFile :
case kDBLockFile :
case kInfoLogFile :
@@ -599,7 +592,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact) {
     assert(compact->outfile == NULL);
   }
   delete compact->outfile;
-  for (int i = 0; i < compact->outputs.size(); i++) {
+  for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     pending_outputs_.erase(out.number);
   }
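This loop change, repeated in the hunks below, matches the index type to
std::vector::size(), which returns size_t; comparing an int against it draws
-Wsign-compare warnings. A tiny illustration (not from this file):

    #include <cstdio>
    #include <vector>

    int main() {
      std::vector<int> outputs(3, 42);
      // for (int i = 0; i < outputs.size(); i++)    // signed/unsigned warning
      for (size_t i = 0; i < outputs.size(); i++) {  // index matches size_t
        std::printf("%d\n", outputs[i]);
      }
      return 0;
    }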
@@ -695,7 +688,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
   // Add compaction outputs
   compact->compaction->AddInputDeletions(compact->compaction->edit());
   const int level = compact->compaction->level();
-  for (int i = 0; i < compact->outputs.size(); i++) {
+  for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     compact->compaction->edit()->AddFile(
         level + 1,
@ -710,7 +703,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
DeleteObsoleteFiles ( ) ;
} else {
// Discard any files we may have created during this failed compaction
for ( in t i = 0 ; i < compact - > outputs . size ( ) ; i + + ) {
for ( size_ t i = 0 ; i < compact - > outputs . size ( ) ; i + + ) {
env_ - > DeleteFile ( TableFileName ( dbname_ , compact - > outputs [ i ] . number ) ) ;
}
}
@@ -811,7 +804,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
         "Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
         "%d smallest_snapshot: %d",
         ikey.user_key.ToString().c_str(),
-        (int)ikey.sequence, ikey.type, kTypeLargeValueRef, drop,
+        (int)ikey.sequence, ikey.type, kTypeValue, drop,
         compact->compaction->IsBaseLevelForKey(ikey.user_key),
         (int)last_sequence_for_key, (int)compact->smallest_snapshot);
 #endif
@@ -828,26 +821,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
         compact->current_output()->smallest.DecodeFrom(key);
       }
       compact->current_output()->largest.DecodeFrom(key);
-
-      if (ikey.type == kTypeLargeValueRef) {
-        if (input->value().size() != LargeValueRef::ByteSize()) {
-          if (options_.paranoid_checks) {
-            status = Status::Corruption("invalid large value ref");
-            break;
-          } else {
-            Log(env_, options_.info_log,
-                "compaction found invalid large value ref");
-          }
-        } else {
-          compact->compaction->edit()->AddLargeValueRef(
-              LargeValueRef::FromRef(input->value()),
-              compact->current_output()->number,
-              input->key());
-          compact->builder->Add(key, input->value());
-        }
-      } else {
-        compact->builder->Add(key, input->value());
-      }
+      compact->builder->Add(key, input->value());

       // Close output file if it is big enough
       if (compact->builder->FileSize() >=
@@ -881,7 +855,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
       stats.bytes_read += compact->compaction->input(which, i)->file_size;
     }
   }
-  for (int i = 0; i < compact->outputs.size(); i++) {
+  for (size_t i = 0; i < compact->outputs.size(); i++) {
     stats.bytes_written += compact->outputs[i].file_size;
   }
@@ -985,40 +959,27 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
 Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
   Status status;
-  WriteBatch* final = NULL;
   {
     MutexLock l(&mutex_);
     status = MakeRoomForWrite(false);  // May temporarily release lock and wait
     uint64_t last_sequence = versions_->LastSequence();
     if (status.ok()) {
-      status = HandleLargeValues(last_sequence + 1, updates, &final);
-    }
-    if (status.ok()) {
-      WriteBatchInternal::SetSequence(final, last_sequence + 1);
-      last_sequence += WriteBatchInternal::Count(final);
+      WriteBatchInternal::SetSequence(updates, last_sequence + 1);
+      last_sequence += WriteBatchInternal::Count(updates);
       versions_->SetLastSequence(last_sequence);

       // Add to log and apply to memtable
-      status = log_->AddRecord(WriteBatchInternal::Contents(final));
+      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
       if (status.ok() && options.sync) {
         status = logfile_->Sync();
       }
       if (status.ok()) {
-        status = WriteBatchInternal::InsertInto(final, mem_);
+        status = WriteBatchInternal::InsertInto(updates, mem_);
       }
     }

     if (options.post_write_snapshot != NULL) {
       *options.post_write_snapshot =
           status.ok() ? snapshots_.New(last_sequence) : NULL;
     }
   }
-  if (final != updates) {
-    delete final;
-  }
   return status;
 }
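With the HandleLargeValues rewrite pass removed, the caller's batch is
sequenced, logged, and applied to the memtable as-is. Client usage is
unchanged; a minimal sketch against the public API, assuming an
already-open db handle:

    #include "leveldb/db.h"
    #include "leveldb/write_batch.h"

    // Apply several updates atomically through the write path above.
    leveldb::Status WriteExample(leveldb::DB* db) {
      leveldb::WriteBatch batch;
      batch.Put("key1", "value1");
      batch.Delete("key2");
      leveldb::WriteOptions options;
      options.sync = true;  // fsync the log before acknowledging the write
      return db->Write(options, &batch);
    }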
@@ -1070,124 +1031,6 @@ Status DBImpl::MakeRoomForWrite(bool force) {
   return s;
 }

-bool DBImpl::HasLargeValues(const WriteBatch& batch) const {
-  if (WriteBatchInternal::ByteSize(&batch) >= options_.large_value_threshold) {
-    for (WriteBatchInternal::Iterator it(batch); !it.Done(); it.Next()) {
-      if (it.op() == kTypeValue &&
-          it.value().size() >= options_.large_value_threshold) {
-        return true;
-      }
-    }
-  }
-  return false;
-}
-
-// Given "raw_value", determines the appropriate compression format to use
-// and stores the data that should be written to the large value file in
-// "*file_bytes", and sets "*ref" to the appropriate large value reference.
-// May use "*scratch" as backing store for "*file_bytes".
-void DBImpl::MaybeCompressLargeValue(
-    const Slice& raw_value,
-    Slice* file_bytes,
-    std::string* scratch,
-    LargeValueRef* ref) {
-  switch (options_.compression) {
-    case kSnappyCompression: {
-      if (port::Snappy_Compress(raw_value.data(), raw_value.size(), scratch) &&
-          (scratch->size() < (raw_value.size() / 8) * 7)) {
-        *file_bytes = *scratch;
-        *ref = LargeValueRef::Make(raw_value, kSnappyCompression);
-        return;
-      }
-      // Less than 12.5% compression: just leave as uncompressed data
-      break;
-    }
-    case kNoCompression:
-      // Use default code outside of switch
-      break;
-  }
-  // Store as uncompressed data
-  *file_bytes = raw_value;
-  *ref = LargeValueRef::Make(raw_value, kNoCompression);
-}
-
-Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq,
-                                 WriteBatch* updates,
-                                 WriteBatch** final) {
-  if (!HasLargeValues(*updates)) {
-    // Fast path: no large values found
-    *final = updates;
-  } else {
-    // Copy *updates to a new WriteBatch, replacing the references to
-    *final = new WriteBatch;
-    SequenceNumber seq = assigned_seq;
-    for (WriteBatchInternal::Iterator it(*updates); !it.Done(); it.Next()) {
-      switch (it.op()) {
-        case kTypeValue:
-          if (it.value().size() < options_.large_value_threshold) {
-            (*final)->Put(it.key(), it.value());
-          } else {
-            std::string scratch;
-            Slice file_bytes;
-            LargeValueRef large_ref;
-            MaybeCompressLargeValue(
-                it.value(), &file_bytes, &scratch, &large_ref);
-            InternalKey ikey(it.key(), seq, kTypeLargeValueRef);
-            if (versions_->RegisterLargeValueRef(
-                    large_ref, versions_->LogNumber(), ikey)) {
-              // TODO(opt): avoid holding the lock here (but be careful about
-              // another thread doing a Write and switching logs or
-              // having us get a different "assigned_seq" value).
-              uint64_t tmp_number = versions_->NewFileNumber();
-              pending_outputs_.insert(tmp_number);
-              std::string tmp = TempFileName(dbname_, tmp_number);
-              WritableFile* file;
-              Status s = env_->NewWritableFile(tmp, &file);
-              if (!s.ok()) {
-                return s;  // Caller will delete *final
-              }
-              file->Append(file_bytes);
-              s = file->Close();
-              delete file;
-              if (s.ok()) {
-                const std::string fname =
-                    LargeValueFileName(dbname_, large_ref);
-                s = env_->RenameFile(tmp, fname);
-              } else {
-                Log(env_, options_.info_log, "Write large value: %s",
-                    s.ToString().c_str());
-              }
-              pending_outputs_.erase(tmp_number);
-              if (!s.ok()) {
-                env_->DeleteFile(tmp);  // Cleanup; intentionally ignoring error
-                return s;  // Caller will delete *final
-              }
-            }
-            // Put an indirect reference in the write batch in place
-            // of large value
-            WriteBatchInternal::PutLargeValueRef(*final, it.key(), large_ref);
-          }
-          break;
-        case kTypeLargeValueRef:
-          return Status::Corruption("Corrupted write batch");
-          break;
-        case kTypeDeletion:
-          (*final)->Delete(it.key());
-          break;
-      }
-      seq = seq + 1;
-    }
-  }
-  return Status::OK();
-}
-
 bool DBImpl::GetProperty(const Slice& property, std::string* value) {
   value->clear();
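The deleted MaybeCompressLargeValue used the same heuristic LevelDB applies
to table blocks: keep the compressed form only when it saves at least 12.5%.
A standalone sketch of that gate, assuming the snappy library directly rather
than the port::Snappy_Compress wrapper:

    #include <snappy.h>
    #include <string>

    // Returns the bytes worth storing: the compressed form if it is at least
    // 12.5% smaller than the input, otherwise the raw input unchanged.
    std::string MaybeCompress(const std::string& raw) {
      std::string compressed;
      snappy::Compress(raw.data(), raw.size(), &compressed);
      if (compressed.size() < (raw.size() / 8) * 7) {
        return compressed;
      }
      return raw;  // under 12.5% savings: store uncompressed
    }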
@@ -1205,7 +1048,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
       return false;
     } else {
       char buf[100];
-      snprintf(buf, sizeof(buf), "%d", versions_->NumLevelFiles(level));
+      snprintf(buf, sizeof(buf), "%d",
+               versions_->NumLevelFiles(static_cast<int>(level)));
       *value = buf;
       return true;
     }
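This branch backs the public DB::GetProperty interface; a typical lookup of
the per-level file count uses the standard property name:

    #include <string>
    #include "leveldb/db.h"

    // Returns the number of table files at level 0, or -1 if the property
    // name is not recognized; assumes an open db handle.
    int NumLevel0Files(leveldb::DB* db) {
      std::string value;
      if (db->GetProperty("leveldb.num-files-at-level0", &value)) {
        return std::stoi(value);  // property value is a decimal count
      }
      return -1;
    }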
@@ -1325,10 +1169,9 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
   Status result = env->LockFile(LockFileName(dbname), &lock);
   if (result.ok()) {
     uint64_t number;
-    LargeValueRef large_ref;
     FileType type;
-    for (int i = 0; i < filenames.size(); i++) {
-      if (ParseFileName(filenames[i], &number, &large_ref, &type)) {
+    for (size_t i = 0; i < filenames.size(); i++) {
+      if (ParseFileName(filenames[i], &number, &type)) {
         Status del = env->DeleteFile(dbname + "/" + filenames[i]);
         if (result.ok() && !del.ok()) {
           result = del;