diff --git a/db/c.cc b/db/c.cc index 4676526f3..eb28c4bdc 100644 --- a/db/c.cc +++ b/db/c.cc @@ -529,6 +529,10 @@ void leveldb_options_set_db_log_dir( opt->rep.db_log_dir = db_log_dir; } +void leveldb_options_set_WAL_ttl_seconds(leveldb_options_t* opt, uint64_t ttl) { + opt->rep.WAL_ttl_seconds = ttl; +} + leveldb_comparator_t* leveldb_comparator_create( void* state, void (*destructor)(void*), diff --git a/db/db_bench.cc b/db/db_bench.cc index f83a99761..c4adaf9b7 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -225,6 +225,9 @@ static bool FLAGS_disable_auto_compactions = false; // that compacts Levelk with LevelK+1 static int FLAGS_source_compaction_factor = 1; +// Set the TTL for the WAL Files. +static uint64_t FLAGS_WAL_ttl_seconds = 0; + extern bool useOsBuffer; extern bool useFsReadAhead; extern bool useMmapRead; @@ -963,6 +966,7 @@ class Benchmark { options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type; + options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds; if (FLAGS_min_level_to_compress >= 0) { assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); options.compression_per_level = new CompressionType[FLAGS_num_levels]; @@ -1439,6 +1443,8 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--source_compaction_factor=%d%c", &n, &junk) == 1 && n > 0) { FLAGS_source_compaction_factor = n; + } else if (sscanf(argv[i], "--wal_ttl=%d%c", &n, &junk) == 1) { + FLAGS_WAL_ttl_seconds = static_cast(n); } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 9384fce76..5bffb8cda 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -5,12 +5,13 @@ #include "db/db_impl.h" #include +#include +#include #include #include #include -#include #include -#include + #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h" @@ -42,6 +43,8 @@ namespace leveldb { void dumpLeveldbBuildVersion(Logger * log); +const std::string DBImpl::ARCHIVAL_DIR = "archive"; + static Status NewLogger(const std::string& dbname, const std::string& db_log_dir, Env* env, @@ -225,6 +228,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) host_name_ = "localhost"; } last_log_ts = 0; + } DBImpl::~DBImpl() { @@ -329,6 +333,18 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } } +std::string DBImpl::GetArchivalDirectoryName() { + return dbname_ + "/" + ARCHIVAL_DIR; +} + +const Status DBImpl::CreateArchivalDirectory() { + if (options_.WAL_ttl_seconds > 0) { + std::string archivalPath = GetArchivalDirectoryName(); + return env_->CreateDirIfMissing(archivalPath); + } + return Status::OK(); +} + // Returns the list of live files in 'live' and the list // of all files in the filesystem in 'allfiles'. void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) { @@ -372,6 +388,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { uint64_t number; FileType type; std::vector old_log_files; + for (size_t i = 0; i < state.allfiles.size(); i++) { if (ParseFileName(state.allfiles[i], &number, &type)) { bool keep = true; @@ -406,6 +423,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { } if (!keep) { + const std::string currentFile = state.allfiles[i]; if (type == kTableFile) { // record the files to be evicted from the cache state.files_to_evict.push_back(number); @@ -413,11 +431,22 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { Log(options_.info_log, "Delete type=%d #%lld\n", int(type), static_cast(number)); - Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]); - if(!st.ok()) { - Log(options_.info_log, "Delete type=%d #%lld FAILED\n", + if (type == kLogFile && options_.WAL_ttl_seconds > 0) { + Status st = env_->RenameFile(dbname_ + "/" + currentFile, + dbname_ + "/" + ARCHIVAL_DIR + "/" + currentFile); + + if (!st.ok()) { + Log(options_.info_log, "RenameFile type=%d #%lld FAILED\n", int(type), static_cast(number)); + } + } else { + Status st = env_->DeleteFile(dbname_ + "/" + currentFile); + if(!st.ok()) { + Log(options_.info_log, "Delete type=%d #%lld FAILED\n", + int(type), + static_cast(number)); + } } } } @@ -446,12 +475,40 @@ void DBImpl::EvictObsoleteFiles(DeletionState& state) { void DBImpl::DeleteObsoleteFiles() { mutex_.AssertHeld(); DeletionState deletion_state; - std::set live; - std::vector allfiles; - std::vector files_to_evict; FindObsoleteFiles(deletion_state); PurgeObsoleteFiles(deletion_state); EvictObsoleteFiles(deletion_state); + PurgeObsoleteWALFiles(); +} + +void DBImpl::PurgeObsoleteWALFiles() { + if (options_.WAL_ttl_seconds != ULONG_MAX && options_.WAL_ttl_seconds > 0) { + std::vector WALFiles; + std::string archivalDir = dbname_ + "/" + ARCHIVAL_DIR; + env_->GetChildren(archivalDir, &WALFiles); + int64_t currentTime; + const Status status = env_->GetCurrentTime(¤tTime); + assert(status.ok()); + for (std::vector::iterator it = WALFiles.begin(); + it != WALFiles.end(); + ++it) { + + uint64_t fileMTime; + const std::string filePath = archivalDir + "/" + *it; + const Status s = env_->GetFileModificationTime(filePath, &fileMTime); + if (s.ok()) { + if (status.ok() && + (currentTime - fileMTime > options_.WAL_ttl_seconds)) { + Status delStatus = env_->DeleteFile(filePath); + if (!delStatus.ok()) { + Log(options_.info_log, + "Failed Deleting a WAL file Error : i%s", + delStatus.ToString().c_str()); + } + } + } // Ignore errors. + } + } } Status DBImpl::Recover(VersionEdit* edit, bool no_log_recory, @@ -2056,9 +2113,14 @@ Status DB::Open(const Options& options, const std::string& dbname, "no_block_cache is true while block_cache is not NULL"); } DBImpl* impl = new DBImpl(options, dbname); + Status s = impl->CreateArchivalDirectory(); + if (!s.ok()) { + delete impl; + return s; + } impl->mutex_.Lock(); VersionEdit edit(impl->NumberLevels()); - Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists + s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); WritableFile* lfile; diff --git a/db/db_impl.h b/db/db_impl.h index 0fa2f8f24..833932def 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -55,6 +55,9 @@ class DBImpl : public DB { virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size); + // Return's the path of the archival directory. + std::string GetArchivalDirectoryName(); + // Extra methods (for testing) that are not in the public DB interface // Compact any files in the named level that overlap [*begin,*end] @@ -113,6 +116,8 @@ protected: void MaybeIgnoreError(Status* s) const; + const Status CreateArchivalDirectory(); + // Delete any unneeded files and stale in-memory entries. void DeleteObsoleteFiles(); @@ -172,6 +177,7 @@ protected: // Removes the file listed in files_to_evict from the table_cache void EvictObsoleteFiles(DeletionState& deletion_state); + void PurgeObsoleteWALFiles(); // Constant after construction const InternalFilterPolicy internal_filter_policy_; bool owns_info_log_; @@ -292,6 +298,7 @@ protected: CompactionStats* stats_; static const int KEEP_LOG_FILE_NUM = 1000; + static const std::string ARCHIVAL_DIR; std::string db_absolute_path_; // count of the number of contiguous delaying writes diff --git a/db/db_test.cc b/db/db_test.cc index 12f022b62..5cf46ed69 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3,6 +3,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include +#include + #include "leveldb/db.h" #include "leveldb/filter_policy.h" #include "db/db_impl.h" @@ -1188,7 +1190,7 @@ TEST(DBTest, RepeatedWritesToSameKey) { // We must have at most one file per level except for level-0, // which may have up to kL0_StopWritesTrigger files. const int kMaxFiles = dbfull()->NumberLevels() + - dbfull()->Level0StopWriteTrigger(); + dbfull()->Level0StopWriteTrigger(); Random rnd(301); std::string value = RandomString(&rnd, 2 * options.write_buffer_size); @@ -1703,7 +1705,7 @@ TEST(DBTest, DeletionMarkers2) { TEST(DBTest, OverlapInLevel0) { do { - int tmp = dbfull()->MaxMemCompactionLevel(); + int tmp = dbfull()->MaxMemCompactionLevel(); ASSERT_EQ(tmp, 2) << "Fix test to match config"; // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0. @@ -2194,6 +2196,73 @@ TEST(DBTest, SnapshotFiles) { dbfull()->DisableFileDeletions(); } +void ListLogFiles(Env* env, + const std::string& path, + std::vector* logFiles) { + std::vector files; + env->GetChildren(path, &files); + uint64_t number; + FileType type; + for (size_t i = 0; i < files.size(); ++i) { + if (ParseFileName(files[i], &number, &type)) { + if (type == kLogFile) { + logFiles->push_back(number); + } + } + } +} + +TEST(DBTest, WALArchival) { + std::string value(1024, '1'); + Options options = CurrentOptions(); + options.create_if_missing = true; + options.WAL_ttl_seconds = 1000; + DestroyAndReopen(&options); + + + // TEST : Create DB with a ttl. + // Put some keys. Count the log files present in the DB just after insert. + // Re-open db. Causes deletion/archival to take place. + // Assert that the files moved under "/archive". + + std::string archiveDir = dbfull()->GetArchivalDirectoryName(); + + for (int i = 0; i < 10; ++i) { + + for (int j = 0; j < 10; ++j) { + ASSERT_OK(Put(Key(10*i+j), value)); + } + + std::vector logFiles; + ListLogFiles(env_, dbname_, &logFiles); + + options.create_if_missing = false; + Reopen(&options); + + std::vector logs; + ListLogFiles(env_, archiveDir, &logs); + std::set archivedFiles(logs.begin(), logs.end()); + + for (std::vector::iterator it = logFiles.begin(); + it != logFiles.end(); + ++it) { + ASSERT_TRUE(archivedFiles.find(*it) != archivedFiles.end()); + } + } + + // REOPEN database with 0 TTL. all files should have been deleted. + std::vector logFiles; + ListLogFiles(env_, archiveDir, &logFiles); + ASSERT_TRUE(logFiles.size() > 0); + options.WAL_ttl_seconds = 1; + env_->SleepForMicroseconds(2*1000*1000); + Reopen(&options); + + logFiles.clear(); + ListLogFiles(env_, archiveDir, &logFiles); + ASSERT_TRUE(logFiles.size() == 0); + +} TEST(DBTest, ReadCompaction) { std::string value(4096, '4'); // a string of size 4K diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 4c07cc63c..29855a351 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -90,10 +90,15 @@ class HdfsEnv : public Env { virtual Status CreateDir(const std::string& name); + virtual Status CreateDirIfMissing(const std::string& name); + virtual Status DeleteDir(const std::string& name); virtual Status GetFileSize(const std::string& fname, uint64_t* size); + virtual Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime); + virtual Status RenameFile(const std::string& src, const std::string& target); virtual Status LockFile(const std::string& fname, FileLock** lock); @@ -247,10 +252,17 @@ class HdfsEnv : public Env { virtual Status CreateDir(const std::string& name){return notsup;} + virtual Status CreateDirIfMissing(const std::string& name){return notsup;} + virtual Status DeleteDir(const std::string& name){return notsup;} virtual Status GetFileSize(const std::string& fname, uint64_t* size){return notsup;} + virtual Status GetFileModificationTime(const std::string& fname, + uint64_t* time) { + return notsup; + } + virtual Status RenameFile(const std::string& src, const std::string& target){return notsup;} virtual Status LockFile(const std::string& fname, FileLock** lock){return notsup;} diff --git a/helpers/memenv/memenv.cc b/helpers/memenv/memenv.cc index 2082083b3..954702420 100644 --- a/helpers/memenv/memenv.cc +++ b/helpers/memenv/memenv.cc @@ -316,6 +316,10 @@ class InMemoryEnv : public EnvWrapper { return Status::OK(); } + virtual Status CreateDirIfMissing(const std::string& dirname) { + return Status::OK(); + } + virtual Status DeleteDir(const std::string& dirname) { return Status::OK(); } @@ -330,6 +334,11 @@ class InMemoryEnv : public EnvWrapper { return Status::OK(); } + virtual Status GetFileModificationTime(const std::string& fname, + uint64_t* time) { + return Status::NotSupported("getFileMTime", "Not supported in MemEnv"); + } + virtual Status RenameFile(const std::string& src, const std::string& target) { MutexLock lock(&mutex_); diff --git a/include/leveldb/env.h b/include/leveldb/env.h index 7d4291656..ade56f72c 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -81,15 +81,22 @@ class Env { // Delete the named file. virtual Status DeleteFile(const std::string& fname) = 0; - // Create the specified directory. + // Create the specified directory. Returns error if directory exists. virtual Status CreateDir(const std::string& dirname) = 0; + // Creates directory if missing. Return Ok if it exists, or successful in + // Creating. + virtual Status CreateDirIfMissing(const std::string& dirname) = 0; + // Delete the specified directory. virtual Status DeleteDir(const std::string& dirname) = 0; // Store the size of fname in *file_size. virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0; + // Store the last modification time of fname in *file_mtime. + virtual Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) = 0; // Rename file src to target. virtual Status RenameFile(const std::string& src, const std::string& target) = 0; @@ -323,10 +330,19 @@ class EnvWrapper : public Env { } Status DeleteFile(const std::string& f) { return target_->DeleteFile(f); } Status CreateDir(const std::string& d) { return target_->CreateDir(d); } + Status CreateDirIfMissing(const std::string& d) { + return target_->CreateDirIfMissing(d); + } Status DeleteDir(const std::string& d) { return target_->DeleteDir(d); } Status GetFileSize(const std::string& f, uint64_t* s) { return target_->GetFileSize(f, s); } + + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) { + return target_->GetFileModificationTime(fname, file_mtime); + } + Status RenameFile(const std::string& s, const std::string& t) { return target_->RenameFile(s, t); } diff --git a/include/leveldb/options.h b/include/leveldb/options.h index c51c29521..27d8f6635 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -343,6 +343,16 @@ struct Options { // Disable automatic compactions. Manual compactions can still // be issued on this database. bool disable_auto_compactions; + + // The number of seconds a WAL(write ahead log) should be kept after it has + // been marked as Not Live. If the value is set. The WAL files are moved to + // the archive direcotory and deleted after the given TTL. + // If set to 0, WAL files are deleted as soon as they are not required by + // the database. + // If set to std::numeric_limits::max() the WAL files will never be + // deleted. + // Default : 0 + uint64_t WAL_ttl_seconds; }; // Options that control read operations diff --git a/util/env_hdfs.cc b/util/env_hdfs.cc index 90e511fa2..095737ca3 100644 --- a/util/env_hdfs.cc +++ b/util/env_hdfs.cc @@ -431,6 +431,16 @@ Status HdfsEnv::CreateDir(const std::string& name) { return IOError(name, errno); }; +Status HdfsEnv::CreateDirIfMissing(const std::string& name) { + const int value = hdfsExists(fileSys_, name.c_str()); + // Not atomic. state might change b/w hdfsExists and CreateDir. + if (value == 0) { + return Status::OK(); + } else { + return CreateDir(name); + } +}; + Status HdfsEnv::DeleteDir(const std::string& name) { return DeleteFile(name); }; @@ -446,6 +456,18 @@ Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) { return IOError(fname, errno); } +Status HdfsEnv::GetFileModificationTime(const std::string& fname, + uint64_t* time) { + hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str()); + if (pFileInfo != NULL) { + *time = static_cast(pFileInfo->mLastMod); + hdfsFreeFileInfo(pFileInfo, 1); + return Status::OK(); + } + return IOError(fname, errno); + +} + // The rename is not atomic. HDFS does not allow a renaming if the // target already exists. So, we delete the target before attemting the // rename. diff --git a/util/env_posix.cc b/util/env_posix.cc index cb10af8d6..fae7ac6e0 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -601,6 +601,16 @@ class PosixEnv : public Env { return result; }; + virtual Status CreateDirIfMissing(const std::string& name) { + Status result; + if (mkdir(name.c_str(), 0755) != 0) { + if (errno != EEXIST) { + result = IOError(name, errno); + } + } + return result; + }; + virtual Status DeleteDir(const std::string& name) { Status result; if (rmdir(name.c_str()) != 0) { @@ -621,6 +631,15 @@ class PosixEnv : public Env { return s; } + virtual Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) { + struct stat s; + if (stat(fname.c_str(), &s) !=0) { + return IOError(fname, errno); + } + *file_mtime = static_cast(s.st_mtime); + return Status::OK(); + } virtual Status RenameFile(const std::string& src, const std::string& target) { Status result; if (rename(src.c_str(), target.c_str()) != 0) { diff --git a/util/options.cc b/util/options.cc index e34a7e0f7..7805cee98 100644 --- a/util/options.cc +++ b/util/options.cc @@ -53,7 +53,8 @@ Options::Options() table_cache_numshardbits(4), compaction_filter_args(NULL), CompactionFilter(NULL), - disable_auto_compactions(false) { + disable_auto_compactions(false), + WAL_ttl_seconds(0){ } void @@ -140,6 +141,8 @@ Options::Dump( CompactionFilter); Log(log," Options.disable_auto_compactions: %d", disable_auto_compactions); + Log(log," Options.WAL_ttl_seconds: %ld", + WAL_ttl_seconds); } // Options::Dump } // namespace leveldb