- hdfs cleanup; fix to NewDirectory to comply with definition in env.h

- fix compile error with env_test; static casts added
main
Mike Orr 11 years ago
parent c2fda55cfe
commit d788bb8f71
  1. 35
      util/env_hdfs.cc
  2. 8
      util/env_test.cc

@ -401,16 +401,9 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname,
class HdfsDirectory : public Directory { class HdfsDirectory : public Directory {
public: public:
explicit HdfsDirectory(int fd) : fd_(fd) {} explicit HdfsDirectory(int fd) : fd_(fd) {}
~HdfsDirectory() { ~HdfsDirectory() {}
//close(fd_);
}
virtual Status Fsync() { virtual Status Fsync() { return Status::OK(); }
//if (fsync(fd_) == -1) {
// return IOError("directory", errno);
// }
return Status::OK();
}
private: private:
int fd_; int fd_;
@ -418,15 +411,16 @@ class HdfsDirectory : public Directory {
Status HdfsEnv::NewDirectory(const std::string& name, Status HdfsEnv::NewDirectory(const std::string& name,
unique_ptr<Directory>* result) { unique_ptr<Directory>* result) {
int value = hdfsExists(fileSys_, name.c_str());
int value = hdfsCreateDirectory(fileSys_, name.c_str());
result->reset(new HdfsDirectory(0));
switch (value) { switch (value) {
case HDFS_SUCCESS: // directory created case HDFS_EXISTS:
return Status::OK(); result->reset(new HdfsDirectory(0));
default:
Log(mylog, "directory already exists ");
return Status::OK(); return Status::OK();
default: // fail if the directory doesn't exist
Log(mylog, "NewDirectory hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " +
std::to_string(value) + " on path " + name +
".\n");
} }
} }
@ -440,8 +434,9 @@ bool HdfsEnv::FileExists(const std::string& fname) {
return false; return false;
default: // anything else should be an error default: // anything else should be an error
Log(mylog, "FileExists hdfsExists call failed"); Log(mylog, "FileExists hdfsExists call failed");
throw HdfsFatalException("1. hdfsExists call failed with error " + throw HdfsFatalException("hdfsExists call failed with error " +
std::to_string(value) + " on path " + fname + ".\n"); std::to_string(value) + " on path " + fname +
".\n");
} }
} }
@ -477,13 +472,13 @@ Status HdfsEnv::GetChildren(const std::string& path,
default: // anything else should be an error default: // anything else should be an error
Log(mylog, "GetChildren hdfsExists call failed"); Log(mylog, "GetChildren hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " + throw HdfsFatalException("hdfsExists call failed with error " +
std::to_string(value) + " on path " + path.c_str() + ".\n"); std::to_string(value) + ".\n");
} }
return Status::OK(); return Status::OK();
} }
Status HdfsEnv::DeleteFile(const std::string& fname) { Status HdfsEnv::DeleteFile(const std::string& fname) {
if (hdfsDelete(fileSys_, fname.c_str(),1) == 0) { if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
return Status::OK(); return Status::OK();
} }
return IOError(fname, errno); return IOError(fname, errno);

@ -285,7 +285,7 @@ TEST(EnvPosixTest, DecreaseNumBgThreads) {
// Increase to 5 threads. Task 0 and 2 running. // Increase to 5 threads. Task 0 and 2 running.
env_->SetBackgroundThreads(5, Env::Priority::HIGH); env_->SetBackgroundThreads(5, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(tasks[2].IsSleeping()); ASSERT_TRUE(tasks[2].IsSleeping());
@ -330,7 +330,7 @@ TEST(EnvPosixTest, DecreaseNumBgThreads) {
tasks[4].WakeUp(); tasks[4].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
for (size_t i = 5; i < 8; i++) { for (size_t i = 5; i < 8; i++) {
ASSERT_TRUE(tasks[i].IsSleeping()); ASSERT_TRUE(tasks[i].IsSleeping());
} }
@ -360,13 +360,13 @@ TEST(EnvPosixTest, DecreaseNumBgThreads) {
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9], env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9],
Env::Priority::HIGH); Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), 0); ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), (unsigned int)0);
ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping()); ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping());
// Increase to 4 threads. Task 5, 8, 9 running. // Increase to 4 threads. Task 5, 8, 9 running.
env_->SetBackgroundThreads(4, Env::Priority::HIGH); env_->SetBackgroundThreads(4, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[8].IsSleeping()); ASSERT_TRUE(tasks[8].IsSleeping());
ASSERT_TRUE(tasks[9].IsSleeping()); ASSERT_TRUE(tasks[9].IsSleeping());

Loading…
Cancel
Save