Ankit Gupta 11 years ago
commit 84cbafa5ce
  1. 3
      HISTORY.md
  2. 2
      Makefile
  3. 2
      build_tools/build_detect_platform
  4. 22
      db/column_family.cc
  5. 8
      db/compaction.cc
  6. 9
      db/compaction.h
  7. 71
      db/compaction_picker.cc
  8. 20
      db/compaction_picker.h
  9. 38
      db/db_impl.cc
  10. 3
      db/db_impl_debug.cc
  11. 64
      db/db_test.cc
  12. 20
      db/version_set.cc
  13. 1
      db/version_set.h
  14. 13
      hdfs/README
  15. 2
      hdfs/env_hdfs.h
  16. 477
      hdfs/hdfs.h
  17. BIN
      hdfs/libhdfs.a
  18. 7
      hdfs/setup.sh
  19. 15
      include/rocksdb/options.h
  20. 2
      include/rocksdb/version.h
  21. 10
      tools/db_crashtest2.py
  22. 37
      util/env_hdfs.cc
  23. 14
      util/env_posix.cc
  24. 8
      util/env_test.cc
  25. 3
      util/options.cc

@ -1,12 +1,13 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased (3.1.0) ## 3.1.0 (05/21/2014)
### Public API changes ### Public API changes
* Replaced ColumnFamilyOptions::table_properties_collectors with ColumnFamilyOptions::table_properties_collector_factories * Replaced ColumnFamilyOptions::table_properties_collectors with ColumnFamilyOptions::table_properties_collector_factories
### New Features ### New Features
* Hash index for block-based table will be materialized and reconstructed more efficiently. Previously hash index is constructed by scanning the whole table during every table open. * Hash index for block-based table will be materialized and reconstructed more efficiently. Previously hash index is constructed by scanning the whole table during every table open.
* FIFO compaction style
## 3.0.0 (05/05/2014) ## 3.0.0 (05/05/2014)

@ -146,7 +146,7 @@ SHARED = $(SHARED1)
else else
# Update db.h if you change these. # Update db.h if you change these.
SHARED_MAJOR = 3 SHARED_MAJOR = 3
SHARED_MINOR = 0 SHARED_MINOR = 2
SHARED1 = ${LIBNAME}.$(PLATFORM_SHARED_EXT) SHARED1 = ${LIBNAME}.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR) SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR) SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)

@ -289,7 +289,7 @@ if test "$USE_HDFS"; then
exit 1 exit 1
fi fi
HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS" HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS"
HDFS_LDFLAGS="$HDFS_LDFLAGS -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64" HDFS_LDFLAGS="$HDFS_LDFLAGS -Wl,--no-whole-archive -lhdfs -L$JAVA_HOME/jre/lib/amd64"
HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib" HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib"
HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm" HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm"
COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS" COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS"

