From 6c1b040cc999a87781cda042ca4e3b644261ee5d Mon Sep 17 00:00:00 2001 From: Venkatesh Radhakrishnan Date: Fri, 14 Nov 2014 11:38:26 -0800 Subject: [PATCH] Provide openable snapshots Summary: Store links to live files in directory on same disk Test Plan: Take snapshot and open it. Added a test GetSnapshotLink in db_test. Reviewers: sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D28713 --- db/db_filesnapshot.cc | 109 +++++++++++++++++++++++ db/db_impl.h | 5 ++ db/db_test.cc | 58 ++++++++++++ hdfs/env_hdfs.h | 6 ++ include/rocksdb/db.h | 6 ++ include/rocksdb/env.h | 9 ++ include/rocksdb/utilities/stackable_db.h | 4 + util/env_posix.cc | 11 +++ util/file_util.cc | 59 ++++++++++++ util/file_util.h | 18 ++++ util/mock_env.cc | 13 +++ util/mock_env.h | 2 + 12 files changed, 300 insertions(+) create mode 100644 util/file_util.cc create mode 100644 util/file_util.h diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index dcf54c8c6..64e5e437c 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -25,6 +25,7 @@ #include "port/port.h" #include "util/mutexlock.h" #include "util/sync_point.h" +#include "util/file_util.h" namespace rocksdb { @@ -134,6 +135,114 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret, Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { return wal_manager_.GetSortedWalFiles(files); } + +// Builds an openable snapshot of RocksDB +Status DBImpl::CreateCheckpoint(const std::string& snapshot_dir) { + Status s; + std::vector<std::string> live_files; + uint64_t manifest_file_size = 0; + uint64_t sequence_number = GetLatestSequenceNumber(); + bool same_fs = true; + + if (env_->FileExists(snapshot_dir)) { + return Status::InvalidArgument("Directory exists"); + } + + s = DisableFileDeletions(); + if (s.ok()) { + // this will return live_files prefixed with "/" + s = GetLiveFiles(live_files, &manifest_file_size, true); + } + if (!s.ok()) { + EnableFileDeletions(false); + return s; + } + + 
Log(db_options_.info_log, + "Started the snapshot process -- creating snapshot in directory %s", + snapshot_dir.c_str()); + + std::string full_private_path = snapshot_dir + ".tmp"; + + // create snapshot directory + s = env_->CreateDir(full_private_path); + + // copy/hard link live_files + for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { + uint64_t number; + FileType type; + bool ok = ParseFileName(live_files[i], &number, &type); + if (!ok) { + s = Status::Corruption("Can't parse file name. This is very bad"); + break; + } + // we should only get sst, manifest and current files here + assert(type == kTableFile || type == kDescriptorFile || + type == kCurrentFile); + assert(live_files[i].size() > 0 && live_files[i][0] == '/'); + std::string src_fname = live_files[i]; + + // rules: + // * if it's kTableFile, then it's shared + // * if it's kDescriptorFile, limit the size to manifest_file_size + // * always copy if cross-device link + if ((type == kTableFile) && same_fs) { + Log(db_options_.info_log, "Hard Linking %s", src_fname.c_str()); + s = env_->LinkFile(GetName() + src_fname, full_private_path + src_fname); + if (s.IsNotSupported()) { + same_fs = false; + s = Status::OK(); + } + } + if ((type != kTableFile) || (!same_fs)) { + Log(db_options_.info_log, "Copying %s", src_fname.c_str()); + s = CopyFile(env_, GetName() + src_fname, full_private_path + src_fname, + (type == kDescriptorFile) ? 
manifest_file_size : 0); + } + } + + // we copied all the files, enable file deletions + EnableFileDeletions(false); + + if (s.ok()) { + // move tmp private backup to real snapshot directory + s = env_->RenameFile(full_private_path, snapshot_dir); + } + if (s.ok()) { + unique_ptr<Directory> snapshot_directory; + env_->NewDirectory(snapshot_dir, &snapshot_directory); + if (snapshot_directory != nullptr) { + s = snapshot_directory->Fsync(); + } + } + + if (!s.ok()) { + // clean all the files we might have created + Log(db_options_.info_log, "Snapshot failed -- %s", s.ToString().c_str()); + // we have to delete the dir and all its children + std::vector<std::string> subchildren; + env_->GetChildren(full_private_path, &subchildren); + for (auto& subchild : subchildren) { + Status s1 = env_->DeleteFile(full_private_path + subchild); + if (s1.ok()) { + Log(db_options_.info_log, "Deleted %s", + (full_private_path + subchild).c_str()); + } + } + // finally delete the private dir + Status s1 = env_->DeleteDir(full_private_path); + Log(db_options_.info_log, "Deleted dir %s -- %s", full_private_path.c_str(), + s1.ToString().c_str()); + return s; + } + + // here we know that we succeeded and installed the new snapshot + Log(db_options_.info_log, "Snapshot DONE. 
All is good"); + Log(db_options_.info_log, "Snapshot sequence number: %" PRIu64, + sequence_number); + + return s; +} } #endif // ROCKSDB_LITE diff --git a/db/db_impl.h b/db/db_impl.h index 400f207b8..1106a281d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -169,6 +169,11 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family, ColumnFamilyMetaData* metadata) override; + // Builds an openable snapshot of RocksDB on the same disk, which + // accepts an output directory on the same disk, and under the directory + // (1) hard-linked SST files pointing to existing live SST files + // (2) a copied manifest files and other files + virtual Status CreateCheckpoint(const std::string& snapshot_dir); #endif // ROCKSDB_LITE // checks if all live files exist on file system and that their file sizes diff --git a/db/db_test.cc b/db/db_test.cc index a42a15a13..a3ad82c51 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1606,6 +1606,60 @@ TEST(DBTest, GetSnapshot) { } while (ChangeOptions(kSkipHashCuckoo)); } +TEST(DBTest, GetSnapshotLink) { + do { + Options options; + const std::string snapshot_name = test::TmpDir(env_) + "/snapshot"; + DB* snapshotDB; + ReadOptions roptions; + std::string result; + + options = CurrentOptions(options); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + ASSERT_OK(DestroyDB(snapshot_name, options)); + env_->DeleteDir(snapshot_name); + + // Create a database + Status s; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + std::string key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + // Take a snapshot + ASSERT_OK(db_->CreateCheckpoint(snapshot_name)); + ASSERT_OK(Put(key, "v2")); + ASSERT_EQ("v2", Get(key)); + ASSERT_OK(Flush()); + ASSERT_EQ("v2", Get(key)); + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + ASSERT_OK(DB::Open(options, snapshot_name, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, key, &result)); + 
ASSERT_EQ("v1", result); + delete snapshotDB; + snapshotDB = nullptr; + delete db_; + db_ = nullptr; + + // Destroy original DB + ASSERT_OK(DestroyDB(dbname_, options)); + + // Open snapshot and verify contents + options.create_if_missing = false; + dbname_ = snapshot_name; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + ASSERT_EQ("v1", Get(key)); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + + // Restore DB name + dbname_ = test::TmpDir(env_) + "/db_test"; + } while (ChangeOptions()); +} + TEST(DBTest, GetLevel0Ordering) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); @@ -7468,6 +7522,10 @@ class ModelDB: public DB { ColumnFamilyHandle* column_family, ColumnFamilyMetaData* metadata) {} + virtual Status CreateCheckpoint(const std::string& snapshot_dir) { + return Status::NotSupported("Not supported in Model DB"); + } + private: class ModelIter: public Iterator { public: diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 82f317f73..475ea7cab 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -93,6 +93,8 @@ class HdfsEnv : public Env { virtual Status RenameFile(const std::string& src, const std::string& target); + virtual Status LinkFile(const std::string& src, const std::string& target); + virtual Status LockFile(const std::string& fname, FileLock** lock); virtual Status UnlockFile(FileLock* lock); @@ -291,6 +293,10 @@ class HdfsEnv : public Env { virtual Status RenameFile(const std::string& src, const std::string& target){return notsup;} + virtual Status LinkFile(const std::string& src, const std::string& target) { + return notsup; + } + virtual Status LockFile(const std::string& fname, FileLock** lock){return notsup;} virtual Status UnlockFile(FileLock* lock){return notsup;} diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 04460ad9e..52f157d82 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -521,6 +521,12 @@ class DB { virtual Status GetPropertiesOfAllTables(TablePropertiesCollection* 
props) { return GetPropertiesOfAllTables(DefaultColumnFamily(), props); } + + // Builds an openable snapshot of RocksDB on the same disk, which + // accepts an output directory on the same disk, and under the directory + // (1) hard-linked SST files pointing to existing live SST files + // (2) a copied manifest files and other files + virtual Status CreateCheckpoint(const std::string& snapshot_dir) = 0; #endif // ROCKSDB_LITE private: diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 36aa5a604..291676002 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -178,6 +178,10 @@ class Env { virtual Status RenameFile(const std::string& src, const std::string& target) = 0; + // Hard Link file src to target. + virtual Status LinkFile(const std::string& src, + const std::string& target) = 0; + // Lock the specified file. Used to prevent concurrent access to // the same db by multiple processes. On failure, stores nullptr in // *lock and returns non-OK. @@ -747,6 +751,11 @@ class EnvWrapper : public Env { Status RenameFile(const std::string& s, const std::string& t) { return target_->RenameFile(s, t); } + + Status LinkFile(const std::string& s, const std::string& t) { + return target_->LinkFile(s, t); + } + Status LockFile(const std::string& f, FileLock** l) { return target_->LockFile(f, l); } diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 7bdf9928e..9366bd84f 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -247,6 +247,10 @@ class StackableDB : public DB { return db_->DefaultColumnFamily(); } + virtual Status CreateCheckpoint(const std::string& snapshot_dir) override { + return db_->CreateCheckpoint(snapshot_dir); + } + protected: DB* db_; }; diff --git a/util/env_posix.cc b/util/env_posix.cc index 86343be30..af1801607 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1288,6 +1288,17 @@ class PosixEnv : public Env { return 
result; } + virtual Status LinkFile(const std::string& src, const std::string& target) { + Status result; + if (link(src.c_str(), target.c_str()) != 0) { + if (errno == EXDEV) { + return Status::NotSupported("No cross FS links allowed"); + } + result = IOError(src, errno); + } + return result; + } + virtual Status LockFile(const std::string& fname, FileLock** lock) { *lock = nullptr; Status result; diff --git a/util/file_util.cc b/util/file_util.cc new file mode 100644 index 000000000..c75d59c5f --- /dev/null +++ b/util/file_util.cc @@ -0,0 +1,59 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +#include <string> +#include <algorithm> +#include "util/file_util.h" +#include "rocksdb/env.h" +#include "db/filename.h" + +namespace rocksdb { + +// Utility function to copy a file up to a specified length +Status CopyFile(Env* env, const std::string& source, + const std::string& destination, uint64_t size) { + const EnvOptions soptions; + unique_ptr<SequentialFile> srcfile; + Status s; + s = env->NewSequentialFile(source, &srcfile, soptions); + unique_ptr<WritableFile> destfile; + if (s.ok()) { + s = env->NewWritableFile(destination, &destfile, soptions); + } else { + return s; + } + + if (size == 0) { + // default argument means copy everything + if (s.ok()) { + s = env->GetFileSize(source, &size); + } else { + return s; + } + } + + char buffer[4096]; + Slice slice; + while (size > 0) { + uint64_t bytes_to_read = + std::min(static_cast<uint64_t>(sizeof(buffer)), size); + if (s.ok()) { + s = srcfile->Read(bytes_to_read, &slice, buffer); + } + if (s.ok()) { + if (slice.size() == 0) { + return Status::Corruption("file too small"); + } + s = destfile->Append(slice); + } + if (!s.ok()) { + return s; + } + size -= slice.size(); + } + return Status::OK(); +} + +} // namespace rocksdb diff 
--git a/util/file_util.h b/util/file_util.h new file mode 100644 index 000000000..84b37345b --- /dev/null +++ b/util/file_util.h @@ -0,0 +1,18 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +#include <string> + +#pragma once +#include "rocksdb/status.h" +#include "rocksdb/types.h" +#include "rocksdb/env.h" + +namespace rocksdb { + +extern Status CopyFile(Env* env, const std::string& source, + const std::string& destination, uint64_t size = 0); + +} // namespace rocksdb diff --git a/util/mock_env.cc b/util/mock_env.cc index 5a4c2c325..a88db18d5 100644 --- a/util/mock_env.cc +++ b/util/mock_env.cc @@ -559,6 +559,19 @@ Status MockEnv::RenameFile(const std::string& src, const std::string& dest) { return Status::OK(); } +Status MockEnv::LinkFile(const std::string& src, const std::string& dest) { + auto s = NormalizePath(src); + auto t = NormalizePath(dest); + MutexLock lock(&mutex_); + if (file_map_.find(s) == file_map_.end()) { + return Status::IOError(s, "File not found"); + } + + DeleteFileInternal(t); + file_map_[t] = file_map_[s]; + return Status::OK(); +} + Status MockEnv::NewLogger(const std::string& fname, shared_ptr<Logger>* result) { auto fn = NormalizePath(fname); diff --git a/util/mock_env.h b/util/mock_env.h index b92caa5cf..bbd191d78 100644 --- a/util/mock_env.h +++ b/util/mock_env.h @@ -69,6 +69,8 @@ class MockEnv : public EnvWrapper { virtual Status RenameFile(const std::string& src, const std::string& target); + virtual Status LinkFile(const std::string& src, const std::string& target); + virtual Status NewLogger(const std::string& fname, shared_ptr<Logger>* result);