Add IsDirectory() to Env and FS (#6711)

Summary:
IsDirectory() is a common API to check whether a path is a regular file or
directory.
POSIX: call stat() and use S_ISDIR(st_mode)
Windows: PathIsDirectoryA() and PathIsDirectoryW()
HDFS: FileSystem.IsDirectory()
Java: File.IsDirectory()
...
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6711

Test Plan: make check

Reviewed By: anand1976

Differential Revision: D21053520

Pulled By: riversand963

fbshipit-source-id: 680aadfd8ce982b63689190cf31b3145d5a89e27
main
Yanqin Jin 5 years ago committed by Facebook GitHub Bot
parent 63d82e57b9
commit 243852ec15
  1. 1
      HISTORY.md
  2. 10
      env/composite_env_wrapper.h
  3. 12
      env/env_hdfs.cc
  4. 33
      env/env_test.cc
  5. 22
      env/fs_posix.cc
  6. 6
      hdfs/env_hdfs.h
  7. 31
      include/rocksdb/env.h
  8. 8
      include/rocksdb/file_system.h
  9. 12
      port/win/env_win.cc
  10. 4
      port/win/env_win.h
  11. 2
      port/win/port_win.h

@ -6,6 +6,7 @@
### Public API Change ### Public API Change
* Add NewFileChecksumGenCrc32cFactory to the file checksum public API, such that the builtin Crc32c based file checksum generator factory can be used by applications. * Add NewFileChecksumGenCrc32cFactory to the file checksum public API, such that the builtin Crc32c based file checksum generator factory can be used by applications.
* Add IsDirectory to Env and FS to indicate if a path is a directory.
### New Features ### New Features
* Added support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism. * Added support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism.

@ -492,6 +492,12 @@ class CompositeEnvWrapper : public Env {
return file_system_->NewLogger(fname, io_opts, result, &dbg); return file_system_->NewLogger(fname, io_opts, result, &dbg);
} }
Status IsDirectory(const std::string& path, bool* is_dir) override {
IOOptions io_opts;
IODebugContext dbg;
return file_system_->IsDirectory(path, io_opts, is_dir, &dbg);
}
#if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION) #if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION)
Status LoadLibrary(const std::string& lib_name, Status LoadLibrary(const std::string& lib_name,
const std::string& search_path, const std::string& search_path,
@ -1081,6 +1087,10 @@ class LegacyFileSystemWrapper : public FileSystem {
uint64_t* diskfree, IODebugContext* /*dbg*/) override { uint64_t* diskfree, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetFreeSpace(path, diskfree)); return status_to_io_status(target_->GetFreeSpace(path, diskfree));
} }
IOStatus IsDirectory(const std::string& path, const IOOptions& /*options*/,
bool* is_dir, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->IsDirectory(path, is_dir));
}
private: private:
Env* target_; Env* target_;

12
env/env_hdfs.cc vendored

@ -609,6 +609,18 @@ Status HdfsEnv::NewLogger(const std::string& fname,
return Status::OK(); return Status::OK();
} }
Status HdfsEnv::IsDirectory(const std::string& path, bool* is_dir) {
hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, path.c_str());
if (pFileInfo != nullptr) {
if (is_dir != nullptr) {
*is_dir = (pFileInfo->mKind == tObjectKindDirectory);
}
hdfsFreeFileInfo(pFileInfo, 1);
return Status::OK();
}
return IOError(path, errno);
}
// The factory method for creating an HDFS Env // The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) { Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
*hdfs_env = new HdfsEnv(fsname); *hdfs_env = new HdfsEnv(fsname);

33
env/env_test.cc vendored

