@ -14,19 +14,20 @@
namespace rocksdb {
namespace rocksdb {
void DBWithTTLImpl : : SanitizeOptions ( int32_t ttl , ColumnFamilyOptions * options ) {
void DBWithTTLImpl : : SanitizeOptions ( int32_t ttl , ColumnFamilyOptions * options ,
Env * env ) {
if ( options - > compaction_filter ) {
if ( options - > compaction_filter ) {
options - > compaction_filter =
options - > compaction_filter =
new TtlCompactionFilter ( ttl , options - > compaction_filter ) ;
new TtlCompactionFilter ( ttl , env , options - > compaction_filter ) ;
} else {
} else {
options - > compaction_filter_factory =
options - > compaction_filter_factory =
std : : shared_ptr < CompactionFilterFactory > ( new TtlCompactionFilterFactory (
std : : shared_ptr < CompactionFilterFactory > ( new TtlCompactionFilterFactory (
ttl , options - > compaction_filter_factory ) ) ;
ttl , env , options - > compaction_filter_factory ) ) ;
}
}
if ( options - > merge_operator ) {
if ( options - > merge_operator ) {
options - > merge_operator . reset (
options - > merge_operator . reset (
new TtlMergeOperator ( options - > merge_operator ) ) ;
new TtlMergeOperator ( options - > merge_operator , env ) ) ;
}
}
}
}
@ -81,8 +82,9 @@ Status DBWithTTL::Open(
std : : vector < ColumnFamilyDescriptor > column_families_sanitized =
std : : vector < ColumnFamilyDescriptor > column_families_sanitized =
column_families ;
column_families ;
for ( size_t i = 0 ; i < column_families_sanitized . size ( ) ; + + i ) {
for ( size_t i = 0 ; i < column_families_sanitized . size ( ) ; + + i ) {
DBWithTTLImpl : : SanitizeOptions ( ttls [ i ] ,
DBWithTTLImpl : : SanitizeOptions (
& column_families_sanitized [ i ] . options ) ;
ttls [ i ] , & column_families_sanitized [ i ] . options ,
db_options . env = = nullptr ? Env : : Default ( ) : db_options . env ) ;
}
}
DB * db ;
DB * db ;
@ -105,7 +107,7 @@ Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
const ColumnFamilyOptions & options , const std : : string & column_family_name ,
const ColumnFamilyOptions & options , const std : : string & column_family_name ,
ColumnFamilyHandle * * handle , int ttl ) {
ColumnFamilyHandle * * handle , int ttl ) {
ColumnFamilyOptions sanitized_options = options ;
ColumnFamilyOptions sanitized_options = options ;
DBWithTTLImpl : : SanitizeOptions ( ttl , & sanitized_options ) ;
DBWithTTLImpl : : SanitizeOptions ( ttl , & sanitized_options , GetEnv ( ) ) ;
return DBWithTTL : : CreateColumnFamily ( sanitized_options , column_family_name ,
return DBWithTTL : : CreateColumnFamily ( sanitized_options , column_family_name ,
handle ) ;
handle ) ;
@ -117,18 +119,14 @@ Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
return CreateColumnFamilyWithTtl ( options , column_family_name , handle , 0 ) ;
return CreateColumnFamilyWithTtl ( options , column_family_name , handle , 0 ) ;
}
}
// Gives back the current time
Status DBWithTTLImpl : : GetCurrentTime ( int64_t * curtime ) {
return Env : : Default ( ) - > GetCurrentTime ( curtime ) ;
}
// Appends the current timestamp to the string.
// Appends the current timestamp to the string.
// Returns false if could not get the current_time, true if append succeeds
// Returns false if could not get the current_time, true if append succeeds
Status DBWithTTLImpl : : AppendTS ( const Slice & val , std : : string * val_with_ts ) {
Status DBWithTTLImpl : : AppendTS ( const Slice & val , std : : string * val_with_ts ,
Env * env ) {
val_with_ts - > reserve ( kTSLength + val . size ( ) ) ;
val_with_ts - > reserve ( kTSLength + val . size ( ) ) ;
char ts_string [ kTSLength ] ;
char ts_string [ kTSLength ] ;
int64_t curtime ;
int64_t curtime ;
Status st = GetCurrentTime ( & curtime ) ;
Status st = env - > GetCurrentTime ( & curtime ) ;
if ( ! st . ok ( ) ) {
if ( ! st . ok ( ) ) {
return st ;
return st ;
}
}
@ -154,12 +152,12 @@ Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
}
}
// Checks if the string is stale or not according to TTl provided
// Checks if the string is stale or not according to TTl provided
bool DBWithTTLImpl : : IsStale ( const Slice & value , int32_t ttl ) {
bool DBWithTTLImpl : : IsStale ( const Slice & value , int32_t ttl , Env * env ) {
if ( ttl < = 0 ) { // Data is fresh if TTL is non-positive
if ( ttl < = 0 ) { // Data is fresh if TTL is non-positive
return false ;
return false ;
}
}
int64_t curtime ;
int64_t curtime ;
if ( ! GetCurrentTime ( & curtime ) . ok ( ) ) {
if ( ! env - > GetCurrentTime ( & curtime ) . ok ( ) ) {
return false ; // Treat the data as fresh if could not get current time
return false ; // Treat the data as fresh if could not get current time
}
}
int32_t timestamp_value =
int32_t timestamp_value =
@ -232,12 +230,13 @@ Status DBWithTTLImpl::Merge(const WriteOptions& options,
Status DBWithTTLImpl : : Write ( const WriteOptions & opts , WriteBatch * updates ) {
Status DBWithTTLImpl : : Write ( const WriteOptions & opts , WriteBatch * updates ) {
class Handler : public WriteBatch : : Handler {
class Handler : public WriteBatch : : Handler {
public :
public :
explicit Handler ( Env * env ) : env_ ( env ) { }
WriteBatch updates_ttl ;
WriteBatch updates_ttl ;
Status batch_rewrite_status ;
Status batch_rewrite_status ;
virtual Status PutCF ( uint32_t column_family_id , const Slice & key ,
virtual Status PutCF ( uint32_t column_family_id , const Slice & key ,
const Slice & value ) {
const Slice & value ) {
std : : string value_with_ts ;
std : : string value_with_ts ;
Status st = AppendTS ( value , & value_with_ts ) ;
Status st = AppendTS ( value , & value_with_ts , env_ ) ;
if ( ! st . ok ( ) ) {
if ( ! st . ok ( ) ) {
batch_rewrite_status = st ;
batch_rewrite_status = st ;
} else {
} else {
@ -249,7 +248,7 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
virtual Status MergeCF ( uint32_t column_family_id , const Slice & key ,
virtual Status MergeCF ( uint32_t column_family_id , const Slice & key ,
const Slice & value ) {
const Slice & value ) {
std : : string value_with_ts ;
std : : string value_with_ts ;
Status st = AppendTS ( value , & value_with_ts ) ;
Status st = AppendTS ( value , & value_with_ts , env_ ) ;
if ( ! st . ok ( ) ) {
if ( ! st . ok ( ) ) {
batch_rewrite_status = st ;
batch_rewrite_status = st ;
} else {
} else {
@ -263,8 +262,11 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
virtual void LogData ( const Slice & blob ) { updates_ttl . PutLogData ( blob ) ; }
virtual void LogData ( const Slice & blob ) { updates_ttl . PutLogData ( blob ) ; }
private :
Env * env_ ;
} ;
} ;
Handler handler ;
Handler handler ( GetEnv ( ) ) ;
updates - > Iterate ( & handler ) ;
updates - > Iterate ( & handler ) ;
if ( ! handler . batch_rewrite_status . ok ( ) ) {
if ( ! handler . batch_rewrite_status . ok ( ) ) {
return handler . batch_rewrite_status ;
return handler . batch_rewrite_status ;