Make Leveldb save data into HDFS files. You have to set USE_HDFS in your environment variable to compile leveldb with HDFS support.

Test Plan: Run benchmark.

Differential Revision: https://reviews.facebook.net/D3549
main
Dhruba Borthakur 13 years ago
parent 338939e5c1
commit a35e574344
  1. 15
      build_detect_platform
  2. 26
      db/db_bench.cc
  3. 14
      fbcode.sh
  4. 26
      hdfs/README
  5. 253
      hdfs/env_hdfs.h
  6. 490
      hdfs/hdfs.h
  7. 499
      util/env_hdfs.cc

@ -148,6 +148,21 @@ EOF
fi
fi
# shall we use HDFS?
if test "$USE_HDFS"; then
if test -z "$JAVA_HOME"; then
echo "JAVA_HOME has to be set for HDFS usage."
exit 1
fi
HDFS_CCFLAGS+=" -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS"
HDFS_LDFLAGS+=" -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64"
HDFS_LDFLAGS+=" -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib"
HDFS_LDFLAGS+=" -ldl -lverify -ljava -ljvm"
COMMON_FLAGS+=$HDFS_CCFLAGS
PLATFORM_LDFLAGS+=$HDFS_LDFLAGS
fi
PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS"
PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS"

@ -19,6 +19,7 @@
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/testutil.h"
#include "hdfs/env_hdfs.h"
// Comma-separated list of operations to run in the specified order
// Actual benchmarks:
@ -122,6 +123,9 @@ static long FLAGS_writes = -1;
// Sync all writes to disk
static bool FLAGS_sync = false;
// posix or hdfs environment
static leveldb::Env* FLAGS_env = leveldb::Env::Default();
extern bool useOsBuffer;
namespace leveldb {
@ -202,7 +206,7 @@ class Stats {
done_ = 0;
bytes_ = 0;
seconds_ = 0;
start_ = Env::Default()->NowMicros();
start_ = FLAGS_env->NowMicros();
finish_ = start_;
message_.clear();
}
@ -220,7 +224,7 @@ class Stats {
}
void Stop() {
finish_ = Env::Default()->NowMicros();
finish_ = FLAGS_env->NowMicros();
seconds_ = (finish_ - start_) * 1e-6;
}
@ -230,7 +234,7 @@ class Stats {
void FinishedSingleOp() {
if (FLAGS_histogram) {
double now = Env::Default()->NowMicros();
double now = FLAGS_env->NowMicros();
double micros = now - last_op_finish_;
hist_.Add(micros);
if (micros > 20000) {
@ -437,10 +441,10 @@ class Benchmark {
writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
heap_counter_(0) {
std::vector<std::string> files;
Env::Default()->GetChildren(FLAGS_db, &files);
FLAGS_env->GetChildren(FLAGS_db, &files);
for (int i = 0; i < files.size(); i++) {
if (Slice(files[i]).starts_with("heap-")) {
Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
FLAGS_env->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
}
}
if (!FLAGS_use_existing_db) {
@ -623,7 +627,7 @@ class Benchmark {
arg[i].shared = &shared;
arg[i].thread = new ThreadState(i);
arg[i].thread->shared = &shared;
Env::Default()->StartThread(ThreadBody, &arg[i]);
FLAGS_env->StartThread(ThreadBody, &arg[i]);
}
shared.mu.Lock();
@ -740,6 +744,7 @@ class Benchmark {
options.filter_policy = filter_policy_;
options.max_open_files = FLAGS_open_files;
options.statistics = dbstats;
options.env = FLAGS_env;
Status s = DB::Open(options, FLAGS_db, &db_);
if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@ -758,7 +763,7 @@ class Benchmark {
void DoWrite(ThreadState* thread, bool seq) {
if (num_ != FLAGS_num) {
char msg[100];
snprintf(msg, sizeof(msg), "(%d ops)", num_);
snprintf(msg, sizeof(msg), "(%ld ops)", num_);
thread->stats.AddMessage(msg);
}
@ -952,7 +957,7 @@ class Benchmark {
char fname[100];
snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
WritableFile* file;
Status s = Env::Default()->NewWritableFile(fname, &file);
Status s = FLAGS_env->NewWritableFile(fname, &file);
if (!s.ok()) {
fprintf(stderr, "%s\n", s.ToString().c_str());
return;
@ -961,7 +966,7 @@ class Benchmark {
delete file;
if (!ok) {
fprintf(stderr, "heap profiling not supported\n");
Env::Default()->DeleteFile(fname);
FLAGS_env->DeleteFile(fname);
}
}
};
@ -977,6 +982,7 @@ int main(int argc, char** argv) {
int n;
long l;
char junk;
char hdfsname[2048];
if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
} else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
@ -1024,6 +1030,8 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_sync = n;
} else if (sscanf(argv[i], "--hdfs=%s", &hdfsname) == 1) {
FLAGS_env = new leveldb::HdfsEnv(hdfsname);
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);

@ -1,3 +1,4 @@
#!/bin/sh
#
# Set environment variables so that we can compile leveldb using
# fbcode settings. It uses the latest g++ compiler and also
@ -7,8 +8,18 @@ TOOLCHAIN_REV=d28c90311ca14f9f0b2bb720f4e34b285513d4f4
TOOLCHAIN_EXECUTABLES="/mnt/gvfs/third-party/$TOOLCHAIN_REV/centos5.2-native"
TOOLCHAIN_LIB_BASE="/mnt/gvfs/third-party/$TOOLCHAIN_REV/gcc-4.6.2-glibc-2.13"
# location of libhdfs libraries
if test "$USE_HDFS"; then
JAVA_HOME="/usr/local/jdk-6u22-64"
JINCLUDE="-I$JAVA_HOME/include -I$JAVA_HOME/include/linux"
GLIBC_RUNTIME_PATH="/usr/local/fbcode/gcc-4.6.2-glibc-2.13"
HDFSLIB=" -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64 "
HDFSLIB+=" -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib "
HDFSLIB+=" -ldl -lverify -ljava -ljvm "
fi
CC="$TOOLCHAIN_EXECUTABLES/gcc/gcc-4.6.2-glibc-2.13/bin/gcc"
CXX="$TOOLCHAIN_EXECUTABLES/gcc/gcc-4.6.2-glibc-2.13/bin/g++"
CXX="$TOOLCHAIN_EXECUTABLES/gcc/gcc-4.6.2-glibc-2.13/bin/g++ $JINCLUDE"
AR=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ar
RANLIB=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ranlib
@ -17,5 +28,6 @@ CFLAGS+=" -I $TOOLCHAIN_LIB_BASE/jemalloc/jemalloc-2.2.5/96de4f9/include -DHAVE_
EXEC_LDFLAGS=" -Wl,--whole-archive $TOOLCHAIN_LIB_BASE/jemalloc/jemalloc-2.2.4/96de4f9/lib/libjemalloc.a "
EXEC_LDFLAGS+="-Wl,--no-whole-archive $TOOLCHAIN_LIB_BASE/libunwind/libunwind-20100810/4bc2c16/lib/libunwind.a"
EXEC_LDFLAGS+=$HDFSLIB
export CC CXX AR RANLIB CFLAGS EXEC_LDFLAGS

@ -0,0 +1,26 @@
This directory contains the hdfs extensions needed to make leveldb store
files in HDFS.
The hdfs.h file is copied from the Apache Hadoop 1.0 source code.
It defines the libhdfs library
(http://hadoop.apache.org/common/docs/r0.20.2/libhdfs.html) to access
data in HDFS. The libhdfs.a is copied from the Apache Hadoop 1.0 build.
It implements the API defined in hdfs.h. If your hadoop cluster is running
a different hadoop release, then install these two files manually from your
hadoop distribution and then recompile leveldb.
The env_hdfs.h file defines the leveldb objects that are needed to talk to an
underlying filesystem.
If you want to compile leveldb with hdfs support, please set the following
enviroment variables appropriately:
USE_HDFS=1
JAVA_HOME=/usr/local/jdk-6u22-64
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/
make clean all db_bench
To run dbbench,
set CLASSPATH to include your hadoop distribution
db_bench --hdfs="hdfs://hbaseudbperf001.snc1.facebook.com:9000"

@ -0,0 +1,253 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Copyright (c) 2012 Facebook. All rights reserved.
#ifndef LEVELDB_HDFS_FILE_H
#define LEVELDB_HDFS_FILE_H
#include <algorithm>
#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <iostream>
#include "leveldb/env.h"
#include "leveldb/status.h"
#ifdef USE_HDFS
#include "hdfs/hdfs.h"
namespace leveldb {
static const std::string kProto = "hdfs://";
static const std::string pathsep = "/";
// Thrown during execution when there is an issue with the supplied
// arguments.
class HdfsUsageException : public std::exception { };
// A simple exception that indicates something went wrong that is not
// recoverable. The intention is for the message to be printed (with
// nothing else) and the process terminate.
class HdfsFatalException : public std::exception {
public:
explicit HdfsFatalException(const std::string& s) : what_(s) { }
virtual ~HdfsFatalException() throw() { }
virtual const char* what() const throw() {
return what_.c_str();
}
private:
const std::string what_;
};
//
// The HDFS environment for leveldb. This class overrides all the
// file/dir access methods and delegates the thread-mgmt methods to the
// default posix environment.
//
class HdfsEnv : public Env {
public:
HdfsEnv(const std::string& fsname) : fsname_(fsname) {
posixEnv = Env::Default();
fileSys_ = connectToPath(fsname_);
}
virtual ~HdfsEnv() {
fprintf(stderr, "Destroying HdfsEnv::Default()\n");
hdfsDisconnect(fileSys_);
}
virtual Status NewSequentialFile(const std::string& fname,
SequentialFile** result);
virtual Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result);
virtual Status NewWritableFile(const std::string& fname,
WritableFile** result);
virtual bool FileExists(const std::string& fname);
virtual Status GetChildren(const std::string& path,
std::vector<std::string>* result);
virtual Status DeleteFile(const std::string& fname);
virtual Status CreateDir(const std::string& name);
virtual Status DeleteDir(const std::string& name);
virtual Status GetFileSize(const std::string& fname, uint64_t* size);
virtual Status RenameFile(const std::string& src, const std::string& target);
virtual Status LockFile(const std::string& fname, FileLock** lock);
virtual Status UnlockFile(FileLock* lock);
virtual Status NewLogger(const std::string& fname, Logger** result);
virtual void Schedule( void (*function)(void* arg), void* arg) {
posixEnv->Schedule(function, arg);
}
virtual void StartThread(void (*function)(void* arg), void* arg) {
posixEnv->StartThread(function, arg);
}
virtual Status GetTestDirectory(std::string* path) {
return posixEnv->GetTestDirectory(path);
}
virtual uint64_t NowMicros() {
return posixEnv->NowMicros();
}
virtual void SleepForMicroseconds(int micros) {
posixEnv->SleepForMicroseconds(micros);
}
static uint64_t gettid() {
assert(sizeof(pthread_t) <= sizeof(uint64_t));
return (uint64_t)pthread_self();
}
private:
std::string fsname_; // string of the form "hdfs://hostname:port/"
hdfsFS fileSys_; // a single FileSystem object for all files
Env* posixEnv; // This object is derived from Env, but not from
// posixEnv. We have posixnv as an encapsulated
// object here so that we can use posix timers,
// posix threads, etc.
/**
* If the URI is specified of the form hdfs://server:port/path,
* then connect to the specified cluster
* else connect to default.
*/
hdfsFS connectToPath(const std::string& uri) {
if (uri.empty()) {
return NULL;
}
if (uri.find(kProto) != 0) {
// uri doesn't start with hdfs:// -> use default:0, which is special
// to libhdfs.
return hdfsConnectNewInstance("default", 0);
}
const std::string hostport = uri.substr(kProto.length());
std::vector <std::string> parts;
split(hostport, ':', parts);
if (parts.size() != 2) {
throw HdfsFatalException("Bad uri for hdfs " + uri);
}
// parts[0] = hosts, parts[1] = port/xxx/yyy
std::string host(parts[0]);
std::string remaining(parts[1]);
int rem = remaining.find(pathsep);
std::string portStr = (rem == 0 ? remaining :
remaining.substr(0, rem));
tPort port;
port = atoi(portStr.c_str());
if (port == 0) {
throw HdfsFatalException("Bad host-port for hdfs " + uri);
}
hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
return fs;
}
void split(const std::string &s, char delim,
std::vector<std::string> &elems) {
elems.clear();
size_t prev = 0;
size_t pos = s.find(delim);
while (pos != std::string::npos) {
elems.push_back(s.substr(prev, pos));
prev = pos + 1;
pos = s.find(delim, prev);
}
elems.push_back(s.substr(prev, s.size()));
}
};
} // namespace leveldb
#else // USE_HDFS
namespace leveldb {
class HdfsEnv : public Env {
public:
HdfsEnv(const std::string& fsname) {
fprintf(stderr, "You have not build leveldb with HDFS support\n");
fprintf(stderr, "Please see hdfs/README for details\n");
throw new std::exception();
}
virtual ~HdfsEnv() {
}
virtual Status NewSequentialFile(const std::string& fname,
SequentialFile** result);
virtual Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result) {}
virtual Status NewWritableFile(const std::string& fname,
WritableFile** result){}
virtual bool FileExists(const std::string& fname){}
virtual Status GetChildren(const std::string& path,
std::vector<std::string>* result){}
virtual Status DeleteFile(const std::string& fname){}
virtual Status CreateDir(const std::string& name){}
virtual Status DeleteDir(const std::string& name){}
virtual Status GetFileSize(const std::string& fname, uint64_t* size){}
virtual Status RenameFile(const std::string& src, const std::string& target){}
virtual Status LockFile(const std::string& fname, FileLock** lock){}
virtual Status UnlockFile(FileLock* lock){}
virtual Status NewLogger(const std::string& fname, Logger** result){}
virtual void Schedule( void (*function)(void* arg), void* arg) {}
virtual void StartThread(void (*function)(void* arg), void* arg) {}
virtual Status GetTestDirectory(std::string* path) {}
virtual uint64_t NowMicros() {}
virtual void SleepForMicroseconds(int micros) {}
};
}
#endif // USE_HDFS
#endif // LEVELDB_HDFS_FILE_H

@ -0,0 +1,490 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFS_HDFS_H
#define LIBHDFS_HDFS_H
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <errno.h>
#include <jni.h>
#ifndef O_RDONLY
#define O_RDONLY 1
#endif
#ifndef O_WRONLY
#define O_WRONLY 2
#endif
#ifndef EINTERNAL
#define EINTERNAL 255
#endif
/** All APIs set errno to meaningful values */
#ifdef __cplusplus
extern "C" {
#endif
/**
* Some utility decls used in libhdfs.
*/
typedef int32_t tSize; /// size of data for read/write io ops
typedef time_t tTime; /// time type in seconds
typedef int64_t tOffset;/// offset within the file
typedef uint16_t tPort; /// port
typedef enum tObjectKind {
kObjectKindFile = 'F',
kObjectKindDirectory = 'D',
} tObjectKind;
/**
* The C reflection of org.apache.org.hadoop.FileSystem .
*/
typedef void* hdfsFS;
/**
* The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream .
*/
enum hdfsStreamType
{
UNINITIALIZED = 0,
INPUT = 1,
OUTPUT = 2,
};
/**
* The 'file-handle' to a file in hdfs.
*/
struct hdfsFile_internal {
void* file;
enum hdfsStreamType type;
};
typedef struct hdfsFile_internal* hdfsFile;
/**
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
* Connect to the hdfs.
* @param host A string containing either a host name, or an ip address
* of the namenode of a hdfs cluster. 'host' should be passed as NULL if
* you want to connect to local filesystem. 'host' should be passed as
* 'default' (and port as 0) to used the 'configured' filesystem
* (core-site/core-default.xml).
* @param port The port on which the server is listening.
* @param user the user name (this is hadoop domain user). Or NULL is equivelant to hhdfsConnect(host, port)
* @param groups the groups (these are hadoop domain groups)
* @return Returns a handle to the filesystem or NULL on error.
*/
hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const char *groups[], int groups_size );
/**
* hdfsConnect - Connect to a hdfs file system.
* Connect to the hdfs.
* @param host A string containing either a host name, or an ip address
* of the namenode of a hdfs cluster. 'host' should be passed as NULL if
* you want to connect to local filesystem. 'host' should be passed as
* 'default' (and port as 0) to used the 'configured' filesystem
* (core-site/core-default.xml).
* @param port The port on which the server is listening.
* @return Returns a handle to the filesystem or NULL on error.
*/
hdfsFS hdfsConnect(const char* host, tPort port);
/**
* This are the same as hdfsConnectAsUser except that every invocation returns a new FileSystem handle.
* Applications should call a hdfsDisconnect for every call to hdfsConnectAsUserNewInstance.
*/
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user , const char *groups[], int groups_size );
hdfsFS hdfsConnectNewInstance(const char* host, tPort port);
hdfsFS hdfsConnectPath(const char* uri);
/**
* hdfsDisconnect - Disconnect from the hdfs file system.
* Disconnect from hdfs.
* @param fs The configured filesystem handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsDisconnect(hdfsFS fs);
/**
* hdfsOpenFile - Open a hdfs file in given mode.
* @param fs The configured filesystem handle.
* @param path The full path to the file.
* @param flags - an | of bits/fcntl.h file flags - supported flags are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT),
* O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR || (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP.
* @param bufferSize Size of buffer for read/write - pass 0 if you want
* to use the default configured values.
* @param replication Block replication - pass 0 if you want to use
* the default configured values.
* @param blocksize Size of block - pass 0 if you want to use the
* default configured values.
* @return Returns the handle to the open file or NULL on error.
*/
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blocksize);
/**
* hdfsCloseFile - Close an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsCloseFile(hdfsFS fs, hdfsFile file);
/**
* hdfsExists - Checks if a given path exsits on the filesystem
* @param fs The configured filesystem handle.
* @param path The path to look for
* @return Returns 0 on exists, 1 on non-exists, -1/-2 on error.
*/
int hdfsExists(hdfsFS fs, const char *path);
/**
* hdfsSeek - Seek to given offset in file.
* This works only for files opened in read-only mode.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param desiredPos Offset into the file to seek into.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos);
/**
* hdfsTell - Get the current offset in the file, in bytes.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Current offset, -1 on error.
*/
tOffset hdfsTell(hdfsFS fs, hdfsFile file);
/**
* hdfsRead - Read data from an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param buffer The buffer to copy read bytes into.
* @param length The length of the buffer.
* @return Returns the number of bytes actually read, possibly less
* than than length;-1 on error.
*/
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
/**
* hdfsPread - Positional read of data from an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param position Position from which to read
* @param buffer The buffer to copy read bytes into.
* @param length The length of the buffer.
* @return Returns the number of bytes actually read, possibly less than
* than length;-1 on error.
*/
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position,
void* buffer, tSize length);
/**
* hdfsWrite - Write data into an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param buffer The data.
* @param length The no. of bytes to write.
* @return Returns the number of bytes written, -1 on error.
*/
tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer,
tSize length);
/**
* hdfsWrite - Flush the data.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsFlush(hdfsFS fs, hdfsFile file);
/**
* hdfsSync - Sync the data to persistent store.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSync(hdfsFS fs, hdfsFile file);
/**
* hdfsGetNumReplicasInPipeline - get number of remaining replicas in
* pipeline
* @param fs The configured filesystem handle
* @param file the file handle
* @return returns the # of datanodes in the write pipeline; -1 on error
*/
int hdfsGetNumCurrentReplicas(hdfsFS, hdfsFile file);
/**
* hdfsAvailable - Number of bytes that can be read from this
* input stream without blocking.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns available bytes; -1 on error.
*/
int hdfsAvailable(hdfsFS fs, hdfsFile file);
/**
* hdfsCopy - Copy file from one filesystem to another.
* @param srcFS The handle to source filesystem.
* @param src The path of source file.
* @param dstFS The handle to destination filesystem.
* @param dst The path of destination file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
/**
* hdfsMove - Move file from one filesystem to another.
* @param srcFS The handle to source filesystem.
* @param src The path of source file.
* @param dstFS The handle to destination filesystem.
* @param dst The path of destination file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
/**
* hdfsDelete - Delete file.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsDelete(hdfsFS fs, const char* path);
/**
* hdfsRename - Rename file.
* @param fs The configured filesystem handle.
* @param oldPath The path of the source file.
* @param newPath The path of the destination file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath);
/**
* hdfsGetWorkingDirectory - Get the current working directory for
* the given filesystem.
* @param fs The configured filesystem handle.
* @param buffer The user-buffer to copy path of cwd into.
* @param bufferSize The length of user-buffer.
* @return Returns buffer, NULL on error.
*/
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize);
/**
* hdfsSetWorkingDirectory - Set the working directory. All relative
* paths will be resolved relative to it.
* @param fs The configured filesystem handle.
* @param path The path of the new 'cwd'.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path);
/**
* hdfsCreateDirectory - Make the given file and all non-existent
* parents into directories.
* @param fs The configured filesystem handle.
* @param path The path of the directory.
* @return Returns 0 on success, -1 on error.
*/
int hdfsCreateDirectory(hdfsFS fs, const char* path);
/**
* hdfsSetReplication - Set the replication of the specified
* file to the supplied value
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication);
/**
* hdfsFileInfo - Information about a file/directory.
*/
typedef struct {
tObjectKind mKind; /* file or directory */
char *mName; /* the name of the file */
tTime mLastMod; /* the last modification time for the file in seconds */
tOffset mSize; /* the size of the file in bytes */
short mReplication; /* the count of replicas */
tOffset mBlockSize; /* the block size for the file */
char *mOwner; /* the owner of the file */
char *mGroup; /* the group associated with the file */
short mPermissions; /* the permissions associated with the file */
tTime mLastAccess; /* the last access time for the file in seconds */
} hdfsFileInfo;
/**
* hdfsListDirectory - Get list of files/directories for a given
* directory-path. hdfsFreeFileInfo should be called to deallocate memory if
* the function returns non-NULL value.
* @param fs The configured filesystem handle.
* @param path The path of the directory.
* @param numEntries Set to the number of files/directories in path.
* @return Returns a dynamically-allocated array of hdfsFileInfo
* objects; NULL if empty or on error.
* on error, numEntries will be -1.
*/
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path,
int *numEntries);
/**
* hdfsGetPathInfo - Get information about a path as a (dynamically
* allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be
* called when the pointer is no longer needed.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns a dynamically-allocated hdfsFileInfo object;
* NULL on error.
*/
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path);
/**
* hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields)
* @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo
* objects.
* @param numEntries The size of the array.
*/
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries);
/**
* hdfsGetHosts - Get hostnames where a particular block (determined by
* pos & blocksize) of a file is stored. The last element in the array
* is NULL. Due to replication, a single block could be present on
* multiple hosts.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @param start The start of the block.
* @param length The length of the block.
* @return Returns a dynamically-allocated 2-d array of blocks-hosts;
* NULL on error.
*/
char*** hdfsGetHosts(hdfsFS fs, const char* path,
tOffset start, tOffset length);
/**
* hdfsFreeHosts - Free up the structure returned by hdfsGetHosts
* @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo
* objects.
* @param numEntries The size of the array.
*/
void hdfsFreeHosts(char ***blockHosts);
/**
* hdfsGetDefaultBlockSize - Get the optimum blocksize.
* @param fs The configured filesystem handle.
* @return Returns the blocksize; -1 on error.
*/
tOffset hdfsGetDefaultBlockSize(hdfsFS fs);
/**
* hdfsGetCapacity - Return the raw capacity of the filesystem.
* @param fs The configured filesystem handle.
* @return Returns the raw-capacity; -1 on error.
*/
tOffset hdfsGetCapacity(hdfsFS fs);
/**
* hdfsGetUsed - Return the total raw size of all files in the filesystem.
* @param fs The configured filesystem handle.
* @return Returns the total-size; -1 on error.
*/
tOffset hdfsGetUsed(hdfsFS fs);
/**
* hdfsChown
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param owner this is a string in Hadoop land. Set to null or "" if only setting group
* @param group this is a string in Hadoop land. Set to null or "" if only setting user
* @return 0 on success else -1
*/
int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group);
/**
* hdfsChmod
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param mode the bitmask to set it to
* @return 0 on success else -1
*/
int hdfsChmod(hdfsFS fs, const char* path, short mode);
/**
* hdfsUtime
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param mtime new modification time or 0 for only set access time in seconds
* @param atime new access time or 0 for only set modification time in seconds
* @return 0 on success else -1
*/
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
#ifdef __cplusplus
}
#endif
#endif /*LIBHDFS_HDFS_H*/
/**
* vim: ts=4: sw=4: et
*/