@ -1937,7 +1937,13 @@ class TestEnv : public EnvWrapper {
int close_count; int close_count;
}; };
class EnvTest : public testing::Test {}; class EnvTest : public testing::Test {
public:
EnvTest() : test_directory_(test::PerThreadDBPath("env_test")) {}
protected:
const std::string test_directory_;
};
TEST_F(EnvTest, Close) { TEST_F(EnvTest, Close) {
TestEnv* env = new TestEnv(); TestEnv* env = new TestEnv();
@ -2090,6 +2096,31 @@ TEST_F(EnvTest, MultipleCompositeEnv) {
ASSERT_EQ(env2->GetBackgroundThreads(Env::HIGH), 8); ASSERT_EQ(env2->GetBackgroundThreads(Env::HIGH), 8);
} }
TEST_F(EnvTest, IsDirectory) {
Status s = Env::Default()->CreateDirIfMissing(test_directory_);
ASSERT_OK(s);
const std::string test_sub_dir = test_directory_ + "sub1";
const std::string test_file_path = test_directory_ + "file1";
ASSERT_OK(Env::Default()->CreateDirIfMissing(test_sub_dir));
bool is_dir = false;
ASSERT_OK(Env::Default()->IsDirectory(test_sub_dir, &is_dir));
ASSERT_TRUE(is_dir);
{
std::unique_ptr<FSWritableFile> wfile;
s = Env::Default()->GetFileSystem()->NewWritableFile(
test_file_path, FileOptions(), &wfile, /*dbg=*/nullptr);
ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> fwriter;
fwriter.reset(new WritableFileWriter(std::move(wfile), test_file_path,
FileOptions(), Env::Default()));
constexpr char buf[] = "test";
s = fwriter->Append(buf);
ASSERT_OK(s);
}
ASSERT_OK(Env::Default()->IsDirectory(test_file_path, &is_dir));
ASSERT_FALSE(is_dir);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

22
env/fs_posix.cc vendored

@ -877,6 +877,28 @@ class PosixFileSystem : public FileSystem {
return IOStatus::OK(); return IOStatus::OK();
} }
IOStatus IsDirectory(const std::string& path, const IOOptions& /*opts*/,
bool* is_dir, IODebugContext* /*dbg*/) override {
// First open
int fd = -1;
int flags = cloexec_flags(O_RDONLY, nullptr);
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(path.c_str(), flags);
}
if (fd < 0) {
return IOError("While open for IsDirectory()", path, errno);
}
struct stat sbuf;
if (fstat(fd, &sbuf) < 0) {
return IOError("While doing stat for IsDirectory()", path, errno);
}
if (nullptr != is_dir) {
*is_dir = S_ISDIR(sbuf.st_mode);
}
return IOStatus::OK();
}
FileOptions OptimizeForLogWrite(const FileOptions& file_options, FileOptions OptimizeForLogWrite(const FileOptions& file_options,
const DBOptions& db_options) const override { const DBOptions& db_options) const override {
FileOptions optimized = file_options; FileOptions optimized = file_options;

@ -101,6 +101,8 @@ class HdfsEnv : public Env {
Status NewLogger(const std::string& fname, Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override; std::shared_ptr<Logger>* result) override;
Status IsDirectory(const std::string& path, bool* is_dir) override;
void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW, void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
void* tag = nullptr, void* tag = nullptr,
void (*unschedFunction)(void* arg) = 0) override { void (*unschedFunction)(void* arg) = 0) override {
@ -329,6 +331,10 @@ class HdfsEnv : public Env {
return notsup; return notsup;
} }
Status IsDirectory(const std::string& /*path*/, bool* /*is_dir*/) override {
return notsup;
}
virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/, virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/,
Priority /*pri*/ = LOW, void* /*tag*/ = nullptr, Priority /*pri*/ = LOW, void* /*tag*/ = nullptr,
void (* /*unschedFunction*/)(void* arg) = 0) override {} void (* /*unschedFunction*/)(void* arg) = 0) override {}

