fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
				
			
			
		
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							284 lines
						
					
					
						
							9.7 KiB
						
					
					
				
			
		
		
	
	
							284 lines
						
					
					
						
							9.7 KiB
						
					
					
				| // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 | |
| // Use of this source code is governed by a BSD-style license that can be
 | |
| // found in the LICENSE file. See the AUTHORS file for names of contributors.
 | |
| #ifndef ROCKSDB_LITE
 | |
| 
 | |
| #include "utilities/ttl/db_ttl_impl.h"
 | |
| 
 | |
| #include "rocksdb/utilities/db_ttl.h"
 | |
| #include "db/filename.h"
 | |
| #include "db/write_batch_internal.h"
 | |
| #include "util/coding.h"
 | |
| #include "rocksdb/env.h"
 | |
| #include "rocksdb/iterator.h"
 | |
| 
 | |
| namespace rocksdb {
 | |
| 
 | |
| void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
 | |
|                                     Env* env) {
 | |
|   if (options->compaction_filter) {
 | |
|     options->compaction_filter =
 | |
|         new TtlCompactionFilter(ttl, env, options->compaction_filter);
 | |
|   } else {
 | |
|     options->compaction_filter_factory =
 | |
|         std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
 | |
|             ttl, env, options->compaction_filter_factory));
 | |
|   }
 | |
| 
 | |
|   if (options->merge_operator) {
 | |
|     options->merge_operator.reset(
 | |
|         new TtlMergeOperator(options->merge_operator, env));
 | |
|   }
 | |
| }
 | |
| 
 | |
| // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
 | |
| DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db) {}
 | |
| 
 | |
| DBWithTTLImpl::~DBWithTTLImpl() { delete GetOptions().compaction_filter; }
 | |
| 
 | |
| Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
 | |
|                             StackableDB** dbptr, int32_t ttl, bool read_only) {
 | |
|   DBWithTTL* db;
 | |
|   Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only);
 | |
|   if (s.ok()) {
 | |
|     *dbptr = db;
 | |
|   } else {
 | |
|     *dbptr = nullptr;
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status DBWithTTL::Open(const Options& options, const std::string& dbname,
 | |
|                        DBWithTTL** dbptr, int32_t ttl, bool read_only) {
 | |
| 
 | |
|   DBOptions db_options(options);
 | |
|   ColumnFamilyOptions cf_options(options);
 | |
|   std::vector<ColumnFamilyDescriptor> column_families;
 | |
|   column_families.push_back(
 | |
|       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
 | |
|   std::vector<ColumnFamilyHandle*> handles;
 | |
|   Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
 | |
|                              dbptr, {ttl}, read_only);
 | |
|   if (s.ok()) {
 | |
|     assert(handles.size() == 1);
 | |
|     // i can delete the handle since DBImpl is always holding a reference to
 | |
|     // default column family
 | |
|     delete handles[0];
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status DBWithTTL::Open(
 | |
|     const DBOptions& db_options, const std::string& dbname,
 | |
|     const std::vector<ColumnFamilyDescriptor>& column_families,
 | |
|     std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
 | |
|     std::vector<int32_t> ttls, bool read_only) {
 | |
| 
 | |
|   if (ttls.size() != column_families.size()) {
 | |
|     return Status::InvalidArgument(
 | |
|         "ttls size has to be the same as number of column families");
 | |
|   }
 | |
| 
 | |
|   std::vector<ColumnFamilyDescriptor> column_families_sanitized =
 | |
|       column_families;
 | |
|   for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
 | |
|     DBWithTTLImpl::SanitizeOptions(
 | |
|         ttls[i], &column_families_sanitized[i].options,
 | |
|         db_options.env == nullptr ? Env::Default() : db_options.env);
 | |
|   }
 | |
|   DB* db;
 | |
| 
 | |
|   Status st;
 | |
|   if (read_only) {
 | |
|     st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
 | |
|                              handles, &db);
 | |
|   } else {
 | |
|     st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
 | |
|   }
 | |
|   if (st.ok()) {
 | |
|     *dbptr = new DBWithTTLImpl(db);
 | |
|   } else {
 | |
|     *dbptr = nullptr;
 | |
|   }
 | |
|   return st;
 | |
| }
 | |
| 
 | |
| Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
 | |
|     const ColumnFamilyOptions& options, const std::string& column_family_name,
 | |
|     ColumnFamilyHandle** handle, int ttl) {
 | |
|   ColumnFamilyOptions sanitized_options = options;
 | |
|   DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv());
 | |