@ -12,6 +12,7 @@
#include <vector> #include <vector>
#include <string> #include <string>
#include <algorithm> #include <algorithm>
#include <limits>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/version_set.h" #include "db/version_set.h"
@ -116,6 +117,15 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
collector_factories.push_back( collector_factories.push_back(
std::make_shared<InternalKeyPropertiesCollectorFactory>()); std::make_shared<InternalKeyPropertiesCollectorFactory>());
if (result.compaction_style == kCompactionStyleFIFO) {
result.num_levels = 1;
// since we delete level0 files in FIFO compaction when there are too many
// of them, these options don't really mean anything
result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
}
return result; return result;
} }
@ -196,7 +206,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
options_(*db_options, SanitizeOptions(&internal_comparator_, options_(*db_options, SanitizeOptions(&internal_comparator_,
&internal_filter_policy_, options)), &internal_filter_policy_, options)),
mem_(nullptr), mem_(nullptr),
imm_(options.min_write_buffer_number_to_merge), imm_(options_.min_write_buffer_number_to_merge),
super_version_(nullptr), super_version_(nullptr),
super_version_number_(0), super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
@ -209,16 +219,20 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
// if dummy_versions is nullptr, then this is a dummy column family. // if dummy_versions is nullptr, then this is a dummy column family.
if (dummy_versions != nullptr) { if (dummy_versions != nullptr) {
internal_stats_.reset(new InternalStats(options.num_levels, db_options->env, internal_stats_.reset(new InternalStats(
db_options->statistics.get())); options_.num_levels, db_options->env, db_options->statistics.get()));
table_cache_.reset( table_cache_.reset(
new TableCache(dbname, &options_, storage_options, table_cache)); new TableCache(dbname, &options_, storage_options, table_cache));
if (options_.compaction_style == kCompactionStyleUniversal) { if (options_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset( compaction_picker_.reset(
new UniversalCompactionPicker(&options_, &internal_comparator_)); new UniversalCompactionPicker(&options_, &internal_comparator_));
} else { } else if (options_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset( compaction_picker_.reset(
new LevelCompactionPicker(&options_, &internal_comparator_)); new LevelCompactionPicker(&options_, &internal_comparator_));
} else {
assert(options_.compaction_style == kCompactionStyleFIFO);
compaction_picker_.reset(
new FIFOCompactionPicker(&options_, &internal_comparator_));
} }
Log(options_.info_log, "Options for column family \"%s\":\n", Log(options_.info_log, "Options for column family \"%s\":\n",

@ -29,7 +29,8 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
Compaction::Compaction(Version* input_version, int level, int out_level, Compaction::Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, uint64_t max_grandparent_overlap_bytes,
bool seek_compaction, bool enable_compression) bool seek_compaction, bool enable_compression,
bool deletion_compaction)
: level_(level), : level_(level),
out_level_(out_level), out_level_(out_level),
max_output_file_size_(target_file_size), max_output_file_size_(target_file_size),
@ -39,6 +40,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
cfd_(input_version_->cfd_), cfd_(input_version_->cfd_),
seek_compaction_(seek_compaction), seek_compaction_(seek_compaction),
enable_compression_(enable_compression), enable_compression_(enable_compression),
deletion_compaction_(deletion_compaction),
grandparent_index_(0), grandparent_index_(0),
seen_key_(false), seen_key_(false),
overlapped_bytes_(0), overlapped_bytes_(0),
@ -83,6 +85,8 @@ bool Compaction::IsTrivialMove() const {
TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
} }
bool Compaction::IsDeletionCompaction() const { return deletion_compaction_; }
void Compaction::AddInputDeletions(VersionEdit* edit) { void Compaction::AddInputDeletions(VersionEdit* edit) {
for (int which = 0; which < 2; which++) { for (int which = 0; which < 2; which++) {
for (size_t i = 0; i < inputs_[which].size(); i++) { for (size_t i = 0; i < inputs_[which].size(); i++) {
@ -92,6 +96,7 @@ void Compaction::AddInputDeletions(VersionEdit* edit) {
} }
bool Compaction::IsBaseLevelForKey(const Slice& user_key) { bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
return bottommost_level_; return bottommost_level_;
} }
@ -155,6 +160,7 @@ void Compaction::MarkFilesBeingCompacted(bool value) {
// Is this compaction producing files at the bottommost level? // Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool isManual) { void Compaction::SetupBottomMostLevel(bool isManual) {
assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
// If universal compaction style is used and manual // If universal compaction style is used and manual
// compaction is occuring, then we are guaranteed that // compaction is occuring, then we are guaranteed that

@ -54,6 +54,9 @@ class Compaction {
// moving a single input file to the next level (no merging or splitting) // moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const; bool IsTrivialMove() const;
// If true, just delete all files in inputs_[0]
bool IsDeletionCompaction() const;
// Add all inputs to this compaction as delete operations to *edit. // Add all inputs to this compaction as delete operations to *edit.
void AddInputDeletions(VersionEdit* edit); void AddInputDeletions(VersionEdit* edit);
@ -91,11 +94,13 @@ class Compaction {
private: private:
friend class CompactionPicker; friend class CompactionPicker;
friend class UniversalCompactionPicker; friend class UniversalCompactionPicker;
friend class FIFOCompactionPicker;
friend class LevelCompactionPicker; friend class LevelCompactionPicker;
Compaction(Version* input_version, int level, int out_level, Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
bool seek_compaction = false, bool enable_compression = true); bool seek_compaction = false, bool enable_compression = true,
bool deletion_compaction = false);
int level_; int level_;
int out_level_; // levels to which output files are stored int out_level_; // levels to which output files are stored
@ -108,6 +113,8 @@ class Compaction {
bool seek_compaction_; bool seek_compaction_;
bool enable_compression_; bool enable_compression_;
// if true, just delete files in inputs_[0]
bool deletion_compaction_;
// Each compaction reads inputs from "level_" and "level_+1" // Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs

@ -9,6 +9,8 @@
#include "db/compaction_picker.h" #include "db/compaction_picker.h"
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <limits> #include <limits>
#include "util/log_buffer.h" #include "util/log_buffer.h"
#include "util/statistics.h" #include "util/statistics.h"
@ -307,6 +309,9 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
const InternalKey* begin, const InternalKey* begin,
const InternalKey* end, const InternalKey* end,
InternalKey** compaction_end) { InternalKey** compaction_end) {
// CompactionPickerFIFO has its own implementation of compact range
assert(options_->compaction_style != kCompactionStyleFIFO);
std::vector<FileMetaData*> inputs; std::vector<FileMetaData*> inputs;
bool covering_the_whole_range = true; bool covering_the_whole_range = true;
@ -886,4 +891,70 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
return c; return c;
} }
Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
assert(version->NumberLevels() == 1);
uint64_t total_size = 0;
for (const auto& file : version->files_[0]) {
total_size += file->file_size;
}
if (total_size <= options_->compaction_options_fifo.max_table_files_size ||
version->files_[0].size() == 0) {
// total size not exceeded
LogToBuffer(log_buffer,
"[%s] FIFO compaction: nothing to do. Total size %" PRIu64
", max size %" PRIu64 "\n",
version->cfd_->GetName().c_str(), total_size,
options_->compaction_options_fifo.max_table_files_size);
return nullptr;
}
if (compactions_in_progress_[0].size() > 0) {
LogToBuffer(log_buffer,
"[%s] FIFO compaction: Already executing compaction. No need "
"to run parallel compactions since compactions are very fast",
version->cfd_->GetName().c_str());
return nullptr;
}
Compaction* c = new Compaction(version, 0, 0, 0, 0, false, false,
true /* is deletion compaction */);
// delete old files (FIFO)
for (auto ritr = version->files_[0].rbegin();
ritr != version->files_[0].rend(); ++ritr) {
auto f = *ritr;
total_size -= f->file_size;
c->inputs_[0].push_back(f);
char tmp_fsize[16];
AppendHumanBytes(f->file_size, tmp_fsize, sizeof(tmp_fsize));
LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
" with size %s for deletion",
version->cfd_->GetName().c_str(), f->number, tmp_fsize);
if (total_size <= options_->compaction_options_fifo.max_table_files_size) {
break;
}
}
c->MarkFilesBeingCompacted(true);
compactions_in_progress_[0].insert(c);
return c;
}
Compaction* FIFOCompactionPicker::CompactRange(Version* version,
int input_level,
int output_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
assert(input_level == 0);
assert(output_level == 0);
*compaction_end = nullptr;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get());
auto c = PickCompaction(version, &log_buffer);
log_buffer.FlushBufferToLog();
return c;
}
} // namespace rocksdb } // namespace rocksdb

@ -47,8 +47,9 @@ class CompactionPicker {
// compaction_end will be set to nullptr. // compaction_end will be set to nullptr.
// Client is responsible for compaction_end storage -- when called, // Client is responsible for compaction_end storage -- when called,
// *compaction_end should point to valid InternalKey! // *compaction_end should point to valid InternalKey!
Compaction* CompactRange(Version* version, int input_level, int output_level, virtual Compaction* CompactRange(Version* version, int input_level,
const InternalKey* begin, const InternalKey* end, int output_level, const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end); InternalKey** compaction_end);
// Free up the files that participated in a compaction // Free up the files that participated in a compaction
@ -162,4 +163,19 @@ class LevelCompactionPicker : public CompactionPicker {
Compaction* PickCompactionBySize(Version* version, int level, double score); Compaction* PickCompactionBySize(Version* version, int level, double score);
}; };
class FIFOCompactionPicker : public CompactionPicker {
public:
FIFOCompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
virtual Compaction* CompactRange(Version* version, int input_level,
int output_level, const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) override;
};
} // namespace rocksdb } // namespace rocksdb

@ -1590,7 +1590,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
return s; return s;
} }
int max_level_with_files = 1; int max_level_with_files = 0;
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
Version* base = cfd->current(); Version* base = cfd->current();
@ -1604,6 +1604,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
// in case the compaction is unversal or if we're compacting the // in case the compaction is unversal or if we're compacting the
// bottom-most level, the output level will be the same as input one // bottom-most level, the output level will be the same as input one
if (cfd->options()->compaction_style == kCompactionStyleUniversal || if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO ||
level == max_level_with_files) { level == max_level_with_files) {
s = RunManualCompaction(cfd, level, level, begin, end); s = RunManualCompaction(cfd, level, level, begin, end);
} else { } else {
@ -1754,14 +1755,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
// For universal compaction, we enforce every manual compaction to compact // For universal compaction, we enforce every manual compaction to compact
// all files. // all files.
if (begin == nullptr || if (begin == nullptr ||
cfd->options()->compaction_style == kCompactionStyleUniversal) { cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO) {
manual.begin = nullptr; manual.begin = nullptr;
} else { } else {
begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
manual.begin = &begin_storage; manual.begin = &begin_storage;
} }
if (end == nullptr || if (end == nullptr ||
cfd->options()->compaction_style == kCompactionStyleUniversal) { cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO) {
manual.end = nullptr; manual.end = nullptr;
} else { } else {
end_storage = InternalKey(*end, 0, static_cast<ValueType>(0)); end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
@ -2150,6 +2153,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (!c) { if (!c) {
// Nothing to do // Nothing to do
LogToBuffer(log_buffer, "Compaction nothing to do"); LogToBuffer(log_buffer, "Compaction nothing to do");
} else if (c->IsDeletionCompaction()) {
// TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
// file if there is alive snapshot pointing to it
assert(c->num_input_files(1) == 0);
assert(c->level() == 0);
assert(c->column_family_data()->options()->compaction_style ==
kCompactionStyleFIFO);
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->number);
}
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
InstallSuperVersion(c->column_family_data(), deletion_state);
LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
c->ReleaseCompactionFiles(status);
*madeProgress = true;
} else if (!is_manual && c->IsTrivialMove()) { } else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level // Move file to next level
assert(c->num_input_files(0) == 1); assert(c->num_input_files(0) == 1);
@ -2219,8 +2240,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (!m->done) { if (!m->done) {
// We only compacted part of the requested range. Update *m // We only compacted part of the requested range. Update *m
// to the range that is left to be compacted. // to the range that is left to be compacted.
// Universal compaction should always compact the whole range // Universal and FIFO compactions should always compact the whole range
assert(m->cfd->options()->compaction_style != kCompactionStyleUniversal); assert(m->cfd->options()->compaction_style != kCompactionStyleUniversal);
assert(m->cfd->options()->compaction_style != kCompactionStyleFIFO);
m->tmp_storage = *manual_end; m->tmp_storage = *manual_end;
m->begin = &m->tmp_storage; m->begin = &m->tmp_storage;
} }
@ -4468,13 +4490,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) { if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (cfd->options()->compaction_style == kCompactionStyleUniversal) { if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO) {
Version* current = cfd->current(); Version* current = cfd->current();
for (int i = 1; i < current->NumberLevels(); ++i) { for (int i = 1; i < current->NumberLevels(); ++i) {
int num_files = current->NumLevelFiles(i); int num_files = current->NumLevelFiles(i);
if (num_files > 0) { if (num_files > 0) {
s = Status::InvalidArgument("Not all files are at level 0. Cannot " s = Status::InvalidArgument(
"open with universal compaction style."); "Not all files are at level 0. Cannot "
"open with universal or FIFO compaction style.");
break; break;
} }
} }

@ -81,7 +81,8 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
cfd = cfh->cfd(); cfd = cfh->cfd();
} }
int output_level = int output_level =
(cfd->options()->compaction_style == kCompactionStyleUniversal) (cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO)
? level ? level
: level + 1; : level + 1;
return RunManualCompaction(cfd, level, output_level, begin, end); return RunManualCompaction(cfd, level, output_level, begin, end);

@ -317,6 +317,7 @@ class DBTest {
kCompressedBlockCache, kCompressedBlockCache,
kInfiniteMaxOpenFiles, kInfiniteMaxOpenFiles,
kxxHashChecksum, kxxHashChecksum,
kFIFOCompaction,
kEnd kEnd
}; };
int option_config_; int option_config_;
@ -339,7 +340,8 @@ class DBTest {
kSkipPlainTable = 8, kSkipPlainTable = 8,
kSkipHashIndex = 16, kSkipHashIndex = 16,
kSkipNoSeekToLast = 32, kSkipNoSeekToLast = 32,
kSkipHashCuckoo = 64 kSkipHashCuckoo = 64,
kSkipFIFOCompaction = 128,
}; };
DBTest() : option_config_(kDefault), DBTest() : option_config_(kDefault),
@ -391,6 +393,10 @@ class DBTest {
if ((skip_mask & kSkipHashCuckoo) && (option_config_ == kHashCuckoo)) { if ((skip_mask & kSkipHashCuckoo) && (option_config_ == kHashCuckoo)) {
continue; continue;
} }
if ((skip_mask & kSkipFIFOCompaction) &&
option_config_ == kFIFOCompaction) {
continue;
}
break; break;
} }
@ -503,6 +509,10 @@ class DBTest {
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.table_factory.reset(NewBlockBasedTableFactory(table_options));
break; break;
} }
case kFIFOCompaction: {
options.compaction_style = kCompactionStyleFIFO;
break;
}
case kBlockBasedTableWithPrefixHashIndex: { case kBlockBasedTableWithPrefixHashIndex: {
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
table_options.index_type = BlockBasedTableOptions::kHashSearch; table_options.index_type = BlockBasedTableOptions::kHashSearch;
@ -1394,7 +1404,7 @@ TEST(DBTest, GetEncountersEmptyLevel) {
env_->SleepForMicroseconds(1000000); env_->SleepForMicroseconds(1000000);
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX
} while (ChangeOptions(kSkipUniversalCompaction)); } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
} }
// KeyMayExist can lead to a few false positives, but not false negatives. // KeyMayExist can lead to a few false positives, but not false negatives.
@ -1460,7 +1470,8 @@ TEST(DBTest, KeyMayExist) {
// KeyMayExist function only checks data in block caches, which is not used // KeyMayExist function only checks data in block caches, which is not used
// by plain table format. // by plain table format.
} while (ChangeOptions(kSkipPlainTable | kSkipHashIndex)); } while (
ChangeOptions(kSkipPlainTable | kSkipHashIndex | kSkipFIFOCompaction));
} }
TEST(DBTest, NonBlockingIteration) { TEST(DBTest, NonBlockingIteration) {
@ -4387,7 +4398,8 @@ TEST(DBTest, ApproximateSizes) {
ASSERT_GT(NumTableFilesAtLevel(1, 1), 0); ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
} }
// ApproximateOffsetOf() is not yet implemented in plain table format. // ApproximateOffsetOf() is not yet implemented in plain table format.
} while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable)); } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
kSkipPlainTable));
} }
TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) { TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
@ -4531,8 +4543,8 @@ TEST(DBTest, HiddenValuesAreRemoved) {
// ApproximateOffsetOf() is not yet implemented in plain table format, // ApproximateOffsetOf() is not yet implemented in plain table format,
// which is used by Size(). // which is used by Size().
// skip HashCuckooRep as it does not support snapshot // skip HashCuckooRep as it does not support snapshot
} while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable | } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction |
kSkipHashCuckoo)); kSkipPlainTable | kSkipHashCuckoo));
} }
TEST(DBTest, CompactBetweenSnapshots) { TEST(DBTest, CompactBetweenSnapshots) {
@ -4588,7 +4600,7 @@ TEST(DBTest, CompactBetweenSnapshots) {
ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ("sixth", Get(1, "foo"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
// skip HashCuckooRep as it does not support snapshot // skip HashCuckooRep as it does not support snapshot
} while (ChangeOptions(kSkipHashCuckoo)); } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction));
} }
TEST(DBTest, DeletionMarkers1) { TEST(DBTest, DeletionMarkers1) {
@ -4694,7 +4706,7 @@ TEST(DBTest, OverlapInLevel0) {
Flush(1); Flush(1);
ASSERT_EQ("3", FilesPerLevel(1)); ASSERT_EQ("3", FilesPerLevel(1));
ASSERT_EQ("NOT_FOUND", Get(1, "600")); ASSERT_EQ("NOT_FOUND", Get(1, "600"));
} while (ChangeOptions(kSkipUniversalCompaction)); } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction));
} }
TEST(DBTest, L0_CompactionBug_Issue44_a) { TEST(DBTest, L0_CompactionBug_Issue44_a) {
@ -6797,6 +6809,42 @@ TEST(DBTest, ChecksumTest) {
ASSERT_EQ("f", Get("e")); ASSERT_EQ("f", Get("e"));
ASSERT_EQ("h", Get("g")); ASSERT_EQ("h", Get("g"));
} }
TEST(DBTest, FIFOCompactionTest) {
for (int iter = 0; iter < 2; ++iter) {
// first iteration -- auto compaction
// second iteration -- manual compaction
Options options;
options.compaction_style = kCompactionStyleFIFO;
options.write_buffer_size = 100 << 10; // 100KB
options.compaction_options_fifo.max_table_files_size = 500 << 10; // 500KB
options.compression = kNoCompression;
options.create_if_missing = true;
if (iter == 1) {
options.disable_auto_compactions = true;
}
DestroyAndReopen(&options);
Random rnd(301);
for (int i = 0; i < 6; ++i) {
for (int j = 0; j < 100; ++j) {
ASSERT_OK(Put(std::to_string(i * 100 + j), RandomString(&rnd, 1024)));
}
// flush should happen here
}
if (iter == 0) {
ASSERT_OK(dbfull()->TEST_WaitForCompact());
} else {
ASSERT_OK(db_->CompactRange(nullptr, nullptr));
}
// only 5 files should survive
ASSERT_EQ(NumTableFilesAtLevel(0), 5);
for (int i = 0; i < 50; ++i) {
// these keys should be deleted in previous compaction
ASSERT_EQ("NOT_FOUND", Get(std::to_string(i)));
}
}
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -711,7 +711,8 @@ void Version::ComputeCompactionScore(
int max_score_level = 0; int max_score_level = 0;
int num_levels_to_check = int num_levels_to_check =
(cfd_->options()->compaction_style != kCompactionStyleUniversal) (cfd_->options()->compaction_style != kCompactionStyleUniversal &&
cfd_->options()->compaction_style != kCompactionStyleFIFO)
? NumberLevels() - 1 ? NumberLevels() - 1
: 1; : 1;
@ -730,14 +731,18 @@ void Version::ComputeCompactionScore(
// setting, or very high compression ratios, or lots of // setting, or very high compression ratios, or lots of
// overwrites/deletions). // overwrites/deletions).
int numfiles = 0; int numfiles = 0;
uint64_t total_size = 0;
for (unsigned int i = 0; i < files_[level].size(); i++) { for (unsigned int i = 0; i < files_[level].size(); i++) {
if (!files_[level][i]->being_compacted) { if (!files_[level][i]->being_compacted) {
total_size += files_[level][i]->file_size;
numfiles++; numfiles++;
} }
} }
if (cfd_->options()->compaction_style == kCompactionStyleFIFO) {
score = static_cast<double>(total_size) /
cfd_->options()->compaction_options_fifo.max_table_files_size;
} else if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
// If we are slowing down writes, then we better compact that first // If we are slowing down writes, then we better compact that first
if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
score = 1000000; score = 1000000;
} else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) { } else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) {
score = 10000; score = 10000;
@ -803,6 +808,10 @@ bool CompareSeqnoDescending(const Version::Fsize& first,
} // anonymous namespace } // anonymous namespace
void Version::UpdateFilesBySize() { void Version::UpdateFilesBySize() {
if (cfd_->options()->compaction_style == kCompactionStyleFIFO) {
// don't need this
return;
}
// No need to sort the highest level because it is never compacted. // No need to sort the highest level because it is never compacted.
int max_level = int max_level =
(cfd_->options()->compaction_style == kCompactionStyleUniversal) (cfd_->options()->compaction_style == kCompactionStyleUniversal)
@ -871,7 +880,8 @@ bool Version::NeedsCompaction() const {
// TODO(sdong): improve this function to be accurate for universal // TODO(sdong): improve this function to be accurate for universal
// compactions. // compactions.
int num_levels_to_check = int num_levels_to_check =
(cfd_->options()->compaction_style != kCompactionStyleUniversal) (cfd_->options()->compaction_style != kCompactionStyleUniversal &&
cfd_->options()->compaction_style != kCompactionStyleFIFO)
? NumberLevels() - 1 ? NumberLevels() - 1
: 1; : 1;
for (int i = 0; i < num_levels_to_check; i++) { for (int i = 0; i < num_levels_to_check; i++) {
@ -1253,7 +1263,7 @@ struct VersionSet::ManifestWriter {
class VersionSet::Builder { class VersionSet::Builder {
private: private:
// Helper to sort v->files_ // Helper to sort v->files_
// kLevel0LevelCompaction -- NewestFirst // kLevel0LevelCompaction -- NewestFirst (also used for FIFO compaction)
// kLevel0UniversalCompaction -- NewestFirstBySeqNo // kLevel0UniversalCompaction -- NewestFirstBySeqNo
// kLevelNon0 -- BySmallestKey // kLevelNon0 -- BySmallestKey
struct FileComparator { struct FileComparator {

@ -217,6 +217,7 @@ class Version {
friend class CompactionPicker; friend class CompactionPicker;
friend class LevelCompactionPicker; friend class LevelCompactionPicker;
friend class UniversalCompactionPicker; friend class UniversalCompactionPicker;
friend class FIFOCompactionPicker;
class LevelFileNumIterator; class LevelFileNumIterator;
class LevelFileIteratorState; class LevelFileIteratorState;

@ -1,19 +1,16 @@
This directory contains the hdfs extensions needed to make rocksdb store This directory contains the hdfs extensions needed to make rocksdb store
files in HDFS. files in HDFS.
The hdfs.h file is copied from the Apache Hadoop 1.0 source code. It has been compiled and testing against CDH 4.4 (2.0.0+1475-1.cdh4.4.0.p0.23~precise-cdh4.4.0).
It defines the libhdfs library
(http://hadoop.apache.org/common/docs/r0.20.2/libhdfs.html) to access The configuration assumes that packages libhdfs0, libhdfs0-dev are
data in HDFS. The libhdfs.a is copied from the Apache Hadoop 1.0 build. installed which basically means that hdfs.h is in /usr/include and libhdfs in /usr/lib
It implements the API defined in hdfs.h. If your hadoop cluster is running
a different hadoop release, then install these two files manually from your
hadoop distribution and then recompile rocksdb.
The env_hdfs.h file defines the rocksdb objects that are needed to talk to an The env_hdfs.h file defines the rocksdb objects that are needed to talk to an
underlying filesystem. underlying filesystem.
If you want to compile rocksdb with hdfs support, please set the following If you want to compile rocksdb with hdfs support, please set the following
enviroment variables appropriately: enviroment variables appropriately (also defined in setup.sh for convenience)
USE_HDFS=1 USE_HDFS=1
JAVA_HOME=/usr/local/jdk-6u22-64 JAVA_HOME=/usr/local/jdk-6u22-64
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/jdk-6u22-64/jre/lib/amd64/server:/usr/local/jdk-6u22-64/jre/lib/amd64/:./snappy/libs LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/jdk-6u22-64/jre/lib/amd64/server:/usr/local/jdk-6u22-64/jre/lib/amd64/:./snappy/libs

@ -14,7 +14,7 @@
#include "rocksdb/status.h" #include "rocksdb/status.h"
#ifdef USE_HDFS #ifdef USE_HDFS
#include "hdfs/hdfs.h" #include <hdfs.h>
namespace rocksdb { namespace rocksdb {

@ -1,477 +0,0 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef LIBHDFS_HDFS_H
#define LIBHDFS_HDFS_H
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <errno.h>
#include <jni.h>
#ifndef O_RDONLY
#define O_RDONLY 1
#endif
#ifndef O_WRONLY
#define O_WRONLY 2
#endif
#ifndef EINTERNAL
#define EINTERNAL 255
#endif
/** All APIs set errno to meaningful values */
#ifdef __cplusplus
extern "C" {
#endif
/**
* Some utility decls used in libhdfs.
*/
typedef int32_t tSize; /// size of data for read/write io ops
typedef time_t tTime; /// time type in seconds
typedef int64_t tOffset;/// offset within the file
typedef uint16_t tPort; /// port
typedef enum tObjectKind {
kObjectKindFile = 'F',
kObjectKindDirectory = 'D',
} tObjectKind;
/**
* The C reflection of org.apache.org.hadoop.FileSystem .
*/
typedef void* hdfsFS;
/**
* The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream .
*/
enum hdfsStreamType
{
UNINITIALIZED = 0,
INPUT = 1,
OUTPUT = 2,
};
/**
* The 'file-handle' to a file in hdfs.
*/
struct hdfsFile_internal {
void* file;
enum hdfsStreamType type;
};
typedef struct hdfsFile_internal* hdfsFile;
/**
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
* Connect to the hdfs.
* @param host A string containing either a host name, or an ip address
* of the namenode of a hdfs cluster. 'host' should be passed as NULL if
* you want to connect to local filesystem. 'host' should be passed as
* 'default' (and port as 0) to used the 'configured' filesystem
* (core-site/core-default.xml).
* @param port The port on which the server is listening.
* @param user the user name (this is hadoop domain user). Or NULL is equivelant to hhdfsConnect(host, port)
* @param groups the groups (these are hadoop domain groups)
* @return Returns a handle to the filesystem or NULL on error.
*/
hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const char *groups[], int groups_size );
/**
* hdfsConnect - Connect to a hdfs file system.
* Connect to the hdfs.
* @param host A string containing either a host name, or an ip address
* of the namenode of a hdfs cluster. 'host' should be passed as NULL if
* you want to connect to local filesystem. 'host' should be passed as
* 'default' (and port as 0) to used the 'configured' filesystem
* (core-site/core-default.xml).
* @param port The port on which the server is listening.
* @return Returns a handle to the filesystem or NULL on error.
*/
hdfsFS hdfsConnect(const char* host, tPort port);
/**
* This are the same as hdfsConnectAsUser except that every invocation returns a new FileSystem handle.
* Applications should call a hdfsDisconnect for every call to hdfsConnectAsUserNewInstance.
*/
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user , const char *groups[], int groups_size );
hdfsFS hdfsConnectNewInstance(const char* host, tPort port);
hdfsFS hdfsConnectPath(const char* uri);
/**
* hdfsDisconnect - Disconnect from the hdfs file system.
* Disconnect from hdfs.
* @param fs The configured filesystem handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsDisconnect(hdfsFS fs);
/**
* hdfsOpenFile - Open a hdfs file in given mode.
* @param fs The configured filesystem handle.
* @param path The full path to the file.
* @param flags - an | of bits/fcntl.h file flags - supported flags are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT),
* O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR || (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP.
* @param bufferSize Size of buffer for read/write - pass 0 if you want
* to use the default configured values.
* @param replication Block replication - pass 0 if you want to use
* the default configured values.
* @param blocksize Size of block - pass 0 if you want to use the
* default configured values.
* @return Returns the handle to the open file or NULL on error.
*/
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blocksize);
/**
* hdfsCloseFile - Close an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsCloseFile(hdfsFS fs, hdfsFile file);
/**
* hdfsExists - Checks if a given path exsits on the filesystem
* @param fs The configured filesystem handle.
* @param path The path to look for
* @return Returns 0 on exists, 1 on non-exists, -1/-2 on error.
*/
int hdfsExists(hdfsFS fs, const char *path);
/**
* hdfsSeek - Seek to given offset in file.
* This works only for files opened in read-only mode.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param desiredPos Offset into the file to seek into.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos);
/**
* hdfsTell - Get the current offset in the file, in bytes.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Current offset, -1 on error.
*/
tOffset hdfsTell(hdfsFS fs, hdfsFile file);
/**
* hdfsRead - Read data from an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param buffer The buffer to copy read bytes into.
* @param length The length of the buffer.
* @return Returns the number of bytes actually read, possibly less
* than than length;-1 on error.
*/
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
/**
* hdfsPread - Positional read of data from an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param position Position from which to read
* @param buffer The buffer to copy read bytes into.
* @param length The length of the buffer.
* @return Returns the number of bytes actually read, possibly less than
* than length;-1 on error.
*/
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position,
void* buffer, tSize length);
/**
* hdfsWrite - Write data into an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param buffer The data.
* @param length The no. of bytes to write.
* @return Returns the number of bytes written, -1 on error.
*/
tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer,
tSize length);
/**
* hdfsWrite - Flush the data.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsFlush(hdfsFS fs, hdfsFile file);
/**
* hdfsSync - Sync the data to persistent store.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSync(hdfsFS fs, hdfsFile file);
/**
* hdfsGetNumReplicasInPipeline - get number of remaining replicas in
* pipeline
* @param fs The configured filesystem handle
* @param file the file handle
* @return returns the # of datanodes in the write pipeline; -1 on error
*/
int hdfsGetNumCurrentReplicas(hdfsFS, hdfsFile file);
/**
* hdfsAvailable - Number of bytes that can be read from this
* input stream without blocking.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns available bytes; -1 on error.
*/
int hdfsAvailable(hdfsFS fs, hdfsFile file);
/**
* hdfsCopy - Copy file from one filesystem to another.
* @param srcFS The handle to source filesystem.
* @param src The path of source file.
* @param dstFS The handle to destination filesystem.
* @param dst The path of destination file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
/**
* hdfsMove - Move file from one filesystem to another.
* @param srcFS The handle to source filesystem.
* @param src The path of source file.
* @param dstFS The handle to destination filesystem.
* @param dst The path of destination file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
/**
* hdfsDelete - Delete file.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsDelete(hdfsFS fs, const char* path);
/**
* hdfsRename - Rename file.
* @param fs The configured filesystem handle.
* @param oldPath The path of the source file.
* @param newPath The path of the destination file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath);
/**
* hdfsGetWorkingDirectory - Get the current working directory for
* the given filesystem.
* @param fs The configured filesystem handle.
* @param buffer The user-buffer to copy path of cwd into.
* @param bufferSize The length of user-buffer.
* @return Returns buffer, NULL on error.
*/
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize);
/**
* hdfsSetWorkingDirectory - Set the working directory. All relative
* paths will be resolved relative to it.
* @param fs The configured filesystem handle.
* @param path The path of the new 'cwd'.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path);
/**
* hdfsCreateDirectory - Make the given file and all non-existent
* parents into directories.
* @param fs The configured filesystem handle.
* @param path The path of the directory.
* @return Returns 0 on success, -1 on error.
*/
int hdfsCreateDirectory(hdfsFS fs, const char* path);
/**
* hdfsSetReplication - Set the replication of the specified
* file to the supplied value
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns 0 on success, -1 on error.
*/
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication);
/**
* hdfsFileInfo - Information about a file/directory.
*/
typedef struct {
tObjectKind mKind; /* file or directory */
char *mName; /* the name of the file */
tTime mLastMod; /* the last modification time for the file in seconds */
tOffset mSize; /* the size of the file in bytes */
short mReplication; /* the count of replicas */
tOffset mBlockSize; /* the block size for the file */
char *mOwner; /* the owner of the file */
char *mGroup; /* the group associated with the file */
short mPermissions; /* the permissions associated with the file */
tTime mLastAccess; /* the last access time for the file in seconds */
} hdfsFileInfo;
/**
* hdfsListDirectory - Get list of files/directories for a given
* directory-path. hdfsFreeFileInfo should be called to deallocate memory if
* the function returns non-NULL value.
* @param fs The configured filesystem handle.
* @param path The path of the directory.
* @param numEntries Set to the number of files/directories in path.
* @return Returns a dynamically-allocated array of hdfsFileInfo
* objects; NULL if empty or on error.
* on error, numEntries will be -1.
*/
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path,
int *numEntries);
/**
* hdfsGetPathInfo - Get information about a path as a (dynamically
* allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be
* called when the pointer is no longer needed.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns a dynamically-allocated hdfsFileInfo object;
* NULL on error.
*/
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path);
/**
* hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields)
* @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo
* objects.
* @param numEntries The size of the array.
*/
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries);
/**
* hdfsGetHosts - Get hostnames where a particular block (determined by
* pos & blocksize) of a file is stored. The last element in the array
* is NULL. Due to replication, a single block could be present on
* multiple hosts.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @param start The start of the block.
* @param length The length of the block.
* @return Returns a dynamically-allocated 2-d array of blocks-hosts;
* NULL on error.
*/
char*** hdfsGetHosts(hdfsFS fs, const char* path,
tOffset start, tOffset length);
/**
* hdfsFreeHosts - Free up the structure returned by hdfsGetHosts
* @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo
* objects.
* @param numEntries The size of the array.
*/
void hdfsFreeHosts(char ***blockHosts);
/**
* hdfsGetDefaultBlockSize - Get the optimum blocksize.
* @param fs The configured filesystem handle.
* @return Returns the blocksize; -1 on error.
*/
tOffset hdfsGetDefaultBlockSize(hdfsFS fs);
/**
* hdfsGetCapacity - Return the raw capacity of the filesystem.
* @param fs The configured filesystem handle.
* @return Returns the raw-capacity; -1 on error.
*/
tOffset hdfsGetCapacity(hdfsFS fs);
/**
* hdfsGetUsed - Return the total raw size of all files in the filesystem.
* @param fs The configured filesystem handle.
* @return Returns the total-size; -1 on error.
*/
tOffset hdfsGetUsed(hdfsFS fs);
/**
* hdfsChown
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param owner this is a string in Hadoop land. Set to null or "" if only setting group
* @param group this is a string in Hadoop land. Set to null or "" if only setting user
* @return 0 on success else -1
*/
int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group);
/**
* hdfsChmod
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param mode the bitmask to set it to
* @return 0 on success else -1
*/
int hdfsChmod(hdfsFS fs, const char* path, short mode);
/**
* hdfsUtime
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param mtime new modification time or 0 for only set access time in seconds
* @param atime new access time or 0 for only set modification time in seconds
* @return 0 on success else -1
*/
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
#ifdef __cplusplus
}
#endif
#endif /*LIBHDFS_HDFS_H*/
/**
* vim: ts=4: sw=4: et
*/

Binary file not shown.

@ -0,0 +1,7 @@
export USE_HDFS=1
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:/usr/lib/hadoop/lib/native
export CLASSPATH=
for f in `find /usr/lib/hadoop-hdfs | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
for f in `find /usr/lib/hadoop | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
for f in `find /usr/lib/hadoop/client | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done

@ -54,7 +54,17 @@ enum CompressionType : char {
enum CompactionStyle : char { enum CompactionStyle : char {
kCompactionStyleLevel = 0x0, // level based compaction style kCompactionStyleLevel = 0x0, // level based compaction style
kCompactionStyleUniversal = 0x1 // Universal compaction style kCompactionStyleUniversal = 0x1, // Universal compaction style
kCompactionStyleFIFO = 0x2, // FIFO compaction style
};
struct CompactionOptionsFIFO {
// once the total sum of table files reaches this, we will delete the oldest
// table file
// Default: 1GB
uint64_t max_table_files_size;
CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {}
}; };
// Compression options for different compression algorithms like Zlib // Compression options for different compression algorithms like Zlib
@ -429,6 +439,9 @@ struct ColumnFamilyOptions {
// The options needed to support Universal Style compactions // The options needed to support Universal Style compactions
CompactionOptionsUniversal compaction_options_universal; CompactionOptionsUniversal compaction_options_universal;
// The options for FIFO compaction style
CompactionOptionsFIFO compaction_options_fifo;
// Use KeyMayExist API to filter deletes when this is true. // Use KeyMayExist API to filter deletes when this is true.
// If KeyMayExist returns false, i.e. the key definitely does not exist, then // If KeyMayExist returns false, i.e. the key definitely does not exist, then
// the delete is a noop. KeyMayExist only incurs in-memory look up. // the delete is a noop. KeyMayExist only incurs in-memory look up.

@ -6,7 +6,7 @@
// Also update Makefile if you change these // Also update Makefile if you change these
#define ROCKSDB_MAJOR 3 #define ROCKSDB_MAJOR 3
#define ROCKSDB_MINOR 1 #define ROCKSDB_MINOR 2
#define ROCKSDB_PATCH 0 #define ROCKSDB_PATCH 0
// Do not use these. We made the mistake of declaring macros starting with // Do not use these. We made the mistake of declaring macros starting with

@ -60,7 +60,7 @@ def main(argv):
+ str(ops_per_thread) + "\nwrite_buffer_size=" \ + str(ops_per_thread) + "\nwrite_buffer_size=" \
+ str(write_buf_size) + "\n" + str(write_buf_size) + "\n"
total_check_mode = 3 total_check_mode = 4
check_mode = 0 check_mode = 0
while time.time() < exit_time: while time.time() < exit_time:
@ -75,8 +75,14 @@ def main(argv):
# normal run with universal compaction mode # normal run with universal compaction mode
additional_opts = "--ops_per_thread=" + str(ops_per_thread) + \ additional_opts = "--ops_per_thread=" + str(ops_per_thread) + \
" --compaction_style=1" " --compaction_style=1"
elif check_mode == 2:
# normal run with FIFO compaction mode
# ops_per_thread is divided by 5 because FIFO compaction
# style is quite a bit slower on reads with lot of files
additional_opts = "--ops_per_thread=" + str(ops_per_thread / 5) + \
" --compaction_style=2"
else: else:
# nomral run # normal run
additional_opts = "--ops_per_thread=" + str(ops_per_thread) additional_opts = "--ops_per_thread=" + str(ops_per_thread)
dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_') dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_')

@ -15,11 +15,11 @@
#include <sstream> #include <sstream>
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "hdfs/hdfs.h"
#include "hdfs/env_hdfs.h" #include "hdfs/env_hdfs.h"
#define HDFS_EXISTS 0 #define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST 1 #define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0
// //
// This file defines an HDFS environment for rocksdb. It uses the libhdfs // This file defines an HDFS environment for rocksdb. It uses the libhdfs
@ -223,7 +223,7 @@ class HdfsWritableFile: public WritableFile {
if (hdfsFlush(fileSys_, hfile_) == -1) { if (hdfsFlush(fileSys_, hfile_) == -1) {
return IOError(filename_, errno); return IOError(filename_, errno);
} }
if (hdfsSync(fileSys_, hfile_) == -1) { if (hdfsHSync(fileSys_, hfile_) == -1) {
return IOError(filename_, errno); return IOError(filename_, errno);
} }
Log(mylog, "[hdfs] HdfsWritableFile Synced %s\n", filename_.c_str()); Log(mylog, "[hdfs] HdfsWritableFile Synced %s\n", filename_.c_str());
@ -398,12 +398,34 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname,
return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv"); return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv");
} }
class HdfsDirectory : public Directory {
public:
explicit HdfsDirectory(int fd) : fd_(fd) {}
~HdfsDirectory() {}
virtual Status Fsync() { return Status::OK(); }
private:
int fd_;
};
Status HdfsEnv::NewDirectory(const std::string& name, Status HdfsEnv::NewDirectory(const std::string& name,
unique_ptr<Directory>* result) { unique_ptr<Directory>* result) {
return Status::NotSupported("NewDirectory not supported on HdfsEnv"); int value = hdfsExists(fileSys_, name.c_str());
switch (value) {
case HDFS_EXISTS:
result->reset(new HdfsDirectory(0));
return Status::OK();
default: // fail if the directory doesn't exist
Log(mylog, "NewDirectory hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " +
std::to_string(value) + " on path " + name +
".\n");
}
} }
bool HdfsEnv::FileExists(const std::string& fname) { bool HdfsEnv::FileExists(const std::string& fname) {
int value = hdfsExists(fileSys_, fname.c_str()); int value = hdfsExists(fileSys_, fname.c_str());
switch (value) { switch (value) {
case HDFS_EXISTS: case HDFS_EXISTS:
@ -413,7 +435,8 @@ bool HdfsEnv::FileExists(const std::string& fname) {
default: // anything else should be an error default: // anything else should be an error
Log(mylog, "FileExists hdfsExists call failed"); Log(mylog, "FileExists hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " + throw HdfsFatalException("hdfsExists call failed with error " +
std::to_string(value) + ".\n"); std::to_string(value) + " on path " + fname +
".\n");
} }
} }
@ -455,7 +478,7 @@ Status HdfsEnv::GetChildren(const std::string& path,
} }
Status HdfsEnv::DeleteFile(const std::string& fname) { Status HdfsEnv::DeleteFile(const std::string& fname) {
if (hdfsDelete(fileSys_, fname.c_str()) == 0) { if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
return Status::OK(); return Status::OK();
} }
return IOError(fname, errno); return IOError(fname, errno);
@ -514,7 +537,7 @@ Status HdfsEnv::GetFileModificationTime(const std::string& fname,
// target already exists. So, we delete the target before attemting the // target already exists. So, we delete the target before attemting the
// rename. // rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) { Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
hdfsDelete(fileSys_, target.c_str()); hdfsDelete(fileSys_, target.c_str(), 1);
if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) { if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
return Status::OK(); return Status::OK();
} }

@ -94,6 +94,17 @@ static Status IOError(const std::string& context, int err_number) {
return Status::IOError(context, strerror(err_number)); return Status::IOError(context, strerror(err_number));
} }
// TODO(sdong): temp logging. Need to help debugging. Remove it when
// the feature is proved to be stable.
inline void PrintThreadInfo(size_t thread_id, pthread_t id) {
unsigned char* ptc = (unsigned char*)(void*)(&id);
fprintf(stdout, "Bg thread %zu terminates 0x", thread_id);
for (size_t i = 0; i < sizeof(id); i++) {
fprintf(stdout, "%02x", (unsigned)(ptc[i]));
}
fprintf(stdout, "\n");
}
#ifdef NDEBUG #ifdef NDEBUG
// empty in release build // empty in release build
#define TEST_KILL_RANDOM(rocksdb_kill_odds) #define TEST_KILL_RANDOM(rocksdb_kill_odds)
@ -1514,8 +1525,7 @@ class PosixEnv : public Env {
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); PthreadCall("unlock", pthread_mutex_unlock(&mu_));
// TODO(sdong): temp logging. Need to help debugging. Remove it when // TODO(sdong): temp logging. Need to help debugging. Remove it when
// the feature is proved to be stable. // the feature is proved to be stable.
fprintf(stdout, "Bg thread %zu terminates %llx\n", thread_id, PrintThreadInfo(thread_id, terminating_thread);
static_cast<long long unsigned int>(terminating_thread));
break; break;
} }
void (*function)(void*) = queue_.front().function; void (*function)(void*) = queue_.front().function;

@ -285,7 +285,7 @@ TEST(EnvPosixTest, DecreaseNumBgThreads) {
// Increase to 5 threads. Task 0 and 2 running. // Increase to 5 threads. Task 0 and 2 running.
env_->SetBackgroundThreads(5, Env::Priority::HIGH); env_->SetBackgroundThreads(5, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(tasks[2].IsSleeping()); ASSERT_TRUE(tasks[2].IsSleeping());
@ -330,7 +330,7 @@ TEST(EnvPosixTest, DecreaseNumBgThreads) {
tasks[4].WakeUp(); tasks[4].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
for (size_t i = 5; i < 8; i++) { for (size_t i = 5; i < 8; i++) {
ASSERT_TRUE(tasks[i].IsSleeping()); ASSERT_TRUE(tasks[i].IsSleeping());
} }
@ -360,13 +360,13 @@ TEST(EnvPosixTest, DecreaseNumBgThreads) {
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9], env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9],
Env::Priority::HIGH); Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), 0); ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), (unsigned int)0);
ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping()); ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping());
// Increase to 4 threads. Task 5, 8, 9 running. // Increase to 4 threads. Task 5, 8, 9 running.
env_->SetBackgroundThreads(4, Env::Priority::HIGH); env_->SetBackgroundThreads(4, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[8].IsSleeping()); ASSERT_TRUE(tasks[8].IsSleeping());
ASSERT_TRUE(tasks[9].IsSleeping()); ASSERT_TRUE(tasks[9].IsSleeping());

@ -135,6 +135,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
compaction_style(options.compaction_style), compaction_style(options.compaction_style),
verify_checksums_in_compaction(options.verify_checksums_in_compaction), verify_checksums_in_compaction(options.verify_checksums_in_compaction),
compaction_options_universal(options.compaction_options_universal), compaction_options_universal(options.compaction_options_universal),
compaction_options_fifo(options.compaction_options_fifo),
filter_deletes(options.filter_deletes), filter_deletes(options.filter_deletes),
max_sequential_skip_in_iterations( max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations), options.max_sequential_skip_in_iterations),
@ -413,6 +414,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
Log(log, Log(log,
"Options.compaction_options_universal.compression_size_percent: %u", "Options.compaction_options_universal.compression_size_percent: %u",
compaction_options_universal.compression_size_percent); compaction_options_universal.compression_size_percent);
Log(log, "Options.compaction_options_fifo.max_table_files_size: %" PRIu64,
compaction_options_fifo.max_table_files_size);
std::string collector_names; std::string collector_names;
for (const auto& collector_factory : table_properties_collector_factories) { for (const auto& collector_factory : table_properties_collector_factories) {
collector_names.append(collector_factory->Name()); collector_names.append(collector_factory->Name());

Loading…
Cancel
Save