@ -227,7 +227,7 @@ class Env {
virtual Status ReopenWritableFile(const std::string& /*fname*/, virtual Status ReopenWritableFile(const std::string& /*fname*/,
std::unique_ptr<WritableFile>* /*result*/, std::unique_ptr<WritableFile>* /*result*/,
const EnvOptions& /*options*/) { const EnvOptions& /*options*/) {
return Status::NotSupported(); return Status::NotSupported("Env::ReopenWritableFile() not supported.");
} }
// Reuse an existing file by renaming it and opening it as writable. // Reuse an existing file by renaming it and opening it as writable.
@ -461,7 +461,7 @@ class Env {
virtual int GetBackgroundThreads(Priority pri = LOW) = 0; virtual int GetBackgroundThreads(Priority pri = LOW) = 0;
virtual Status SetAllowNonOwnerAccess(bool /*allow_non_owner_access*/) { virtual Status SetAllowNonOwnerAccess(bool /*allow_non_owner_access*/) {
return Status::NotSupported("Not supported."); return Status::NotSupported("Env::SetAllowNonOwnerAccess() not supported.");
} }
// Enlarge number of background worker threads of a specific thread pool // Enlarge number of background worker threads of a specific thread pool
@ -518,7 +518,7 @@ class Env {
// Returns the status of all threads that belong to the current Env. // Returns the status of all threads that belong to the current Env.
virtual Status GetThreadList(std::vector<ThreadStatus>* /*thread_list*/) { virtual Status GetThreadList(std::vector<ThreadStatus>* /*thread_list*/) {
return Status::NotSupported("Not supported."); return Status::NotSupported("Env::GetThreadList() not supported.");
} }
// Returns the pointer to ThreadStatusUpdater. This function will be // Returns the pointer to ThreadStatusUpdater. This function will be
@ -537,7 +537,12 @@ class Env {
// Get the amount of free disk space // Get the amount of free disk space
virtual Status GetFreeSpace(const std::string& /*path*/, virtual Status GetFreeSpace(const std::string& /*path*/,
uint64_t* /*diskfree*/) { uint64_t* /*diskfree*/) {
return Status::NotSupported(); return Status::NotSupported("Env::GetFreeSpace() not supported.");
}
// Check whether the specified path is a directory
virtual Status IsDirectory(const std::string& /*path*/, bool* /*is_dir*/) {
return Status::NotSupported("Env::IsDirectory() not supported.");
} }
virtual void SanitizeEnvOptions(EnvOptions* /*env_opts*/) const {} virtual void SanitizeEnvOptions(EnvOptions* /*env_opts*/) const {}
@ -599,14 +604,16 @@ class SequentialFile {
// of this file. If the length is 0, then it refers to the end of file. // of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop. // If the system is not caching the file contents, then this is a noop.
virtual Status InvalidateCache(size_t /*offset*/, size_t /*length*/) { virtual Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
return Status::NotSupported("InvalidateCache not supported."); return Status::NotSupported(
"SequentialFile::InvalidateCache not supported.");
} }
// Positioned Read for direct I/O // Positioned Read for direct I/O
// If Direct I/O enabled, offset, n, and scratch should be properly aligned // If Direct I/O enabled, offset, n, and scratch should be properly aligned
virtual Status PositionedRead(uint64_t /*offset*/, size_t /*n*/, virtual Status PositionedRead(uint64_t /*offset*/, size_t /*n*/,
Slice* /*result*/, char* /*scratch*/) { Slice* /*result*/, char* /*scratch*/) {
return Status::NotSupported(); return Status::NotSupported(
"SequentialFile::PositionedRead() not supported.");
} }
// If you're adding methods here, remember to add them to // If you're adding methods here, remember to add them to
@ -709,7 +716,8 @@ class RandomAccessFile {
// of this file. If the length is 0, then it refers to the end of file. // of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop. // If the system is not caching the file contents, then this is a noop.
virtual Status InvalidateCache(size_t /*offset*/, size_t /*length*/) { virtual Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
return Status::NotSupported("InvalidateCache not supported."); return Status::NotSupported(
"RandomAccessFile::InvalidateCache not supported.");
} }
// If you're adding methods here, remember to add them to // If you're adding methods here, remember to add them to
@ -767,7 +775,8 @@ class WritableFile {
// required is queried via GetRequiredBufferAlignment() // required is queried via GetRequiredBufferAlignment()
virtual Status PositionedAppend(const Slice& /* data */, virtual Status PositionedAppend(const Slice& /* data */,
uint64_t /* offset */) { uint64_t /* offset */) {
return Status::NotSupported(); return Status::NotSupported(
"WritableFile::PositionedAppend() not supported.");
} }
// Truncate is necessary to trim the file to the correct size // Truncate is necessary to trim the file to the correct size
@ -842,7 +851,7 @@ class WritableFile {
// If the system is not caching the file contents, then this is a noop. // If the system is not caching the file contents, then this is a noop.
// This call has no effect on dirty pages in the cache. // This call has no effect on dirty pages in the cache.
virtual Status InvalidateCache(size_t /*offset*/, size_t /*length*/) { virtual Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
return Status::NotSupported("InvalidateCache not supported."); return Status::NotSupported("WritableFile::InvalidateCache not supported.");
} }
// Sync a file range with disk. // Sync a file range with disk.
@ -1279,6 +1288,10 @@ class EnvWrapper : public Env {
Status UnlockFile(FileLock* l) override { return target_->UnlockFile(l); } Status UnlockFile(FileLock* l) override { return target_->UnlockFile(l); }
Status IsDirectory(const std::string& path, bool* is_dir) override {
return target_->IsDirectory(path, is_dir);
}
Status LoadLibrary(const std::string& lib_name, Status LoadLibrary(const std::string& lib_name,
const std::string& search_path, const std::string& search_path,
std::shared_ptr<DynamicLibrary>* result) override { std::shared_ptr<DynamicLibrary>* result) override {

@ -522,6 +522,10 @@ class FileSystem {
return IOStatus::NotSupported(); return IOStatus::NotSupported();
} }
virtual IOStatus IsDirectory(const std::string& /*path*/,
const IOOptions& options, bool* is_dir,
IODebugContext* /*dgb*/) = 0;
// If you're adding methods here, remember to add them to EnvWrapper too. // If you're adding methods here, remember to add them to EnvWrapper too.
private: private:
@ -1193,6 +1197,10 @@ class FileSystemWrapper : public FileSystem {
uint64_t* diskfree, IODebugContext* dbg) override { uint64_t* diskfree, IODebugContext* dbg) override {
return target_->GetFreeSpace(path, options, diskfree, dbg); return target_->GetFreeSpace(path, options, diskfree, dbg);
} }
IOStatus IsDirectory(const std::string& path, const IOOptions& options,
bool* is_dir, IODebugContext* dbg) override {
return target_->IsDirectory(path, options, is_dir, dbg);
}
private: private:
std::shared_ptr<FileSystem> target_; std::shared_ptr<FileSystem> target_;

@ -955,6 +955,14 @@ Status WinEnvIO::NewLogger(const std::string& fname,
return s; return s;
} }
Status WinEnvIO::IsDirectory(const std::string& path, bool* is_dir) {
BOOL ret = RX_PathIsDirectory(RX_FN(path).c_str());
if (is_dir) {
*is_dir = ret ? true : false;
}
return Status::OK();
}
uint64_t WinEnvIO::NowMicros() { uint64_t WinEnvIO::NowMicros() {
if (GetSystemTimePreciseAsFileTime_ != NULL) { if (GetSystemTimePreciseAsFileTime_ != NULL) {
@ -1433,6 +1441,10 @@ Status WinEnv::NewLogger(const std::string& fname,
return winenv_io_.NewLogger(fname, result); return winenv_io_.NewLogger(fname, result);
} }
Status WinEnv::IsDirectory(const std::string& path, bool* is_dir) {
return winenv_io_.IsDirectory(path, is_dir);
}
uint64_t WinEnv::NowMicros() { uint64_t WinEnv::NowMicros() {
return winenv_io_.NowMicros(); return winenv_io_.NowMicros();
} }

@ -155,6 +155,8 @@ public:
virtual Status NewLogger(const std::string& fname, virtual Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result); std::shared_ptr<Logger>* result);
virtual Status IsDirectory(const std::string& path, bool* is_dir);
virtual uint64_t NowMicros(); virtual uint64_t NowMicros();
virtual uint64_t NowNanos(); virtual uint64_t NowNanos();
@ -287,6 +289,8 @@ public:
Status NewLogger(const std::string& fname, Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override; std::shared_ptr<Logger>* result) override;
Status IsDirectory(const std::string& path, bool* is_dir) override;
uint64_t NowMicros() override; uint64_t NowMicros() override;
uint64_t NowNanos() override; uint64_t NowNanos() override;

@ -365,6 +365,7 @@ extern void SetCpuPriority(ThreadId id, CpuPriority priority);
#define RX_PathIsRelative PathIsRelativeW #define RX_PathIsRelative PathIsRelativeW
#define RX_GetCurrentDirectory GetCurrentDirectoryW #define RX_GetCurrentDirectory GetCurrentDirectoryW
#define RX_GetDiskFreeSpaceEx GetDiskFreeSpaceExW #define RX_GetDiskFreeSpaceEx GetDiskFreeSpaceExW
#define RX_PathIsDirectory PathIsDirectoryW
#else #else
@ -389,6 +390,7 @@ extern void SetCpuPriority(ThreadId id, CpuPriority priority);
#define RX_PathIsRelative PathIsRelativeA #define RX_PathIsRelative PathIsRelativeA
#define RX_GetCurrentDirectory GetCurrentDirectoryA #define RX_GetCurrentDirectory GetCurrentDirectoryA
#define RX_GetDiskFreeSpaceEx GetDiskFreeSpaceExA #define RX_GetDiskFreeSpaceEx GetDiskFreeSpaceExA
#define RX_PathIsDirectory PathIsDirectoryA
#endif #endif

Loading…
Cancel
Save