| 
 | |
|   return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
 | |
|                                        handle);
 | |
| }
 | |
| 
 | |
| Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
 | |
|                                          const std::string& column_family_name,
 | |
|                                          ColumnFamilyHandle** handle) {
 | |
|   return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
 | |
| }
 | |
| 
 | |
| // Appends the current timestamp to the string.
 | |
| // Returns false if could not get the current_time, true if append succeeds
 | |
| Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
 | |
|                                Env* env) {
 | |
|   val_with_ts->reserve(kTSLength + val.size());
 | |
|   char ts_string[kTSLength];
 | |
|   int64_t curtime;
 | |
|   Status st = env->GetCurrentTime(&curtime);
 | |
|   if (!st.ok()) {
 | |
|     return st;
 | |
|   }
 | |
|   EncodeFixed32(ts_string, (int32_t)curtime);
 | |
|   val_with_ts->append(val.data(), val.size());
 | |
|   val_with_ts->append(ts_string, kTSLength);
 | |
|   return st;
 | |
| }
 | |
| 
 | |
| // Returns corruption if the length of the string is lesser than timestamp, or
 | |
| // timestamp refers to a time lesser than ttl-feature release time
 | |
| Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
 | |
|   if (str.size() < kTSLength) {
 | |
|     return Status::Corruption("Error: value's length less than timestamp's\n");
 | |
|   }
 | |
|   // Checks that TS is not lesser than kMinTimestamp
 | |
|   // Gaurds against corruption & normal database opened incorrectly in ttl mode
 | |
|   int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
 | |
|   if (timestamp_value < kMinTimestamp) {
 | |
|     return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
 | |
|   }
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| // Checks if the string is stale or not according to TTl provided
 | |
| bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
 | |
|   if (ttl <= 0) {  // Data is fresh if TTL is non-positive
 | |
|     return false;
 | |
|   }
 | |
|   int64_t curtime;
 | |
|   if (!env->GetCurrentTime(&curtime).ok()) {
 | |
|     return false;  // Treat the data as fresh if could not get current time
 | |
|   }
 | |
|   int32_t timestamp_value =
 | |
|       DecodeFixed32(value.data() + value.size() - kTSLength);
 | |
|   return (timestamp_value + ttl) < curtime;
 | |
| }
 | |
| 
 | |
| // Strips the TS from the end of the string
 | |
| Status DBWithTTLImpl::StripTS(std::string* str) {
 | |
|   Status st;
 | |
|   if (str->length() < kTSLength) {
 | |
|     return Status::Corruption("Bad timestamp in key-value");
 | |
|   }
 | |
|   // Erasing characters which hold the TS
 | |
|   str->erase(str->length() - kTSLength, kTSLength);
 | |
|   return st;
 | |
| }
 | |
| 
 | |
| Status DBWithTTLImpl::Put(const WriteOptions& options,
 | |
|                           ColumnFamilyHandle* column_family, const Slice& key,
 | |
|                           const Slice& val) {
 | |
|   WriteBatch batch;
 | |
|   batch.Put(column_family, key, val);
 | |
|   return Write(options, &batch);
 | |
| }
 | |
| 
 | |
