// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. #include "rocksdb/utilities/env_librados.h" #include "util/random.h" #include #include namespace rocksdb { /* GLOBAL DIFINE */ // #define DEBUG #ifdef DEBUG #include #include #include #define LOG_DEBUG(...) do{\ printf("[%ld:%s:%i:%s]", syscall(SYS_gettid), __FILE__, __LINE__, __FUNCTION__);\ printf(__VA_ARGS__);\ }while(0) #else #define LOG_DEBUG(...) #endif /* GLOBAL CONSTANT */ const char *default_db_name = "default_envlibrados_db"; const char *default_pool_name = "default_envlibrados_pool"; const char *default_config_path = "CEPH_CONFIG_PATH"; // the env variable name of ceph configure file // maximum dir/file that can store in the fs const int MAX_ITEMS_IN_FS = 1 << 30; // root dir tag const std::string ROOT_DIR_KEY = "/"; const std::string DIR_ID_VALUE = ""; /** * @brief convert error code to status * @details Convert internal linux error code to Status * * @param r [description] * @return [description] */ Status err_to_status(int r) { switch (r) { case 0: return Status::OK(); case -ENOENT: return Status::IOError(); case -ENODATA: case -ENOTDIR: return Status::NotFound(Status::kNone); case -EINVAL: return Status::InvalidArgument(Status::kNone); case -EIO: return Status::IOError(Status::kNone); default: // FIXME :( assert(0 == "unrecognized error code"); return Status::NotSupported(Status::kNone); } } /** * @brief split file path into dir path and file name * @details * Because rocksdb only need a 2-level structure (dir/file), all input path will be shortened to dir/file format * For example: * b/c => dir '/b', file 'c' * /a/b/c => dir '/b', file 'c' * * @param fn [description] * @param dir [description] * @param file [description] */ void split(const std::string &fn, std::string *dir, std::string *file) { LOG_DEBUG("[IN]%s\n", fn.c_str()); int pos = fn.size() - 1; while ('/' == fn[pos]) --pos; size_t fstart = fn.rfind('/', pos); *file = fn.substr(fstart + 1, pos - fstart); pos = fstart; while (pos >= 0 && '/' == fn[pos]) --pos; if (pos < 0) { *dir = "/"; } else { size_t dstart = fn.rfind('/', pos); *dir = fn.substr(dstart + 1, pos - dstart); *dir = std::string("/") + *dir; } LOG_DEBUG("[OUT]%s | %s\n", dir->c_str(), file->c_str()); } // A file abstraction for reading sequentially through a file class LibradosSequentialFile : public SequentialFile { librados::IoCtx * _io_ctx; std::string _fid; std::string _hint; int _offset; public: LibradosSequentialFile(librados::IoCtx * io_ctx, std::string fid, std::string hint): _io_ctx(io_ctx), _fid(fid), _hint(hint), _offset(0) {} ~LibradosSequentialFile() {} /** * @brief read file * @details * Read up to "n" bytes from the file. "scratch[0..n-1]" may be * written by this routine. Sets "*result" to the data that was * read (including if fewer than "n" bytes were successfully read). * May set "*result" to point at data in "scratch[0..n-1]", so * "scratch[0..n-1]" must be live when "*result" is used. * If an error was encountered, returns a non-OK status. * * REQUIRES: External synchronization * * @param n [description] * @param result [description] * @param scratch [description] * @return [description] */ Status Read(size_t n, Slice* result, char* scratch) { LOG_DEBUG("[IN]%i\n", (int)n); librados::bufferlist buffer; Status s; int r = _io_ctx->read(_fid, buffer, n, _offset); if (r >= 0) { buffer.copy(0, r, scratch); *result = Slice(scratch, r); _offset += r; s = Status::OK(); } else { s = err_to_status(r); if (s == Status::IOError()) { *result = Slice(); s = Status::OK(); } } LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str()); return s; } /** * @brief skip "n" bytes from the file * @details * Skip "n" bytes from the file. This is guaranteed to be no * slower that reading the same data, but may be faster. * * If end of file is reached, skipping will stop at the end of the * file, and Skip will return OK. * * REQUIRES: External synchronization * * @param n [description] * @return [description] */ Status Skip(uint64_t n) { _offset += n; return Status::OK(); } /** * @brief noop * @details * rocksdb has it's own caching capabilities that we should be able to use, * without relying on a cache here. This can safely be a no-op. * * @param offset [description] * @param length [description] * * @return [description] */ Status InvalidateCache(size_t offset, size_t length) { return Status::OK(); } }; // A file abstraction for randomly reading the contents of a file. class LibradosRandomAccessFile : public RandomAccessFile { librados::IoCtx * _io_ctx; std::string _fid; std::string _hint; public: LibradosRandomAccessFile(librados::IoCtx * io_ctx, std::string fid, std::string hint): _io_ctx(io_ctx), _fid(fid), _hint(hint) {} ~LibradosRandomAccessFile() {} /** * @brief read file * @details similar to LibradosSequentialFile::Read * * @param offset [description] * @param n [description] * @param result [description] * @param scratch [description] * @return [description] */ Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const { LOG_DEBUG("[IN]%i\n", (int)n); librados::bufferlist buffer; Status s; int r = _io_ctx->read(_fid, buffer, n, offset); if (r >= 0) { buffer.copy(0, r, scratch); *result = Slice(scratch, r); s = Status::OK(); } else { s = err_to_status(r); if (s == Status::IOError()) { *result = Slice(); s = Status::OK(); } } LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str()); return s; } /** * @brief [brief description] * @details Get unique id for each file and guarantee this id is different for each file * * @param id [description] * @param max_size max size of id, it shoud be larger than 16 * * @return [description] */ size_t GetUniqueId(char* id, size_t max_size) const { // All fid has the same db_id prefix, so we need to ignore db_id prefix size_t s = std::min(max_size, _fid.size()); strncpy(id, _fid.c_str() + (_fid.size() - s), s); id[s - 1] = '\0'; return s; }; //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED }; void Hint(AccessPattern pattern) { /* Do nothing */ } /** * @brief noop * @details [long description] * * @param offset [description] * @param length [description] * * @return [description] */ Status InvalidateCache(size_t offset, size_t length) { return Status::OK(); } }; // A file abstraction for sequential writing. The implementation // must provide buffering since callers may append small fragments // at a time to the file. class LibradosWritableFile : public WritableFile { librados::IoCtx * _io_ctx; std::string _fid; std::string _hint; const EnvLibrados * const _env; std::mutex _mutex; // used to protect modification of all following variables librados::bufferlist _buffer; // write buffer uint64_t _buffer_size; // write buffer size uint64_t _file_size; // this file size doesn't include buffer size /** * @brief assuming caller holds lock * @details [long description] * @return [description] */ int _SyncLocked() { // 1. sync append data to RADOS int r = _io_ctx->append(_fid, _buffer, _buffer_size); assert(r >= 0); // 2. update local variables if (0 == r) { _buffer.clear(); _file_size += _buffer_size; _buffer_size = 0; } return r; } public: LibradosWritableFile(librados::IoCtx* io_ctx, std::string fid, std::string hint, const EnvLibrados* const env, const EnvOptions& options) : WritableFile(options), _io_ctx(io_ctx), _fid(fid), _hint(hint), _env(env), _buffer(), _buffer_size(0), _file_size(0) { int ret = _io_ctx->stat(_fid, &_file_size, nullptr); // if file not exist if (ret < 0) { _file_size = 0; } } ~LibradosWritableFile() { // sync before closeing writable file Sync(); } /** * @brief append data to file * @details * Append will save all written data in buffer util buffer size * reaches buffer max size. Then, it will write buffer into rados * * @param data [description] * @return [description] */ Status Append(const Slice& data) { // append buffer LOG_DEBUG("[IN] %i | %s\n", (int)data.size(), data.data()); int r = 0; std::lock_guard lock(_mutex); _buffer.append(data.data(), data.size()); _buffer_size += data.size(); if (_buffer_size > _env->_write_buffer_size) { r = _SyncLocked(); } LOG_DEBUG("[OUT] %i\n", r); return err_to_status(r); } /** * @brief not supported * @details [long description] * @return [description] */ Status PositionedAppend( const Slice& /* data */, uint64_t /* offset */) { return Status::NotSupported(); } /** * @brief truncate file to assigned size * @details [long description] * * @param size [description] * @return [description] */ Status Truncate(uint64_t size) { LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size, (long long)_file_size, (long long)_buffer_size); int r = 0; std::lock_guard lock(_mutex); if (_file_size > size) { r = _io_ctx->trunc(_fid, size); if (r == 0) { _buffer.clear(); _buffer_size = 0; _file_size = size; } } else if (_file_size == size) { _buffer.clear(); _buffer_size = 0; } else { librados::bufferlist tmp; tmp.claim(_buffer); _buffer.substr_of(tmp, 0, size - _file_size); _buffer_size = size - _file_size; } LOG_DEBUG("[OUT] %i\n", r); return err_to_status(r); } /** * @brief close file * @details [long description] * @return [description] */ Status Close() { LOG_DEBUG("%s | %lld | %lld\n", _hint.c_str(), (long long)_buffer_size, (long long)_file_size); return Sync(); } /** * @brief flush file, * @details initiate an aio write and not wait * * @return [description] */ Status Flush() { librados::AioCompletion *write_completion = librados::Rados::aio_create_completion(); int r = 0; std::lock_guard lock(_mutex); r = _io_ctx->aio_append(_fid, write_completion, _buffer, _buffer_size); if (0 == r) { _file_size += _buffer_size; _buffer.clear(); _buffer_size = 0; } write_completion->release(); return err_to_status(r); } /** * @brief write buffer data to rados * @details initiate an aio write and wait for result * @return [description] */ Status Sync() { // sync data int r = 0; std::lock_guard lock(_mutex); if (_buffer_size > 0) { r = _SyncLocked(); } return err_to_status(r); } /** * @brief [brief description] * @details [long description] * @return true if Sync() and Fsync() are safe to call concurrently with Append()and Flush(). */ bool IsSyncThreadSafe() const { return true; } /** * @brief Indicates the upper layers if the current WritableFile implementation uses direct IO. * @details [long description] * @return [description] */ bool use_direct_io() const { return false; } /** * @brief Get file size * @details * This API will use cached file_size. * @return [description] */ uint64_t GetFileSize() { LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size, (long long)_file_size); std::lock_guard lock(_mutex); int file_size = _file_size + _buffer_size; return file_size; } /** * @brief For documentation, refer to RandomAccessFile::GetUniqueId() * @details [long description] * * @param id [description] * @param max_size [description] * * @return [description] */ size_t GetUniqueId(char* id, size_t max_size) const { // All fid has the same db_id prefix, so we need to ignore db_id prefix size_t s = std::min(max_size, _fid.size()); strncpy(id, _fid.c_str() + (_fid.size() - s), s); id[s - 1] = '\0'; return s; } /** * @brief noop * @details [long description] * * @param offset [description] * @param length [description] * * @return [description] */ Status InvalidateCache(size_t offset, size_t length) { return Status::OK(); } using WritableFile::RangeSync; /** * @brief No RangeSync support, just call Sync() * @details [long description] * * @param offset [description] * @param nbytes [description] * * @return [description] */ Status RangeSync(off_t offset, off_t nbytes) { return Sync(); } protected: using WritableFile::Allocate; /** * @brief noop * @details [long description] * * @param offset [description] * @param len [description] * * @return [description] */ Status Allocate(off_t offset, off_t len) { return Status::OK(); } }; // Directory object represents collection of files and implements // filesystem operations that can be executed on directories. class LibradosDirectory : public Directory { librados::IoCtx * _io_ctx; std::string _fid; public: explicit LibradosDirectory(librados::IoCtx * io_ctx, std::string fid): _io_ctx(io_ctx), _fid(fid) {} // Fsync directory. Can be called concurrently from multiple threads. Status Fsync() { return Status::OK(); } }; // Identifies a locked file. // This is exclusive lock and can't nested lock by same thread class LibradosFileLock : public FileLock { librados::IoCtx * _io_ctx; const std::string _obj_name; const std::string _lock_name; const std::string _cookie; int lock_state; public: LibradosFileLock( librados::IoCtx * io_ctx, const std::string obj_name): _io_ctx(io_ctx), _obj_name(obj_name), _lock_name("lock_name"), _cookie("cookie") { // TODO: the lock will never expire. It may cause problem if the process crash or abnormally exit. while (!_io_ctx->lock_exclusive( _obj_name, _lock_name, _cookie, "description", nullptr, 0)); } ~LibradosFileLock() { _io_ctx->unlock(_obj_name, _lock_name, _cookie); } }; // -------------------- // --- EnvLibrados ---- // -------------------- /** * @brief EnvLibrados ctor * @details [long description] * * @param db_name unique database name * @param config_path the configure file path for rados */ EnvLibrados::EnvLibrados(const std::string& db_name, const std::string& config_path, const std::string& db_pool) : EnvLibrados("client.admin", "ceph", 0, db_name, config_path, db_pool, "/wal", db_pool, 1 << 20) {} /** * @brief EnvLibrados ctor * @details [long description] * * @param client_name first 3 parameters is for RADOS client init * @param cluster_name * @param flags * @param db_name unique database name, used as db_id key * @param config_path the configure file path for rados * @param db_pool the pool for db data * @param wal_pool the pool for WAL data * @param write_buffer_size WritableFile buffer max size */ EnvLibrados::EnvLibrados(const std::string& client_name, const std::string& cluster_name, const uint64_t flags, const std::string& db_name, const std::string& config_path, const std::string& db_pool, const std::string& wal_dir, const std::string& wal_pool, const uint64_t write_buffer_size) : EnvWrapper(Env::Default()), _client_name(client_name), _cluster_name(cluster_name), _flags(flags), _db_name(db_name), _config_path(config_path), _db_pool_name(db_pool), _wal_dir(wal_dir), _wal_pool_name(wal_pool), _write_buffer_size(write_buffer_size) { int ret = 0; // 1. create a Rados object and initialize it ret = _rados.init2(_client_name.c_str(), _cluster_name.c_str(), _flags); // just use the client.admin keyring if (ret < 0) { // let's handle any error that might have come back std::cerr << "couldn't initialize rados! error " << ret << std::endl; ret = EXIT_FAILURE; goto out; } // 2. read configure file ret = _rados.conf_read_file(_config_path.c_str()); if (ret < 0) { // This could fail if the config file is malformed, but it'd be hard. std::cerr << "failed to parse config file " << _config_path << "! error" << ret << std::endl; ret = EXIT_FAILURE; goto out; } // 3. we actually connect to the cluster ret = _rados.connect(); if (ret < 0) { std::cerr << "couldn't connect to cluster! error " << ret << std::endl; ret = EXIT_FAILURE; goto out; } // 4. create db_pool if not exist ret = _rados.pool_create(_db_pool_name.c_str()); if (ret < 0 && ret != -EEXIST && ret != -EPERM) { std::cerr << "couldn't create pool! error " << ret << std::endl; goto out; } // 5. create db_pool_ioctx ret = _rados.ioctx_create(_db_pool_name.c_str(), _db_pool_ioctx); if (ret < 0) { std::cerr << "couldn't set up ioctx! error " << ret << std::endl; ret = EXIT_FAILURE; goto out; } // 6. create wal_pool if not exist ret = _rados.pool_create(_wal_pool_name.c_str()); if (ret < 0 && ret != -EEXIST && ret != -EPERM) { std::cerr << "couldn't create pool! error " << ret << std::endl; goto out; } // 7. create wal_pool_ioctx ret = _rados.ioctx_create(_wal_pool_name.c_str(), _wal_pool_ioctx); if (ret < 0) { std::cerr << "couldn't set up ioctx! error " << ret << std::endl; ret = EXIT_FAILURE; goto out; } // 8. add root dir _AddFid(ROOT_DIR_KEY, DIR_ID_VALUE); out: LOG_DEBUG("rados connect result code : %i\n", ret); } /**************************************************** private functions to handle fid operation. Dir also have fid, but the value is DIR_ID_VALUE ****************************************************/ /** * @brief generate a new fid * @details [long description] * @return [description] */ std::string EnvLibrados::_CreateFid() { return _db_name + "." + GenerateUniqueId(); } /** * @brief get fid * @details [long description] * * @param fname [description] * @param fid [description] * * @return * Status::OK() * Status::NotFound() */ Status EnvLibrados::_GetFid( const std::string &fname, std::string& fid) { std::set keys; std::map kvs; keys.insert(fname); int r = _db_pool_ioctx.omap_get_vals_by_keys(_db_name, keys, &kvs); if (0 == r && 0 == kvs.size()) { return Status::NotFound(); } else if (0 == r && 0 != kvs.size()) { fid.assign(kvs[fname].c_str(), kvs[fname].length()); return Status::OK(); } else { return err_to_status(r); } } /** * @brief rename fid * @details Only modify object in rados once, * so this rename operation is atomic in term of rados * * @param old_fname [description] * @param new_fname [description] * * @return [description] */ Status EnvLibrados::_RenameFid(const std::string& old_fname, const std::string& new_fname) { std::string fid; Status s = _GetFid(old_fname, fid); if (Status::OK() != s) { return s; } librados::bufferlist bl; std::set keys; std::map kvs; librados::ObjectWriteOperation o; bl.append(fid); keys.insert(old_fname); kvs[new_fname] = bl; o.omap_rm_keys(keys); o.omap_set(kvs); int r = _db_pool_ioctx.operate(_db_name, &o); return err_to_status(r); } /** * @brief add to metadata object. It may overwrite exist key. * @details [long description] * * @param fname [description] * @param fid [description] * * @return [description] */ Status EnvLibrados::_AddFid( const std::string& fname, const std::string& fid) { std::map kvs; librados::bufferlist value; value.append(fid); kvs[fname] = value; int r = _db_pool_ioctx.omap_set(_db_name, kvs); return err_to_status(r); } /** * @brief return subfile names of dir. * @details * RocksDB has a 2-level structure, so all keys * that have dir as prefix are subfiles of dir. * So we can just return these files' name. * * @param dir [description] * @param result [description] * * @return [description] */ Status EnvLibrados::_GetSubFnames( const std::string& dir, std::vector * result ) { std::string start_after(dir); std::string filter_prefix(dir); std::map kvs; _db_pool_ioctx.omap_get_vals(_db_name, start_after, filter_prefix, MAX_ITEMS_IN_FS, &kvs); result->clear(); for (auto i = kvs.begin(); i != kvs.end(); i++) { result->push_back(i->first.substr(dir.size() + 1)); } return Status::OK(); } /** * @brief delete key fname from metadata object * @details [long description] * * @param fname [description] * @return [description] */ Status EnvLibrados::_DelFid( const std::string& fname) { std::set keys; keys.insert(fname); int r = _db_pool_ioctx.omap_rm_keys(_db_name, keys); return err_to_status(r); } /** * @brief get match IoCtx from _prefix_pool_map * @details [long description] * * @param prefix [description] * @return [description] * */ librados::IoCtx* EnvLibrados::_GetIoctx(const std::string& fpath) { auto is_prefix = [](const std::string & s1, const std::string & s2) { auto it1 = s1.begin(), it2 = s2.begin(); while (it1 != s1.end() && it2 != s2.end() && *it1 == *it2) ++it1, ++it2; return it1 == s1.end(); }; if (is_prefix(_wal_dir, fpath)) { return &_wal_pool_ioctx; } else { return &_db_pool_ioctx; } } /************************************************************ public functions ************************************************************/ /** * @brief generate unique id * @details Combine system time and random number. * @return [description] */ std::string EnvLibrados::GenerateUniqueId() { Random64 r(time(nullptr)); uint64_t random_uuid_portion = r.Uniform(std::numeric_limits::max()); uint64_t nanos_uuid_portion = NowNanos(); char uuid2[200]; snprintf(uuid2, 200, "%16lx-%16lx", (unsigned long)nanos_uuid_portion, (unsigned long)random_uuid_portion); return uuid2; } /** * @brief create a new sequential read file handler * @details it will check the existence of fname * * @param fname [description] * @param result [description] * @param options [description] * @return [description] */ Status EnvLibrados::NewSequentialFile( const std::string& fname, std::unique_ptr* result, const EnvOptions& options) { LOG_DEBUG("[IN]%s\n", fname.c_str()); std::string dir, file, fid; split(fname, &dir, &file); Status s; std::string fpath = dir + "/" + file; do { s = _GetFid(dir, fid); if (!s.ok() || fid != DIR_ID_VALUE) { if (fid != DIR_ID_VALUE) s = Status::IOError(); break; } s = _GetFid(fpath, fid); if (Status::NotFound() == s) { s = Status::IOError(); errno = ENOENT; break; } result->reset(new LibradosSequentialFile(_GetIoctx(fpath), fid, fpath)); s = Status::OK(); } while (0); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief create a new random access file handler * @details it will check the existence of fname * * @param fname [description] * @param result [description] * @param options [description] * @return [description] */ Status EnvLibrados::NewRandomAccessFile( const std::string& fname, std::unique_ptr* result, const EnvOptions& options) { LOG_DEBUG("[IN]%s\n", fname.c_str()); std::string dir, file, fid; split(fname, &dir, &file); Status s; std::string fpath = dir + "/" + file; do { s = _GetFid(dir, fid); if (!s.ok() || fid != DIR_ID_VALUE) { s = Status::IOError(); break; } s = _GetFid(fpath, fid); if (Status::NotFound() == s) { s = Status::IOError(); errno = ENOENT; break; } result->reset(new LibradosRandomAccessFile(_GetIoctx(fpath), fid, fpath)); s = Status::OK(); } while (0); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief create a new write file handler * @details it will check the existence of fname * * @param fname [description] * @param result [description] * @param options [description] * @return [description] */ Status EnvLibrados::NewWritableFile( const std::string& fname, std::unique_ptr* result, const EnvOptions& options) { LOG_DEBUG("[IN]%s\n", fname.c_str()); std::string dir, file, fid; split(fname, &dir, &file); Status s; std::string fpath = dir + "/" + file; do { // 1. check if dir exist s = _GetFid(dir, fid); if (!s.ok()) { break; } if (fid != DIR_ID_VALUE) { s = Status::IOError(); break; } // 2. check if file exist. // 2.1 exist, use it // 2.2 not exist, create it s = _GetFid(fpath, fid); if (Status::NotFound() == s) { fid = _CreateFid(); _AddFid(fpath, fid); } result->reset( new LibradosWritableFile(_GetIoctx(fpath), fid, fpath, this, options)); s = Status::OK(); } while (0); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief reuse write file handler * @details * This function will rename old_fname to new_fname, * then return the handler of new_fname * * @param new_fname [description] * @param old_fname [description] * @param result [description] * @param options [description] * @return [description] */ Status EnvLibrados::ReuseWritableFile( const std::string& new_fname, const std::string& old_fname, std::unique_ptr* result, const EnvOptions& options) { LOG_DEBUG("[IN]%s => %s\n", old_fname.c_str(), new_fname.c_str()); std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file; split(old_fname, &src_dir, &src_file); split(new_fname, &dst_dir, &dst_file); std::string src_fpath = src_dir + "/" + src_file; std::string dst_fpath = dst_dir + "/" + dst_file; Status r = Status::OK(); do { r = _RenameFid(src_fpath, dst_fpath); if (!r.ok()) { break; } result->reset(new LibradosWritableFile(_GetIoctx(dst_fpath), src_fid, dst_fpath, this, options)); } while (0); LOG_DEBUG("[OUT]%s\n", r.ToString().c_str()); return r; } /** * @brief create a new directory handler * @details [long description] * * @param name [description] * @param result [description] * * @return [description] */ Status EnvLibrados::NewDirectory( const std::string& name, std::unique_ptr* result) { LOG_DEBUG("[IN]%s\n", name.c_str()); std::string fid, dir, file; /* just want to get dir name */ split(name + "/tmp", &dir, &file); Status s; do { s = _GetFid(dir, fid); if (!s.ok() || DIR_ID_VALUE != fid) { s = Status::IOError(name, strerror(-ENOENT)); break; } if (Status::NotFound() == s) { s = _AddFid(dir, DIR_ID_VALUE); if (!s.ok()) break; } else if (!s.ok()) { break; } result->reset(new LibradosDirectory(_GetIoctx(dir), dir)); s = Status::OK(); } while (0); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief check if fname is exist * @details [long description] * * @param fname [description] * @return [description] */ Status EnvLibrados::FileExists(const std::string& fname) { LOG_DEBUG("[IN]%s\n", fname.c_str()); std::string fid, dir, file; split(fname, &dir, &file); Status s = _GetFid(dir + "/" + file, fid); if (s.ok() && fid != DIR_ID_VALUE) { s = Status::OK(); } LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief get subfile name of dir_in * @details [long description] * * @param dir_in [description] * @param result [description] * * @return [description] */ Status EnvLibrados::GetChildren( const std::string& dir_in, std::vector* result) { LOG_DEBUG("[IN]%s\n", dir_in.c_str()); std::string fid, dir, file; split(dir_in + "/temp", &dir, &file); Status s; do { s = _GetFid(dir, fid); if (!s.ok()) { break; } if (fid != DIR_ID_VALUE) { s = Status::IOError(); break; } s = _GetSubFnames(dir, result); } while (0); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief delete fname * @details [long description] * * @param fname [description] * @return [description] */ Status EnvLibrados::DeleteFile(const std::string& fname) { LOG_DEBUG("[IN]%s\n", fname.c_str()); std::string fid, dir, file; split(fname, &dir, &file); Status s = _GetFid(dir + "/" + file, fid); if (s.ok() && DIR_ID_VALUE != fid) { s = _DelFid(dir + "/" + file); } else { s = Status::NotFound(); } LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief create new dir * @details [long description] * * @param dirname [description] * @return [description] */ Status EnvLibrados::CreateDir(const std::string& dirname) { LOG_DEBUG("[IN]%s\n", dirname.c_str()); std::string fid, dir, file; split(dirname + "/temp", &dir, &file); Status s = _GetFid(dir + "/" + file, fid); do { if (Status::NotFound() != s && fid != DIR_ID_VALUE) { break; } else if (Status::OK() == s && fid == DIR_ID_VALUE) { break; } s = _AddFid(dir, DIR_ID_VALUE); } while (0); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief create dir if missing * @details [long description] * * @param dirname [description] * @return [description] */ Status EnvLibrados::CreateDirIfMissing(const std::string& dirname) { LOG_DEBUG("[IN]%s\n", dirname.c_str()); std::string fid, dir, file; split(dirname + "/temp", &dir, &file); Status s = Status::OK(); do { s = _GetFid(dir, fid); if (Status::NotFound() != s) { break; } s = _AddFid(dir, DIR_ID_VALUE); } while (0); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief delete dir * @details * * @param dirname [description] * @return [description] */ Status EnvLibrados::DeleteDir(const std::string& dirname) { LOG_DEBUG("[IN]%s\n", dirname.c_str()); std::string fid, dir, file; split(dirname + "/temp", &dir, &file); Status s = Status::OK(); s = _GetFid(dir, fid); if (s.ok() && DIR_ID_VALUE == fid) { std::vector subs; s = _GetSubFnames(dir, &subs); // if subfiles exist, can't delete dir if (subs.size() > 0) { s = Status::IOError(); } else { s = _DelFid(dir); } } else { s = Status::NotFound(); } LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief return file size * @details [long description] * * @param fname [description] * @param file_size [description] * * @return [description] */ Status EnvLibrados::GetFileSize( const std::string& fname, uint64_t* file_size) { LOG_DEBUG("[IN]%s\n", fname.c_str()); std::string fid, dir, file; split(fname, &dir, &file); time_t mtime; Status s; do { std::string fpath = dir + "/" + file; s = _GetFid(fpath, fid); if (!s.ok()) { break; } int ret = _GetIoctx(fpath)->stat(fid, file_size, &mtime); if (ret < 0) { LOG_DEBUG("%i\n", ret); if (-ENOENT == ret) { *file_size = 0; s = Status::OK(); } else { s = err_to_status(ret); } } else { s = Status::OK(); } } while (0); LOG_DEBUG("[OUT]%s|%lld\n", s.ToString().c_str(), (long long)*file_size); return s; } /** * @brief get file modification time * @details [long description] * * @param fname [description] * @param file_mtime [description] * * @return [description] */ Status EnvLibrados::GetFileModificationTime(const std::string& fname, uint64_t* file_mtime) { LOG_DEBUG("[IN]%s\n", fname.c_str()); std::string fid, dir, file; split(fname, &dir, &file); time_t mtime; uint64_t file_size; Status s = Status::OK(); do { std::string fpath = dir + "/" + file; s = _GetFid(dir + "/" + file, fid); if (!s.ok()) { break; } int ret = _GetIoctx(fpath)->stat(fid, &file_size, &mtime); if (ret < 0) { if (Status::NotFound() == err_to_status(ret)) { *file_mtime = static_cast(mtime); s = Status::OK(); } else { s = err_to_status(ret); } } else { s = Status::OK(); } } while (0); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief rename file * @details * * @param src [description] * @param target_in [description] * * @return [description] */ Status EnvLibrados::RenameFile( const std::string& src, const std::string& target_in) { LOG_DEBUG("[IN]%s => %s\n", src.c_str(), target_in.c_str()); std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file; split(src, &src_dir, &src_file); split(target_in, &dst_dir, &dst_file); auto s = _RenameFid(src_dir + "/" + src_file, dst_dir + "/" + dst_file); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief not support * @details [long description] * * @param src [description] * @param target_in [description] * * @return [description] */ Status EnvLibrados::LinkFile( const std::string& src, const std::string& target_in) { LOG_DEBUG("[IO]%s => %s\n", src.c_str(), target_in.c_str()); return Status::NotSupported(); } /** * @brief lock file. create if missing. * @details [long description] * * It seems that LockFile is used for preventing other instance of RocksDB * from opening up the database at the same time. From RocksDB source code, * the invokes of LockFile are at following locations: * * ./db/db_impl.cc:1159: s = env_->LockFile(LockFileName(dbname_), &db_lock_); // DBImpl::Recover * ./db/db_impl.cc:5839: Status result = env->LockFile(lockname, &lock); // Status DestroyDB * * When db recovery and db destroy, RocksDB will call LockFile * * @param fname [description] * @param lock [description] * * @return [description] */ Status EnvLibrados::LockFile( const std::string& fname, FileLock** lock) { LOG_DEBUG("[IN]%s\n", fname.c_str()); std::string fid, dir, file; split(fname, &dir, &file); Status s = Status::OK(); do { std::string fpath = dir + "/" + file; s = _GetFid(fpath, fid); if (Status::OK() != s && Status::NotFound() != s) { break; } else if (Status::NotFound() == s) { s = _AddFid(fpath, _CreateFid()); if (!s.ok()) { break; } } else if (Status::OK() == s && DIR_ID_VALUE == fid) { s = Status::IOError(); break; } *lock = new LibradosFileLock(_GetIoctx(fpath), fpath); } while (0); LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); return s; } /** * @brief unlock file * @details [long description] * * @param lock [description] * @return [description] */ Status EnvLibrados::UnlockFile(FileLock* lock) { LOG_DEBUG("[IO]%p\n", lock); if (nullptr != lock) { delete lock; } return Status::OK(); } /** * @brief not support * @details [long description] * * @param db_path [description] * @param output_path [description] * * @return [description] */ Status EnvLibrados::GetAbsolutePath( const std::string& db_path, std::string* output_path) { LOG_DEBUG("[IO]%s\n", db_path.c_str()); return Status::NotSupported(); } /** * @brief Get default EnvLibrados * @details [long description] * @return [description] */ EnvLibrados* EnvLibrados::Default() { static EnvLibrados default_env(default_db_name, std::getenv(default_config_path), default_pool_name); return &default_env; } } // namespace rocksdb