Fix db_stress for custom env (#5122)

Summary:
Fix some hdfs-related code so that it can compile and run 'db_stress'
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5122

Differential Revision: D14675495

Pulled By: riversand963

fbshipit-source-id: cac280479efcf5451982558947eac1732e8bc45a
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent dae3b5545c
commit d77476ef55
  1. 4
      build_tools/build_detect_platform
  2. 40
      env/env_hdfs.cc
  3. 99
      hdfs/env_hdfs.h
  4. 4
      hdfs/setup.sh
  5. 9
      tools/db_stress.cc

@ -518,8 +518,8 @@ if test "$USE_HDFS"; then
echo "JAVA_HOME has to be set for HDFS usage."
exit 1
fi
HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS"
HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64"
HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS -I$HADOOP_HOME/include"
HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64 -L$HADOOP_HOME/lib/native"
HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib"
HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm"
COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS"

40
env/env_hdfs.cc vendored

@ -11,13 +11,14 @@
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C
#include <algorithm>
#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <algorithm>
#include <iostream>
#include <sstream>
#include "rocksdb/status.h"
#include "util/logging.h"
#include "util/string_util.h"
#define HDFS_EXISTS 0
@ -224,7 +225,7 @@ class HdfsWritableFile: public WritableFile {
filename_.c_str());
const char* src = data.data();
size_t left = data.size();
size_t ret = hdfsWrite(fileSys_, hfile_, src, left);
size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
filename_.c_str());
if (ret != left) {
@ -254,7 +255,8 @@ class HdfsWritableFile: public WritableFile {
// This is used by HdfsLogger to write data to the debug log file
virtual Status Append(const char* src, size_t size) {
if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
static_cast<tSize>(size)) {
return IOError(filename_, errno);
}
return Status::OK();
@ -282,11 +284,10 @@ class HdfsLogger : public Logger {
Status HdfsCloseHelper() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
file_->getName().c_str());
Status s = file_->Close();
if (mylog != nullptr && mylog == this) {
mylog = nullptr;
}
return s;
return Status::OK();
}
protected:
@ -299,14 +300,15 @@ class HdfsLogger : public Logger {
file_->getName().c_str());
}
virtual ~HdfsLogger() {
~HdfsLogger() override {
if (!closed_) {
closed_ = true;
HdfsCloseHelper();
}
}
virtual void Logv(const char* format, va_list ap) {
using Logger::Logv;
void Logv(const char* format, va_list ap) override {
const uint64_t thread_id = (*gettid_)();
// We try twice: the first time with a fixed-size stack allocated buffer,
@ -384,7 +386,7 @@ const std::string HdfsEnv::pathsep = "/";
// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) {
const EnvOptions& /*options*/) {
result->reset();
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == nullptr || !f->isValid()) {
@ -399,7 +401,7 @@ Status HdfsEnv::NewSequentialFile(const std::string& fname,
// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) {
const EnvOptions& /*options*/) {
result->reset();
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == nullptr || !f->isValid()) {
@ -414,7 +416,7 @@ Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) {
const EnvOptions& /*options*/) {
result->reset();
Status s;
HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
@ -432,7 +434,9 @@ class HdfsDirectory : public Directory {
explicit HdfsDirectory(int fd) : fd_(fd) {}
~HdfsDirectory() {}
virtual Status Fsync() { return Status::OK(); }
Status Fsync() override { return Status::OK(); }
int GetFd() const { return fd_; }
private:
int fd_;
@ -477,10 +481,10 @@ Status HdfsEnv::GetChildren(const std::string& path,
pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
if (numEntries >= 0) {
for(int i = 0; i < numEntries; i++) {
char* pathname = pHdfsFileInfo[i].mName;
char* filename = std::rindex(pathname, '/');
if (filename != nullptr) {
result->push_back(filename+1);
std::string pathname(pHdfsFileInfo[i].mName);
size_t pos = pathname.rfind("/");
if (std::string::npos != pos) {
result->push_back(pathname.substr(pos + 1));
}
}
if (pHdfsFileInfo != nullptr) {
@ -571,16 +575,14 @@ Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
return IOError(src, errno);
}
Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
// there isn's a very good way to atomically check and create
// a file via libhdfs
*lock = nullptr;
return Status::OK();
}
Status HdfsEnv::UnlockFile(FileLock* lock) {
return Status::OK();
}
Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }
Status HdfsEnv::NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) {

@ -54,110 +54,109 @@ class HdfsEnv : public Env {
hdfsDisconnect(fileSys_);
}
virtual Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options);
Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) override;
virtual Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options);
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) override;
virtual Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options);
Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;
virtual Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result);
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override;
virtual Status FileExists(const std::string& fname);
Status FileExists(const std::string& fname) override;
virtual Status GetChildren(const std::string& path,
std::vector<std::string>* result);
Status GetChildren(const std::string& path,
std::vector<std::string>* result) override;
virtual Status DeleteFile(const std::string& fname);
Status DeleteFile(const std::string& fname) override;
virtual Status CreateDir(const std::string& name);
Status CreateDir(const std::string& name) override;
virtual Status CreateDirIfMissing(const std::string& name);
Status CreateDirIfMissing(const std::string& name) override;
virtual Status DeleteDir(const std::string& name);
Status DeleteDir(const std::string& name) override;
virtual Status GetFileSize(const std::string& fname, uint64_t* size);
Status GetFileSize(const std::string& fname, uint64_t* size) override;
virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime);
Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) override;
virtual Status RenameFile(const std::string& src, const std::string& target);
Status RenameFile(const std::string& src, const std::string& target) override;
virtual Status LinkFile(const std::string& src, const std::string& target) {
Status LinkFile(const std::string& /*src*/,
const std::string& /*target*/) override {
return Status::NotSupported(); // not supported
}
virtual Status LockFile(const std::string& fname, FileLock** lock);
Status LockFile(const std::string& fname, FileLock** lock) override;
virtual Status UnlockFile(FileLock* lock);
Status UnlockFile(FileLock* lock) override;
virtual Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result);
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override;
virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW, void* tag = nullptr, void (*unschedFunction)(void* arg) = 0) {
void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
void* tag = nullptr,
void (*unschedFunction)(void* arg) = 0) override {
posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
}
virtual int UnSchedule(void* tag, Priority pri) {
int UnSchedule(void* tag, Priority pri) override {
return posixEnv->UnSchedule(tag, pri);
}
virtual void StartThread(void (*function)(void* arg), void* arg) {
void StartThread(void (*function)(void* arg), void* arg) override {
posixEnv->StartThread(function, arg);
}
virtual void WaitForJoin() { posixEnv->WaitForJoin(); }
void WaitForJoin() override { posixEnv->WaitForJoin(); }
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const
override {
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
return posixEnv->GetThreadPoolQueueLen(pri);
}
virtual Status GetTestDirectory(std::string* path) {
Status GetTestDirectory(std::string* path) override {
return posixEnv->GetTestDirectory(path);
}
virtual uint64_t NowMicros() {
return posixEnv->NowMicros();
}
uint64_t NowMicros() override { return posixEnv->NowMicros(); }
virtual void SleepForMicroseconds(int micros) {
void SleepForMicroseconds(int micros) override {
posixEnv->SleepForMicroseconds(micros);
}
virtual Status GetHostName(char* name, uint64_t len) {
Status GetHostName(char* name, uint64_t len) override {
return posixEnv->GetHostName(name, len);
}
virtual Status GetCurrentTime(int64_t* unix_time) {
Status GetCurrentTime(int64_t* unix_time) override {
return posixEnv->GetCurrentTime(unix_time);
}
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) {
Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override {
return posixEnv->GetAbsolutePath(db_path, output_path);
}
virtual void SetBackgroundThreads(int number, Priority pri = LOW) {
void SetBackgroundThreads(int number, Priority pri = LOW) override {
posixEnv->SetBackgroundThreads(number, pri);
}
virtual int GetBackgroundThreads(Priority pri = LOW) {
int GetBackgroundThreads(Priority pri = LOW) override {
return posixEnv->GetBackgroundThreads(pri);
}
virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
}
virtual std::string TimeToString(uint64_t number) {
std::string TimeToString(uint64_t number) override {
return posixEnv->TimeToString(number);
}
@ -166,9 +165,7 @@ class HdfsEnv : public Env {
return (uint64_t)pthread_self();
}
virtual uint64_t GetThreadID() const override {
return HdfsEnv::gettid();
}
uint64_t GetThreadID() const override { return HdfsEnv::gettid(); }
private:
std::string fsname_; // string of the form "hdfs://hostname:port/"
@ -206,7 +203,7 @@ class HdfsEnv : public Env {
std::string host(parts[0]);
std::string remaining(parts[1]);
int rem = remaining.find(pathsep);
int rem = static_cast<int>(remaining.find(pathsep));
std::string portStr = (rem == 0 ? remaining :
remaining.substr(0, rem));

@ -1,8 +1,8 @@
# shellcheck disable=SC2148
export USE_HDFS=1
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:/usr/lib/hadoop/lib/native
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$HADOOP_HOME/lib/native
export CLASSPATH=
export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
for f in `find /usr/lib/hadoop-hdfs | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
for f in `find /usr/lib/hadoop | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
for f in `find /usr/lib/hadoop/client | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done

@ -1403,7 +1403,14 @@ class StressTest {
FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
}
}
DestroyDB(FLAGS_db, Options());
Options options;
options.env = FLAGS_env;
Status s = DestroyDB(FLAGS_db, options);
if (!s.ok()) {
fprintf(stderr, "Cannot destroy original db: %s\n",
s.ToString().c_str());
exit(1);
}
}
}

Loading…
Cancel
Save