@ -0,0 +1,499 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Copyright (c) 2012 Facebook. All rights reserved.
#ifdef USE_HDFS
#ifndef LEVELDB_HDFS_FILE_C
#define LEVELDB_HDFS_FILE_C
#include <algorithm>
#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <iostream>
#include <sstream>
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "hdfs/hdfs.h"
#include "hdfs/env_hdfs.h"
//
// This file defines an HDFS environment for leveldb. It uses the libhdfs
// api to access HDFS. All HDFS files created by one instance of leveldb
// will reside on the same HDFS cluster.
//
namespace leveldb {
namespace {
// Log error message
static Status IOError(const std::string& context, int err_number) {
return Status::IOError(context, strerror(err_number));
}
// assume that there is one global logger for now. It is not thread-safe,
// but need not be because the logger is initialized at db-open time.
static Logger* mylog = NULL;
// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile: virtual public SequentialFile, virtual public RandomAccessFile {
private:
hdfsFS fileSys_;
std::string filename_;
hdfsFile hfile_;
public:
HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
: fileSys_(fileSys), filename_(fname), hfile_(NULL) {
Log(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
filename_.c_str());
hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
Log(mylog, "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
filename_.c_str(), hfile_);
}
virtual ~HdfsReadableFile() {
Log(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
filename_.c_str());
hdfsCloseFile(fileSys_, hfile_);
Log(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
filename_.c_str());
hfile_ = NULL;
}
bool isValid() {
return hfile_ != NULL;
}
// sequential access, read data at current offset in file
virtual Status Read(size_t n, Slice* result, char* scratch) {
Status s;
Log(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
filename_.c_str(), n);
size_t bytes_read = hdfsRead(fileSys_, hfile_, scratch, (tSize)n);
Log(mylog, "[hdfs] HdfsReadableFile read %s\n", filename_.c_str());
*result = Slice(scratch, bytes_read);
if (bytes_read < n) {
if (feof()) {
// We leave status as ok if we hit the end of the file
} else {
// A partial read with an error: return a non-ok status
s = IOError(filename_, errno);
}
}
return s;
}
// random access, read data from specified offset in file
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
Log(mylog, "[hdfs] HdfsReadableFile preading %s\n", filename_.c_str());
ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
(void*)scratch, (tSize)n);
Log(mylog, "[hdfs] HdfsReadableFile pread %s\n", filename_.c_str());
*result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
if (bytes_read < 0) {
// An error: return a non-ok status
s = IOError(filename_, errno);
}
return s;
}
virtual Status Skip(uint64_t n) {
Log(mylog, "[hdfs] HdfsReadableFile skip %s\n", filename_.c_str());
// get current offset from file
tOffset current = hdfsTell(fileSys_, hfile_);
if (current < 0) {
return IOError(filename_, errno);
}
// seek to new offset in file
tOffset newoffset = current + n;
int val = hdfsSeek(fileSys_, hfile_, newoffset);
if (val < 0) {
return IOError(filename_, errno);
}
return Status::OK();
}
private:
// returns true if we are at the end of file, false otherwise
bool feof() {
Log(mylog, "[hdfs] HdfsReadableFile feof %s\n", filename_.c_str());
if (hdfsTell(fileSys_, hfile_) == fileSize()) {
return true;
}
return false;
}
// the current size of the file
tOffset fileSize() {
Log(mylog, "[hdfs] HdfsReadableFile fileSize %s\n", filename_.c_str());
hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
tOffset size = 0L;
if (pFileInfo != NULL) {
size = pFileInfo->mSize;
hdfsFreeFileInfo(pFileInfo, 1);
} else {
throw new leveldb::HdfsFatalException("fileSize on unknown file " +
filename_);
}
return size;
}
};
// Appends to an existing file in HDFS.
class HdfsWritableFile: public WritableFile {
private:
hdfsFS fileSys_;
std::string filename_;
hdfsFile hfile_;
public:
HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
: fileSys_(fileSys), filename_(fname) , hfile_(NULL) {
Log(mylog, "[hdfs] HdfsWritableFile opening %s\n", filename_.c_str());
hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
Log(mylog, "[hdfs] HdfsWritableFile opened %s\n", filename_.c_str());
assert(hfile_ != NULL);
}
virtual ~HdfsWritableFile() {
if (hfile_ != NULL) {
Log(mylog, "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str());
hdfsCloseFile(fileSys_, hfile_);
Log(mylog, "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str());
hfile_ = NULL;
}
}
// If the file was successfully created, then this returns true.
// Otherwise returns false.
bool isValid() {
return hfile_ != NULL;
}
// The name of the file, mostly needed for debug logging.
const std::string& getName() {
return filename_;
}
virtual Status Append(const Slice& data) {
Log(mylog, "[hdfs] HdfsWritableFile Append %s\n", filename_.c_str());
const char* src = data.data();
size_t left = data.size();
size_t ret = hdfsWrite(fileSys_, hfile_, src, left);
Log(mylog, "[hdfs] HdfsWritableFile Appended %s\n", filename_.c_str());
if (ret != left) {
return IOError(filename_, errno);
}
return Status::OK();
}
virtual Status Flush() {
return Status::OK();
}
virtual Status Sync() {
Status s;
Log(mylog, "[hdfs] HdfsWritableFile Sync %s\n", filename_.c_str());
if (hdfsFlush(fileSys_, hfile_) == -1) {
return IOError(filename_, errno);
}
if (hdfsSync(fileSys_, hfile_) == -1) {
return IOError(filename_, errno);
}
Log(mylog, "[hdfs] HdfsWritableFile Synced %s\n", filename_.c_str());
return Status::OK();
}
// This is used by HdfsLogger to write data to the debug log file
virtual Status Append(const char* src, size_t size) {
if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
return IOError(filename_, errno);
}
return Status::OK();
}
virtual Status Close() {
Log(mylog, "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str());
if (hdfsCloseFile(fileSys_, hfile_) != 0) {
return IOError(filename_, errno);
}
Log(mylog, "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str());
hfile_ = NULL;
return Status::OK();
}
};
// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
private:
HdfsWritableFile* file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread
public:
HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
: file_(f), gettid_(gettid) {
Log(mylog, "[hdfs] HdfsLogger opened %s\n",
file_->getName().c_str());
}
virtual ~HdfsLogger() {
Log(mylog, "[hdfs] HdfsLogger closed %s\n",
file_->getName().c_str());
delete file_;
if (mylog != NULL && mylog == this) {
mylog = NULL;
}
}
virtual void Logv(const char* format, va_list ap) {
const uint64_t thread_id = (*gettid_)();
// We try twice: the first time with a fixed-size stack allocated buffer,
// and the second time with a much larger dynamically allocated buffer.
char buffer[500];
for (int iter = 0; iter < 2; iter++) {
char* base;
int bufsize;
if (iter == 0) {
bufsize = sizeof(buffer);
base = buffer;
} else {
bufsize = 30000;
base = new char[bufsize];
}
char* p = base;
char* limit = base + bufsize;
struct timeval now_tv;
gettimeofday(&now_tv, NULL);
const time_t seconds = now_tv.tv_sec;
struct tm t;
localtime_r(&seconds, &t);
p += snprintf(p, limit - p,
"%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
t.tm_year + 1900,
t.tm_mon + 1,
t.tm_mday,
t.tm_hour,
t.tm_min,
t.tm_sec,
static_cast<int>(now_tv.tv_usec),
static_cast<long long unsigned int>(thread_id));
// Print the message
if (p < limit) {
va_list backup_ap;
va_copy(backup_ap, ap);
p += vsnprintf(p, limit - p, format, backup_ap);
va_end(backup_ap);
}
// Truncate to available space if necessary
if (p >= limit) {
if (iter == 0) {
continue; // Try again with larger buffer
} else {
p = limit - 1;
}
}
// Add newline if necessary
if (p == base || p[-1] != '\n') {
*p++ = '\n';
}
assert(p <= limit);
file_->Append(base, p-base);
file_->Flush();
if (base != buffer) {
delete[] base;
}
break;
}
}
};
} // namespace
// Finally, the hdfs environment
// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
SequentialFile** result) {
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == NULL) {
*result = NULL;
return IOError(fname, errno);
}
*result = dynamic_cast<SequentialFile*>(f);
return Status::OK();
}
// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result) {
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == NULL) {
*result = NULL;
return IOError(fname, errno);
}
*result = dynamic_cast<RandomAccessFile*>(f);
return Status::OK();
}
// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
WritableFile** result) {
Status s;
HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
if (f == NULL || !f->isValid()) {
*result = NULL;
return IOError(fname, errno);
}
*result = dynamic_cast<WritableFile*>(f);
return Status::OK();
}
bool HdfsEnv::FileExists(const std::string& fname) {
int value = hdfsExists(fileSys_, fname.c_str());
if (value == 0) {
return true;
}
return false;
}
Status HdfsEnv::GetChildren(const std::string& path,
std::vector<std::string>* result) {
int value = hdfsExists(fileSys_, path.c_str());
switch (value) {
case 0: {
int numEntries = 0;
hdfsFileInfo* pHdfsFileInfo = 0;
pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
if (numEntries >= 0) {
for(int i = 0; i < numEntries; i++) {
char* pathname = pHdfsFileInfo[i].mName;
char* filename = rindex(pathname, '/');
if (filename != NULL) {
result->push_back(filename+1);
}
}
if (pHdfsFileInfo != NULL) {
hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
}
} else {
// numEntries < 0 indicates error
Log(mylog, "hdfsListDirectory call failed with error ");
throw HdfsFatalException("hdfsListDirectory call failed negative error.\n");
}
break;
}
case 1: // directory does not exist, exit
break;
default: // anything else should be an error
Log(mylog, "hdfsListDirectory call failed with error ");
throw HdfsFatalException("hdfsListDirectory call failed with error.\n");
}
return Status::OK();
}
Status HdfsEnv::DeleteFile(const std::string& fname) {
if (hdfsDelete(fileSys_, fname.c_str()) == 0) {
return Status::OK();
}
return IOError(fname, errno);
};
Status HdfsEnv::CreateDir(const std::string& name) {
if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
return Status::OK();
}
return IOError(name, errno);
};
Status HdfsEnv::DeleteDir(const std::string& name) {
return DeleteFile(name);
};
Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
*size = 0L;
hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
if (pFileInfo != NULL) {
*size = pFileInfo->mSize;
hdfsFreeFileInfo(pFileInfo, 1);
return Status::OK();
}
return IOError(fname, errno);
}
// The rename is not atomic. HDFS does not allow a renaming if the
// target already exists. So, we delete the target before attemting the
// rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
hdfsDelete(fileSys_, target.c_str());
if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
return Status::OK();
}
return IOError(src, errno);
}
Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
// there isn's a very good way to atomically check and create
// a file via libhdfs
*lock = NULL;
return Status::OK();
}
Status HdfsEnv::UnlockFile(FileLock* lock) {
return Status::OK();
}
Status HdfsEnv::NewLogger(const std::string& fname, Logger** result) {
HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
if (f == NULL || !f->isValid()) {
*result = NULL;
return IOError(fname, errno);
}
HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
*result = h;
if (mylog == NULL) {
// mylog = h; // uncomment this for detailed logging
}
return Status::OK();
}
} // namespace leveldb
#endif // LEVELDB_HDFS_FILE_C
#else // USE_HDFS
// dummy placeholders used when HDFS is not available
#include "leveldb/env.h"
#include "hdfs/env_hdfs.h"
namespace leveldb {
Status HdfsEnv::NewSequentialFile(const std::string& fname,
SequentialFile** result) {}
}
#endif
Loading…
Cancel
Save