| Status DBWithTTLImpl::Get(const ReadOptions& options,
 | |
|                           ColumnFamilyHandle* column_family, const Slice& key,
 | |
|                           std::string* value) {
 | |
|   Status st = db_->Get(options, column_family, key, value);
 | |
|   if (!st.ok()) {
 | |
|     return st;
 | |
|   }
 | |
|   st = SanityCheckTimestamp(*value);
 | |
|   if (!st.ok()) {
 | |
|     return st;
 | |
|   }
 | |
|   return StripTS(value);
 | |
| }
 | |
| 
 | |
| std::vector<Status> DBWithTTLImpl::MultiGet(
 | |
|     const ReadOptions& options,
 | |
|     const std::vector<ColumnFamilyHandle*>& column_family,
 | |
|     const std::vector<Slice>& keys, std::vector<std::string>* values) {
 | |
|   return std::vector<Status>(
 | |
|       keys.size(), Status::NotSupported("MultiGet not supported with TTL"));
 | |
| }
 | |
| 
 | |
| bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
 | |
|                                 ColumnFamilyHandle* column_family,
 | |
|                                 const Slice& key, std::string* value,
 | |
|                                 bool* value_found) {
 | |
|   bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
 | |
|   if (ret && value != nullptr && value_found != nullptr && *value_found) {
 | |
|     if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
 | |
|       return false;
 | |
|     }
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| Status DBWithTTLImpl::Merge(const WriteOptions& options,
 | |
|                             ColumnFamilyHandle* column_family, const Slice& key,
 | |
|                             const Slice& value) {
 | |
|   WriteBatch batch;
 | |
|   batch.Merge(column_family, key, value);
 | |
|   return Write(options, &batch);
 | |
| }
 | |
| 
 | |
| Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
 | |
|   class Handler : public WriteBatch::Handler {
 | |
|    public:
 | |
|     explicit Handler(Env* env) : env_(env) {}
 | |
|     WriteBatch updates_ttl;
 | |
|     Status batch_rewrite_status;
 | |
|     virtual Status PutCF(uint32_t column_family_id, const Slice& key,
 | |
|                          const Slice& value) {
 | |
|       std::string value_with_ts;
 | |
|       Status st = AppendTS(value, &value_with_ts, env_);
 | |
|       if (!st.ok()) {
 | |
|         batch_rewrite_status = st;
 | |
|       } else {
 | |
|         WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
 | |
|                                 value_with_ts);
 | |
|       }
 | |
|       return Status::OK();
 | |
|     }
 | |
|     virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
 | |
|                            const Slice& value) {
 | |
|       std::string value_with_ts;
 | |
|       Status st = AppendTS(value, &value_with_ts, env_);
 | |
|       if (!st.ok()) {
 | |
|         batch_rewrite_status = st;
 | |
|       } else {
 | |
|         WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
 | |
|                                   value_with_ts);
 | |
|       }
 | |
|       return Status::OK();
 | |
|     }
 | |
|     virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
 | |
|       WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
 | |
|       return Status::OK();
 | |
|     }
 | |
|     virtual void LogData(const Slice& blob) { updates_ttl.PutLogData(blob); }
 | |
| 
 | |
|    private:
 | |
|     Env* env_;
 | |
|   };
 | |
|   Handler handler(GetEnv());
 | |
|   updates->Iterate(&handler);
 | |
|   if (!handler.batch_rewrite_status.ok()) {
 | |
|     return handler.batch_rewrite_status;
 | |
|   } else {
 | |
|     return db_->Write(opts, &(handler.updates_ttl));
 | |
|   }
 | |
| }
 | |
| 
 | |
| Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
 | |
|                                      ColumnFamilyHandle* column_family) {
 | |
|   return new TtlIterator(db_->NewIterator(opts, column_family));
 | |
| }
 | |
| 
 | |
| }  // namespace rocksdb
 | |
| #endif  // ROCKSDB_LITE
 | |
| 
 |