diff --git a/HISTORY.md b/HISTORY.md index f9bdd7c98..9b014c1f2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,12 +1,13 @@ # Rocksdb Change Log -## Unreleased (3.1.0) +## 3.1.0 (05/21/2014) ### Public API changes * Replaced ColumnFamilyOptions::table_properties_collectors with ColumnFamilyOptions::table_properties_collector_factories ### New Features * Hash index for block-based table will be materialized and reconstructed more efficiently. Previously hash index is constructed by scanning the whole table during every table open. +* FIFO compaction style ## 3.0.0 (05/05/2014) diff --git a/Makefile b/Makefile index 1dc741e5c..c148aee7e 100644 --- a/Makefile +++ b/Makefile @@ -146,7 +146,7 @@ SHARED = $(SHARED1) else # Update db.h if you change these. SHARED_MAJOR = 3 -SHARED_MINOR = 0 +SHARED_MINOR = 2 SHARED1 = ${LIBNAME}.$(PLATFORM_SHARED_EXT) SHARED2 = $(SHARED1).$(SHARED_MAJOR) SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR) diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index 88aa216ad..c8ed00487 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -289,7 +289,7 @@ if test "$USE_HDFS"; then exit 1 fi HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS" - HDFS_LDFLAGS="$HDFS_LDFLAGS -Wl,--no-whole-archive hdfs/libhdfs.a -L$JAVA_HOME/jre/lib/amd64" + HDFS_LDFLAGS="$HDFS_LDFLAGS -Wl,--no-whole-archive -lhdfs -L$JAVA_HOME/jre/lib/amd64" HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib" HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm" COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS" diff --git a/db/column_family.cc b/db/column_family.cc index 39c37b9e8..9cf0c0d49 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include "db/db_impl.h" #include "db/version_set.h" @@ -116,6 +117,15 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, collector_factories.push_back( std::make_shared()); + if (result.compaction_style == kCompactionStyleFIFO) { + result.num_levels = 1; + // since we delete level0 files in FIFO compaction when there are too many + // of them, these options don't really mean anything + result.level0_file_num_compaction_trigger = std::numeric_limits::max(); + result.level0_slowdown_writes_trigger = std::numeric_limits::max(); + result.level0_stop_writes_trigger = std::numeric_limits::max(); + } + return result; } @@ -196,7 +206,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, options_(*db_options, SanitizeOptions(&internal_comparator_, &internal_filter_policy_, options)), mem_(nullptr), - imm_(options.min_write_buffer_number_to_merge), + imm_(options_.min_write_buffer_number_to_merge), super_version_(nullptr), super_version_number_(0), local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), @@ -209,16 +219,20 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, // if dummy_versions is nullptr, then this is a dummy column family. 
if (dummy_versions != nullptr) { - internal_stats_.reset(new InternalStats(options.num_levels, db_options->env, - db_options->statistics.get())); + internal_stats_.reset(new InternalStats( + options_.num_levels, db_options->env, db_options->statistics.get())); table_cache_.reset( new TableCache(dbname, &options_, storage_options, table_cache)); if (options_.compaction_style == kCompactionStyleUniversal) { compaction_picker_.reset( new UniversalCompactionPicker(&options_, &internal_comparator_)); - } else { + } else if (options_.compaction_style == kCompactionStyleLevel) { compaction_picker_.reset( new LevelCompactionPicker(&options_, &internal_comparator_)); + } else { + assert(options_.compaction_style == kCompactionStyleFIFO); + compaction_picker_.reset( + new FIFOCompactionPicker(&options_, &internal_comparator_)); } Log(options_.info_log, "Options for column family \"%s\":\n", diff --git a/db/compaction.cc b/db/compaction.cc index 962ce1232..a8caa59ef 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -29,7 +29,8 @@ static uint64_t TotalFileSize(const std::vector& files) { Compaction::Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - bool seek_compaction, bool enable_compression) + bool seek_compaction, bool enable_compression, + bool deletion_compaction) : level_(level), out_level_(out_level), max_output_file_size_(target_file_size), @@ -39,6 +40,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level, cfd_(input_version_->cfd_), seek_compaction_(seek_compaction), enable_compression_(enable_compression), + deletion_compaction_(deletion_compaction), grandparent_index_(0), seen_key_(false), overlapped_bytes_(0), @@ -83,6 +85,8 @@ bool Compaction::IsTrivialMove() const { TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); } +bool Compaction::IsDeletionCompaction() const { return deletion_compaction_; } + void Compaction::AddInputDeletions(VersionEdit* edit) { for (int which = 0; which < 2; which++) { for (size_t i = 0; i < inputs_[which].size(); i++) { @@ -92,6 +96,7 @@ void Compaction::AddInputDeletions(VersionEdit* edit) { } bool Compaction::IsBaseLevelForKey(const Slice& user_key) { + assert(cfd_->options()->compaction_style != kCompactionStyleFIFO); if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { return bottommost_level_; } @@ -155,6 +160,7 @@ void Compaction::MarkFilesBeingCompacted(bool value) { // Is this compaction producing files at the bottommost level? void Compaction::SetupBottomMostLevel(bool isManual) { + assert(cfd_->options()->compaction_style != kCompactionStyleFIFO); if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { // If universal compaction style is used and manual // compaction is occuring, then we are guaranteed that diff --git a/db/compaction.h b/db/compaction.h index 8fd95f909..aaa402303 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -54,6 +54,9 @@ class Compaction { // moving a single input file to the next level (no merging or splitting) bool IsTrivialMove() const; + // If true, just delete all files in inputs_[0] + bool IsDeletionCompaction() const; + // Add all inputs to this compaction as delete operations to *edit. 
void AddInputDeletions(VersionEdit* edit); @@ -91,11 +94,13 @@ class Compaction { private: friend class CompactionPicker; friend class UniversalCompactionPicker; + friend class FIFOCompactionPicker; friend class LevelCompactionPicker; Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - bool seek_compaction = false, bool enable_compression = true); + bool seek_compaction = false, bool enable_compression = true, + bool deletion_compaction = false); int level_; int out_level_; // levels to which output files are stored @@ -108,6 +113,8 @@ class Compaction { bool seek_compaction_; bool enable_compression_; + // if true, just delete files in inputs_[0] + bool deletion_compaction_; // Each compaction reads inputs from "level_" and "level_+1" std::vector inputs_[2]; // The two sets of inputs diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index a8700bbbc..3416a0bac 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -9,6 +9,8 @@ #include "db/compaction_picker.h" +#define __STDC_FORMAT_MACROS +#include #include #include "util/log_buffer.h" #include "util/statistics.h" @@ -307,6 +309,9 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { + // CompactionPickerFIFO has its own implementation of compact range + assert(options_->compaction_style != kCompactionStyleFIFO); + std::vector inputs; bool covering_the_whole_range = true; @@ -886,4 +891,70 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( return c; } +Compaction* FIFOCompactionPicker::PickCompaction(Version* version, + LogBuffer* log_buffer) { + assert(version->NumberLevels() == 1); + uint64_t total_size = 0; + for (const auto& file : version->files_[0]) { + total_size += file->file_size; + } + + if (total_size <= options_->compaction_options_fifo.max_table_files_size || + version->files_[0].size() == 0) { + // total size not exceeded + LogToBuffer(log_buffer, + "[%s] FIFO compaction: nothing to do. Total size %" PRIu64 + ", max size %" PRIu64 "\n", + version->cfd_->GetName().c_str(), total_size, + options_->compaction_options_fifo.max_table_files_size); + return nullptr; + } + + if (compactions_in_progress_[0].size() > 0) { + LogToBuffer(log_buffer, + "[%s] FIFO compaction: Already executing compaction. 
No need " + "to run parallel compactions since compactions are very fast", + version->cfd_->GetName().c_str()); + return nullptr; + } + + Compaction* c = new Compaction(version, 0, 0, 0, 0, false, false, + true /* is deletion compaction */); + // delete old files (FIFO) + for (auto ritr = version->files_[0].rbegin(); + ritr != version->files_[0].rend(); ++ritr) { + auto f = *ritr; + total_size -= f->file_size; + c->inputs_[0].push_back(f); + char tmp_fsize[16]; + AppendHumanBytes(f->file_size, tmp_fsize, sizeof(tmp_fsize)); + LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 + " with size %s for deletion", + version->cfd_->GetName().c_str(), f->number, tmp_fsize); + if (total_size <= options_->compaction_options_fifo.max_table_files_size) { + break; + } + } + + c->MarkFilesBeingCompacted(true); + compactions_in_progress_[0].insert(c); + + return c; +} + +Compaction* FIFOCompactionPicker::CompactRange(Version* version, + int input_level, + int output_level, + const InternalKey* begin, + const InternalKey* end, + InternalKey** compaction_end) { + assert(input_level == 0); + assert(output_level == 0); + *compaction_end = nullptr; + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get()); + auto c = PickCompaction(version, &log_buffer); + log_buffer.FlushBufferToLog(); + return c; +} + } // namespace rocksdb diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 6527ef967..65b1bc37a 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -47,9 +47,10 @@ class CompactionPicker { // compaction_end will be set to nullptr. // Client is responsible for compaction_end storage -- when called, // *compaction_end should point to valid InternalKey! - Compaction* CompactRange(Version* version, int input_level, int output_level, - const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end); + virtual Compaction* CompactRange(Version* version, int input_level, + int output_level, const InternalKey* begin, + const InternalKey* end, + InternalKey** compaction_end); // Free up the files that participated in a compaction void ReleaseCompactionFiles(Compaction* c, Status status); @@ -162,4 +163,19 @@ class LevelCompactionPicker : public CompactionPicker { Compaction* PickCompactionBySize(Version* version, int level, double score); }; +class FIFOCompactionPicker : public CompactionPicker { + public: + FIFOCompactionPicker(const Options* options, + const InternalKeyComparator* icmp) + : CompactionPicker(options, icmp) {} + + virtual Compaction* PickCompaction(Version* version, + LogBuffer* log_buffer) override; + + virtual Compaction* CompactRange(Version* version, int input_level, + int output_level, const InternalKey* begin, + const InternalKey* end, + InternalKey** compaction_end) override; +}; + } // namespace rocksdb diff --git a/db/db_impl.cc b/db/db_impl.cc index bdc1832dc..f87442767 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1590,7 +1590,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, return s; } - int max_level_with_files = 1; + int max_level_with_files = 0; { MutexLock l(&mutex_); Version* base = cfd->current(); @@ -1604,6 +1604,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, // in case the compaction is unversal or if we're compacting the // bottom-most level, the output level will be the same as input one if (cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO || level == max_level_with_files) { s = 
RunManualCompaction(cfd, level, level, begin, end); } else { @@ -1754,14 +1755,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, // For universal compaction, we enforce every manual compaction to compact // all files. if (begin == nullptr || - cfd->options()->compaction_style == kCompactionStyleUniversal) { + cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO) { manual.begin = nullptr; } else { begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); manual.begin = &begin_storage; } if (end == nullptr || - cfd->options()->compaction_style == kCompactionStyleUniversal) { + cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO) { manual.end = nullptr; } else { end_storage = InternalKey(*end, 0, static_cast(0)); @@ -2150,6 +2153,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (!c) { // Nothing to do LogToBuffer(log_buffer, "Compaction nothing to do"); + } else if (c->IsDeletionCompaction()) { + // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old + // file if there is alive snapshot pointing to it + assert(c->num_input_files(1) == 0); + assert(c->level() == 0); + assert(c->column_family_data()->options()->compaction_style == + kCompactionStyleFIFO); + for (const auto& f : *c->inputs(0)) { + c->edit()->DeleteFile(c->level(), f->number); + } + status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, + db_directory_.get()); + InstallSuperVersion(c->column_family_data(), deletion_state); + LogToBuffer(log_buffer, "[%s] Deleted %d files\n", + c->column_family_data()->GetName().c_str(), + c->num_input_files(0)); + c->ReleaseCompactionFiles(status); + *madeProgress = true; } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); @@ -2219,8 +2240,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (!m->done) { // We only compacted part of the requested range. Update *m // to the range that is left to be compacted. - // Universal compaction should always compact the whole range + // Universal and FIFO compactions should always compact the whole range assert(m->cfd->options()->compaction_style != kCompactionStyleUniversal); + assert(m->cfd->options()->compaction_style != kCompactionStyleFIFO); m->tmp_storage = *manual_end; m->begin = &m->tmp_storage; } @@ -4468,13 +4490,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - if (cfd->options()->compaction_style == kCompactionStyleUniversal) { + if (cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO) { Version* current = cfd->current(); for (int i = 1; i < current->NumberLevels(); ++i) { int num_files = current->NumLevelFiles(i); if (num_files > 0) { - s = Status::InvalidArgument("Not all files are at level 0. Cannot " - "open with universal compaction style."); + s = Status::InvalidArgument( + "Not all files are at level 0. 
Cannot " + "open with universal or FIFO compaction style."); break; } } diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index d6551b45a..927a01a04 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -81,7 +81,8 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, cfd = cfh->cfd(); } int output_level = - (cfd->options()->compaction_style == kCompactionStyleUniversal) + (cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO) ? level : level + 1; return RunManualCompaction(cfd, level, output_level, begin, end); diff --git a/db/db_test.cc b/db/db_test.cc index 05403fc07..5e30b33f7 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -317,6 +317,7 @@ class DBTest { kCompressedBlockCache, kInfiniteMaxOpenFiles, kxxHashChecksum, + kFIFOCompaction, kEnd }; int option_config_; @@ -339,7 +340,8 @@ class DBTest { kSkipPlainTable = 8, kSkipHashIndex = 16, kSkipNoSeekToLast = 32, - kSkipHashCuckoo = 64 + kSkipHashCuckoo = 64, + kSkipFIFOCompaction = 128, }; DBTest() : option_config_(kDefault), @@ -391,6 +393,10 @@ class DBTest { if ((skip_mask & kSkipHashCuckoo) && (option_config_ == kHashCuckoo)) { continue; } + if ((skip_mask & kSkipFIFOCompaction) && + option_config_ == kFIFOCompaction) { + continue; + } break; } @@ -503,6 +509,10 @@ class DBTest { options.table_factory.reset(NewBlockBasedTableFactory(table_options)); break; } + case kFIFOCompaction: { + options.compaction_style = kCompactionStyleFIFO; + break; + } case kBlockBasedTableWithPrefixHashIndex: { BlockBasedTableOptions table_options; table_options.index_type = BlockBasedTableOptions::kHashSearch; @@ -1394,7 +1404,7 @@ TEST(DBTest, GetEncountersEmptyLevel) { env_->SleepForMicroseconds(1000000); ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX - } while (ChangeOptions(kSkipUniversalCompaction)); + } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction)); } // KeyMayExist can lead to a few false positives, but not false negatives. @@ -1460,7 +1470,8 @@ TEST(DBTest, KeyMayExist) { // KeyMayExist function only checks data in block caches, which is not used // by plain table format. - } while (ChangeOptions(kSkipPlainTable | kSkipHashIndex)); + } while ( + ChangeOptions(kSkipPlainTable | kSkipHashIndex | kSkipFIFOCompaction)); } TEST(DBTest, NonBlockingIteration) { @@ -4387,7 +4398,8 @@ TEST(DBTest, ApproximateSizes) { ASSERT_GT(NumTableFilesAtLevel(1, 1), 0); } // ApproximateOffsetOf() is not yet implemented in plain table format. - } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable)); + } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction | + kSkipPlainTable)); } TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) { @@ -4531,8 +4543,8 @@ TEST(DBTest, HiddenValuesAreRemoved) { // ApproximateOffsetOf() is not yet implemented in plain table format, // which is used by Size(). 
// skip HashCuckooRep as it does not support snapshot - } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable | - kSkipHashCuckoo)); + } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction | + kSkipPlainTable | kSkipHashCuckoo)); } TEST(DBTest, CompactBetweenSnapshots) { @@ -4588,7 +4600,7 @@ TEST(DBTest, CompactBetweenSnapshots) { ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]"); // skip HashCuckooRep as it does not support snapshot - } while (ChangeOptions(kSkipHashCuckoo)); + } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction)); } TEST(DBTest, DeletionMarkers1) { @@ -4694,7 +4706,7 @@ TEST(DBTest, OverlapInLevel0) { Flush(1); ASSERT_EQ("3", FilesPerLevel(1)); ASSERT_EQ("NOT_FOUND", Get(1, "600")); - } while (ChangeOptions(kSkipUniversalCompaction)); + } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction)); } TEST(DBTest, L0_CompactionBug_Issue44_a) { @@ -6797,6 +6809,42 @@ TEST(DBTest, ChecksumTest) { ASSERT_EQ("f", Get("e")); ASSERT_EQ("h", Get("g")); } + +TEST(DBTest, FIFOCompactionTest) { + for (int iter = 0; iter < 2; ++iter) { + // first iteration -- auto compaction + // second iteration -- manual compaction + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.write_buffer_size = 100 << 10; // 100KB + options.compaction_options_fifo.max_table_files_size = 500 << 10; // 500KB + options.compression = kNoCompression; + options.create_if_missing = true; + if (iter == 1) { + options.disable_auto_compactions = true; + } + DestroyAndReopen(&options); + + Random rnd(301); + for (int i = 0; i < 6; ++i) { + for (int j = 0; j < 100; ++j) { + ASSERT_OK(Put(std::to_string(i * 100 + j), RandomString(&rnd, 1024))); + } + // flush should happen here + } + if (iter == 0) { + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + } else { + ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + } + // only 5 files should survive + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + for (int i = 0; i < 50; ++i) { + // these keys should be deleted in previous compaction + ASSERT_EQ("NOT_FOUND", Get(std::to_string(i))); + } + } +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index 02e9aa152..5327cf55f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -711,7 +711,8 @@ void Version::ComputeCompactionScore( int max_score_level = 0; int num_levels_to_check = - (cfd_->options()->compaction_style != kCompactionStyleUniversal) + (cfd_->options()->compaction_style != kCompactionStyleUniversal && + cfd_->options()->compaction_style != kCompactionStyleFIFO) ? NumberLevels() - 1 : 1; @@ -730,14 +731,18 @@ void Version::ComputeCompactionScore( // setting, or very high compression ratios, or lots of // overwrites/deletions). 
int numfiles = 0; + uint64_t total_size = 0; for (unsigned int i = 0; i < files_[level].size(); i++) { if (!files_[level][i]->being_compacted) { + total_size += files_[level][i]->file_size; numfiles++; } } - - // If we are slowing down writes, then we better compact that first - if (numfiles >= cfd_->options()->level0_stop_writes_trigger) { + if (cfd_->options()->compaction_style == kCompactionStyleFIFO) { + score = static_cast<double>(total_size) / + cfd_->options()->compaction_options_fifo.max_table_files_size; + } else if (numfiles >= cfd_->options()->level0_stop_writes_trigger) { + // If we are slowing down writes, then we better compact that first score = 1000000; } else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) { score = 10000; @@ -803,6 +808,10 @@ bool CompareSeqnoDescending(const Version::Fsize& first, } // anonymous namespace void Version::UpdateFilesBySize() { + if (cfd_->options()->compaction_style == kCompactionStyleFIFO) { + // don't need this + return; + } // No need to sort the highest level because it is never compacted. int max_level = (cfd_->options()->compaction_style == kCompactionStyleUniversal) @@ -871,7 +880,8 @@ bool Version::NeedsCompaction() const { // TODO(sdong): improve this function to be accurate for universal // compactions. int num_levels_to_check = - (cfd_->options()->compaction_style != kCompactionStyleUniversal) + (cfd_->options()->compaction_style != kCompactionStyleUniversal && + cfd_->options()->compaction_style != kCompactionStyleFIFO) ? NumberLevels() - 1 : 1; for (int i = 0; i < num_levels_to_check; i++) { @@ -1253,7 +1263,7 @@ struct VersionSet::ManifestWriter { class VersionSet::Builder { private: // Helper to sort v->files_ - // kLevel0LevelCompaction -- NewestFirst + // kLevel0LevelCompaction -- NewestFirst (also used for FIFO compaction) // kLevel0UniversalCompaction -- NewestFirstBySeqNo // kLevelNon0 -- BySmallestKey struct FileComparator { diff --git a/db/version_set.h b/db/version_set.h index 13a138341..ffadb5813 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -217,6 +217,7 @@ class Version { friend class CompactionPicker; friend class LevelCompactionPicker; friend class UniversalCompactionPicker; + friend class FIFOCompactionPicker; class LevelFileNumIterator; class LevelFileIteratorState; diff --git a/hdfs/README b/hdfs/README index 9b7d0a64d..f4f1106e4 100644 --- a/hdfs/README +++ b/hdfs/README @@ -1,19 +1,16 @@ This directory contains the hdfs extensions needed to make rocksdb store files in HDFS. -The hdfs.h file is copied from the Apache Hadoop 1.0 source code. -It defines the libhdfs library -(http://hadoop.apache.org/common/docs/r0.20.2/libhdfs.html) to access -data in HDFS. The libhdfs.a is copied from the Apache Hadoop 1.0 build. -It implements the API defined in hdfs.h. If your hadoop cluster is running -a different hadoop release, then install these two files manually from your -hadoop distribution and then recompile rocksdb. +It has been compiled and tested against CDH 4.4 (2.0.0+1475-1.cdh4.4.0.p0.23~precise-cdh4.4.0). + +The configuration assumes that packages libhdfs0, libhdfs0-dev are +installed, which means that hdfs.h is in /usr/include and libhdfs is in /usr/lib. The env_hdfs.h file defines the rocksdb objects that are needed to talk to an underlying filesystem.
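As a usage illustration (not part of this patch): a minimal sketch of plugging the HDFS environment into a database, assuming the HdfsEnv(fsname) constructor declared in hdfs/env_hdfs.h, a build with USE_HDFS=1, and a running cluster; the namenode URI and DB path below are placeholders.

```cpp
// Sketch: route RocksDB file I/O through HDFS by installing HdfsEnv as the Env.
// The URI and database path are illustrative only.
#include "hdfs/env_hdfs.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  // HdfsEnv wraps the libhdfs connection to the namenode (assumed constructor).
  rocksdb::Env* hdfs_env = new rocksdb::HdfsEnv("hdfs://localhost:9000/");

  rocksdb::Options options;
  options.create_if_missing = true;
  options.env = hdfs_env;  // SST, WAL and MANIFEST files are now stored in HDFS

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/rocksdb/example_db", &db);
  if (s.ok()) {
    s = db->Put(rocksdb::WriteOptions(), "key", "value");
    delete db;
  }
  delete hdfs_env;
  return s.ok() ? 0 : 1;
}
```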
If you want to compile rocksdb with hdfs support, please set the following -enviroment variables appropriately: +enviroment variables appropriately (also defined in setup.sh for convenience) USE_HDFS=1 JAVA_HOME=/usr/local/jdk-6u22-64 LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/jdk-6u22-64/jre/lib/amd64/server:/usr/local/jdk-6u22-64/jre/lib/amd64/:./snappy/libs diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index e6fb8db12..5e7de77d3 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -14,7 +14,7 @@ #include "rocksdb/status.h" #ifdef USE_HDFS -#include "hdfs/hdfs.h" +#include namespace rocksdb { diff --git a/hdfs/hdfs.h b/hdfs/hdfs.h deleted file mode 100644 index 8e8dfecb8..000000000 --- a/hdfs/hdfs.h +++ /dev/null @@ -1,477 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. -// -#ifndef LIBHDFS_HDFS_H -#define LIBHDFS_HDFS_H - -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include - -#ifndef O_RDONLY -#define O_RDONLY 1 -#endif - -#ifndef O_WRONLY -#define O_WRONLY 2 -#endif - -#ifndef EINTERNAL -#define EINTERNAL 255 -#endif - - -/** All APIs set errno to meaningful values */ -#ifdef __cplusplus -extern "C" { -#endif - - /** - * Some utility decls used in libhdfs. - */ - - typedef int32_t tSize; /// size of data for read/write io ops - typedef time_t tTime; /// time type in seconds - typedef int64_t tOffset;/// offset within the file - typedef uint16_t tPort; /// port - typedef enum tObjectKind { - kObjectKindFile = 'F', - kObjectKindDirectory = 'D', - } tObjectKind; - - - /** - * The C reflection of org.apache.org.hadoop.FileSystem . - */ - typedef void* hdfsFS; - - - /** - * The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream . - */ - enum hdfsStreamType - { - UNINITIALIZED = 0, - INPUT = 1, - OUTPUT = 2, - }; - - - /** - * The 'file-handle' to a file in hdfs. - */ - struct hdfsFile_internal { - void* file; - enum hdfsStreamType type; - }; - typedef struct hdfsFile_internal* hdfsFile; - - - /** - * hdfsConnectAsUser - Connect to a hdfs file system as a specific user - * Connect to the hdfs. - * @param host A string containing either a host name, or an ip address - * of the namenode of a hdfs cluster. 'host' should be passed as NULL if - * you want to connect to local filesystem. 'host' should be passed as - * 'default' (and port as 0) to used the 'configured' filesystem - * (core-site/core-default.xml). - * @param port The port on which the server is listening. - * @param user the user name (this is hadoop domain user). Or NULL is equivelant to hhdfsConnect(host, port) - * @param groups the groups (these are hadoop domain groups) - * @return Returns a handle to the filesystem or NULL on error. - */ - hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user , const char *groups[], int groups_size ); - - - /** - * hdfsConnect - Connect to a hdfs file system. - * Connect to the hdfs. - * @param host A string containing either a host name, or an ip address - * of the namenode of a hdfs cluster. 'host' should be passed as NULL if - * you want to connect to local filesystem. 'host' should be passed as - * 'default' (and port as 0) to used the 'configured' filesystem - * (core-site/core-default.xml). - * @param port The port on which the server is listening. 
- * @return Returns a handle to the filesystem or NULL on error. - */ - hdfsFS hdfsConnect(const char* host, tPort port); - - - /** - * This are the same as hdfsConnectAsUser except that every invocation returns a new FileSystem handle. - * Applications should call a hdfsDisconnect for every call to hdfsConnectAsUserNewInstance. - */ - hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user , const char *groups[], int groups_size ); - hdfsFS hdfsConnectNewInstance(const char* host, tPort port); - hdfsFS hdfsConnectPath(const char* uri); - - /** - * hdfsDisconnect - Disconnect from the hdfs file system. - * Disconnect from hdfs. - * @param fs The configured filesystem handle. - * @return Returns 0 on success, -1 on error. - */ - int hdfsDisconnect(hdfsFS fs); - - - /** - * hdfsOpenFile - Open a hdfs file in given mode. - * @param fs The configured filesystem handle. - * @param path The full path to the file. - * @param flags - an | of bits/fcntl.h file flags - supported flags are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT), - * O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR || (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP. - * @param bufferSize Size of buffer for read/write - pass 0 if you want - * to use the default configured values. - * @param replication Block replication - pass 0 if you want to use - * the default configured values. - * @param blocksize Size of block - pass 0 if you want to use the - * default configured values. - * @return Returns the handle to the open file or NULL on error. - */ - hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, - int bufferSize, short replication, tSize blocksize); - - - /** - * hdfsCloseFile - Close an open file. - * @param fs The configured filesystem handle. - * @param file The file handle. - * @return Returns 0 on success, -1 on error. - */ - int hdfsCloseFile(hdfsFS fs, hdfsFile file); - - - /** - * hdfsExists - Checks if a given path exsits on the filesystem - * @param fs The configured filesystem handle. - * @param path The path to look for - * @return Returns 0 on exists, 1 on non-exists, -1/-2 on error. - */ - int hdfsExists(hdfsFS fs, const char *path); - - - /** - * hdfsSeek - Seek to given offset in file. - * This works only for files opened in read-only mode. - * @param fs The configured filesystem handle. - * @param file The file handle. - * @param desiredPos Offset into the file to seek into. - * @return Returns 0 on success, -1 on error. - */ - int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos); - - - /** - * hdfsTell - Get the current offset in the file, in bytes. - * @param fs The configured filesystem handle. - * @param file The file handle. - * @return Current offset, -1 on error. - */ - tOffset hdfsTell(hdfsFS fs, hdfsFile file); - - - /** - * hdfsRead - Read data from an open file. - * @param fs The configured filesystem handle. - * @param file The file handle. - * @param buffer The buffer to copy read bytes into. - * @param length The length of the buffer. - * @return Returns the number of bytes actually read, possibly less - * than than length;-1 on error. - */ - tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length); - - - /** - * hdfsPread - Positional read of data from an open file. - * @param fs The configured filesystem handle. - * @param file The file handle. - * @param position Position from which to read - * @param buffer The buffer to copy read bytes into. 
- * @param length The length of the buffer. - * @return Returns the number of bytes actually read, possibly less than - * than length;-1 on error. - */ - tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, - void* buffer, tSize length); - - - /** - * hdfsWrite - Write data into an open file. - * @param fs The configured filesystem handle. - * @param file The file handle. - * @param buffer The data. - * @param length The no. of bytes to write. - * @return Returns the number of bytes written, -1 on error. - */ - tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, - tSize length); - - - /** - * hdfsWrite - Flush the data. - * @param fs The configured filesystem handle. - * @param file The file handle. - * @return Returns 0 on success, -1 on error. - */ - int hdfsFlush(hdfsFS fs, hdfsFile file); - - /** - * hdfsSync - Sync the data to persistent store. - * @param fs The configured filesystem handle. - * @param file The file handle. - * @return Returns 0 on success, -1 on error. - */ - int hdfsSync(hdfsFS fs, hdfsFile file); - - /** - * hdfsGetNumReplicasInPipeline - get number of remaining replicas in - * pipeline - * @param fs The configured filesystem handle - * @param file the file handle - * @return returns the # of datanodes in the write pipeline; -1 on error - */ - int hdfsGetNumCurrentReplicas(hdfsFS, hdfsFile file); - - /** - * hdfsAvailable - Number of bytes that can be read from this - * input stream without blocking. - * @param fs The configured filesystem handle. - * @param file The file handle. - * @return Returns available bytes; -1 on error. - */ - int hdfsAvailable(hdfsFS fs, hdfsFile file); - - - /** - * hdfsCopy - Copy file from one filesystem to another. - * @param srcFS The handle to source filesystem. - * @param src The path of source file. - * @param dstFS The handle to destination filesystem. - * @param dst The path of destination file. - * @return Returns 0 on success, -1 on error. - */ - int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); - - - /** - * hdfsMove - Move file from one filesystem to another. - * @param srcFS The handle to source filesystem. - * @param src The path of source file. - * @param dstFS The handle to destination filesystem. - * @param dst The path of destination file. - * @return Returns 0 on success, -1 on error. - */ - int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); - - - /** - * hdfsDelete - Delete file. - * @param fs The configured filesystem handle. - * @param path The path of the file. - * @return Returns 0 on success, -1 on error. - */ - int hdfsDelete(hdfsFS fs, const char* path); - - - /** - * hdfsRename - Rename file. - * @param fs The configured filesystem handle. - * @param oldPath The path of the source file. - * @param newPath The path of the destination file. - * @return Returns 0 on success, -1 on error. - */ - int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath); - - - /** - * hdfsGetWorkingDirectory - Get the current working directory for - * the given filesystem. - * @param fs The configured filesystem handle. - * @param buffer The user-buffer to copy path of cwd into. - * @param bufferSize The length of user-buffer. - * @return Returns buffer, NULL on error. - */ - char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize); - - - /** - * hdfsSetWorkingDirectory - Set the working directory. All relative - * paths will be resolved relative to it. - * @param fs The configured filesystem handle. 
- * @param path The path of the new 'cwd'. - * @return Returns 0 on success, -1 on error. - */ - int hdfsSetWorkingDirectory(hdfsFS fs, const char* path); - - - /** - * hdfsCreateDirectory - Make the given file and all non-existent - * parents into directories. - * @param fs The configured filesystem handle. - * @param path The path of the directory. - * @return Returns 0 on success, -1 on error. - */ - int hdfsCreateDirectory(hdfsFS fs, const char* path); - - - /** - * hdfsSetReplication - Set the replication of the specified - * file to the supplied value - * @param fs The configured filesystem handle. - * @param path The path of the file. - * @return Returns 0 on success, -1 on error. - */ - int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication); - - - /** - * hdfsFileInfo - Information about a file/directory. - */ - typedef struct { - tObjectKind mKind; /* file or directory */ - char *mName; /* the name of the file */ - tTime mLastMod; /* the last modification time for the file in seconds */ - tOffset mSize; /* the size of the file in bytes */ - short mReplication; /* the count of replicas */ - tOffset mBlockSize; /* the block size for the file */ - char *mOwner; /* the owner of the file */ - char *mGroup; /* the group associated with the file */ - short mPermissions; /* the permissions associated with the file */ - tTime mLastAccess; /* the last access time for the file in seconds */ - } hdfsFileInfo; - - - /** - * hdfsListDirectory - Get list of files/directories for a given - * directory-path. hdfsFreeFileInfo should be called to deallocate memory if - * the function returns non-NULL value. - * @param fs The configured filesystem handle. - * @param path The path of the directory. - * @param numEntries Set to the number of files/directories in path. - * @return Returns a dynamically-allocated array of hdfsFileInfo - * objects; NULL if empty or on error. - * on error, numEntries will be -1. - */ - hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, - int *numEntries); - - - /** - * hdfsGetPathInfo - Get information about a path as a (dynamically - * allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be - * called when the pointer is no longer needed. - * @param fs The configured filesystem handle. - * @param path The path of the file. - * @return Returns a dynamically-allocated hdfsFileInfo object; - * NULL on error. - */ - hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path); - - - /** - * hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields) - * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo - * objects. - * @param numEntries The size of the array. - */ - void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries); - - - /** - * hdfsGetHosts - Get hostnames where a particular block (determined by - * pos & blocksize) of a file is stored. The last element in the array - * is NULL. Due to replication, a single block could be present on - * multiple hosts. - * @param fs The configured filesystem handle. - * @param path The path of the file. - * @param start The start of the block. - * @param length The length of the block. - * @return Returns a dynamically-allocated 2-d array of blocks-hosts; - * NULL on error. - */ - char*** hdfsGetHosts(hdfsFS fs, const char* path, - tOffset start, tOffset length); - - - /** - * hdfsFreeHosts - Free up the structure returned by hdfsGetHosts - * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo - * objects. 
- * @param numEntries The size of the array. - */ - void hdfsFreeHosts(char ***blockHosts); - - - /** - * hdfsGetDefaultBlockSize - Get the optimum blocksize. - * @param fs The configured filesystem handle. - * @return Returns the blocksize; -1 on error. - */ - tOffset hdfsGetDefaultBlockSize(hdfsFS fs); - - - /** - * hdfsGetCapacity - Return the raw capacity of the filesystem. - * @param fs The configured filesystem handle. - * @return Returns the raw-capacity; -1 on error. - */ - tOffset hdfsGetCapacity(hdfsFS fs); - - - /** - * hdfsGetUsed - Return the total raw size of all files in the filesystem. - * @param fs The configured filesystem handle. - * @return Returns the total-size; -1 on error. - */ - tOffset hdfsGetUsed(hdfsFS fs); - - /** - * hdfsChown - * @param fs The configured filesystem handle. - * @param path the path to the file or directory - * @param owner this is a string in Hadoop land. Set to null or "" if only setting group - * @param group this is a string in Hadoop land. Set to null or "" if only setting user - * @return 0 on success else -1 - */ - int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group); - - /** - * hdfsChmod - * @param fs The configured filesystem handle. - * @param path the path to the file or directory - * @param mode the bitmask to set it to - * @return 0 on success else -1 - */ - int hdfsChmod(hdfsFS fs, const char* path, short mode); - - /** - * hdfsUtime - * @param fs The configured filesystem handle. - * @param path the path to the file or directory - * @param mtime new modification time or 0 for only set access time in seconds - * @param atime new access time or 0 for only set modification time in seconds - * @return 0 on success else -1 - */ - int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime); - -#ifdef __cplusplus -} -#endif - -#endif /*LIBHDFS_HDFS_H*/ - -/** - * vim: ts=4: sw=4: et - */ diff --git a/hdfs/libhdfs.a b/hdfs/libhdfs.a deleted file mode 100644 index 4d1f19f0b..000000000 Binary files a/hdfs/libhdfs.a and /dev/null differ diff --git a/hdfs/setup.sh b/hdfs/setup.sh new file mode 100644 index 000000000..ac69b525d --- /dev/null +++ b/hdfs/setup.sh @@ -0,0 +1,7 @@ +export USE_HDFS=1 +export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:/usr/lib/hadoop/lib/native + +export CLASSPATH= +for f in `find /usr/lib/hadoop-hdfs | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done +for f in `find /usr/lib/hadoop | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done +for f in `find /usr/lib/hadoop/client | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e26ecde51..9ba6a522c 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -53,8 +53,18 @@ enum CompressionType : char { }; enum CompactionStyle : char { - kCompactionStyleLevel = 0x0, // level based compaction style - kCompactionStyleUniversal = 0x1 // Universal compaction style + kCompactionStyleLevel = 0x0, // level based compaction style + kCompactionStyleUniversal = 0x1, // Universal compaction style + kCompactionStyleFIFO = 0x2, // FIFO compaction style +}; + +struct CompactionOptionsFIFO { + // once the total sum of table files reaches this, we will delete the oldest + // table file + // Default: 1GB + uint64_t max_table_files_size; + + CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {} }; // Compression options for different compression algorithms like Zlib @@ -429,6 +439,9 @@ struct ColumnFamilyOptions 
{ // The options needed to support Universal Style compactions CompactionOptionsUniversal compaction_options_universal; + // The options for FIFO compaction style + CompactionOptionsFIFO compaction_options_fifo; + // Use KeyMayExist API to filter deletes when this is true. // If KeyMayExist returns false, i.e. the key definitely does not exist, then // the delete is a noop. KeyMayExist only incurs in-memory look up. diff --git a/include/rocksdb/version.h b/include/rocksdb/version.h index e440412eb..2bae1ed6e 100644 --- a/include/rocksdb/version.h +++ b/include/rocksdb/version.h @@ -6,7 +6,7 @@ // Also update Makefile if you change these #define ROCKSDB_MAJOR 3 -#define ROCKSDB_MINOR 1 +#define ROCKSDB_MINOR 2 #define ROCKSDB_PATCH 0 // Do not use these. We made the mistake of declaring macros starting with diff --git a/tools/db_crashtest2.py b/tools/db_crashtest2.py index 0a12b5a60..3ef383afc 100644 --- a/tools/db_crashtest2.py +++ b/tools/db_crashtest2.py @@ -60,7 +60,7 @@ def main(argv): + str(ops_per_thread) + "\nwrite_buffer_size=" \ + str(write_buf_size) + "\n" - total_check_mode = 3 + total_check_mode = 4 check_mode = 0 while time.time() < exit_time: @@ -75,8 +75,14 @@ def main(argv): # normal run with universal compaction mode additional_opts = "--ops_per_thread=" + str(ops_per_thread) + \ " --compaction_style=1" + elif check_mode == 2: + # normal run with FIFO compaction mode + # ops_per_thread is divided by 5 because FIFO compaction + # style is quite a bit slower on reads with lot of files + additional_opts = "--ops_per_thread=" + str(ops_per_thread / 5) + \ + " --compaction_style=2" else: - # nomral run + # normal run additional_opts = "--ops_per_thread=" + str(ops_per_thread) dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_') diff --git a/util/env_hdfs.cc b/util/env_hdfs.cc index eb2b12cba..1618e5468 100644 --- a/util/env_hdfs.cc +++ b/util/env_hdfs.cc @@ -15,11 +15,11 @@ #include #include "rocksdb/env.h" #include "rocksdb/status.h" -#include "hdfs/hdfs.h" #include "hdfs/env_hdfs.h" #define HDFS_EXISTS 0 -#define HDFS_DOESNT_EXIST 1 +#define HDFS_DOESNT_EXIST -1 +#define HDFS_SUCCESS 0 // // This file defines an HDFS environment for rocksdb. 
It uses the libhdfs @@ -223,7 +223,7 @@ class HdfsWritableFile: public WritableFile { if (hdfsFlush(fileSys_, hfile_) == -1) { return IOError(filename_, errno); } - if (hdfsSync(fileSys_, hfile_) == -1) { + if (hdfsHSync(fileSys_, hfile_) == -1) { return IOError(filename_, errno); } Log(mylog, "[hdfs] HdfsWritableFile Synced %s\n", filename_.c_str()); @@ -398,12 +398,34 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname, return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv"); } +class HdfsDirectory : public Directory { + public: + explicit HdfsDirectory(int fd) : fd_(fd) {} + ~HdfsDirectory() {} + + virtual Status Fsync() { return Status::OK(); } + + private: + int fd_; +}; + Status HdfsEnv::NewDirectory(const std::string& name, unique_ptr* result) { - return Status::NotSupported("NewDirectory not supported on HdfsEnv"); + int value = hdfsExists(fileSys_, name.c_str()); + switch (value) { + case HDFS_EXISTS: + result->reset(new HdfsDirectory(0)); + return Status::OK(); + default: // fail if the directory doesn't exist + Log(mylog, "NewDirectory hdfsExists call failed"); + throw HdfsFatalException("hdfsExists call failed with error " + + std::to_string(value) + " on path " + name + + ".\n"); + } } bool HdfsEnv::FileExists(const std::string& fname) { + int value = hdfsExists(fileSys_, fname.c_str()); switch (value) { case HDFS_EXISTS: @@ -413,7 +435,8 @@ bool HdfsEnv::FileExists(const std::string& fname) { default: // anything else should be an error Log(mylog, "FileExists hdfsExists call failed"); throw HdfsFatalException("hdfsExists call failed with error " + - std::to_string(value) + ".\n"); + std::to_string(value) + " on path " + fname + + ".\n"); } } @@ -455,7 +478,7 @@ Status HdfsEnv::GetChildren(const std::string& path, } Status HdfsEnv::DeleteFile(const std::string& fname) { - if (hdfsDelete(fileSys_, fname.c_str()) == 0) { + if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) { return Status::OK(); } return IOError(fname, errno); @@ -514,7 +537,7 @@ Status HdfsEnv::GetFileModificationTime(const std::string& fname, // target already exists. So, we delete the target before attemting the // rename. Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) { - hdfsDelete(fileSys_, target.c_str()); + hdfsDelete(fileSys_, target.c_str(), 1); if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) { return Status::OK(); } diff --git a/util/env_posix.cc b/util/env_posix.cc index 7ffba6f53..1f8c3bcf2 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -94,6 +94,17 @@ static Status IOError(const std::string& context, int err_number) { return Status::IOError(context, strerror(err_number)); } +// TODO(sdong): temp logging. Need to help debugging. Remove it when +// the feature is proved to be stable. +inline void PrintThreadInfo(size_t thread_id, pthread_t id) { + unsigned char* ptc = (unsigned char*)(void*)(&id); + fprintf(stdout, "Bg thread %zu terminates 0x", thread_id); + for (size_t i = 0; i < sizeof(id); i++) { + fprintf(stdout, "%02x", (unsigned)(ptc[i])); + } + fprintf(stdout, "\n"); +} + #ifdef NDEBUG // empty in release build #define TEST_KILL_RANDOM(rocksdb_kill_odds) @@ -1514,8 +1525,7 @@ class PosixEnv : public Env { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); // TODO(sdong): temp logging. Need to help debugging. Remove it when // the feature is proved to be stable. 
- fprintf(stdout, "Bg thread %zu terminates %llx\n", thread_id, - static_cast(terminating_thread)); + PrintThreadInfo(thread_id, terminating_thread); break; } void (*function)(void*) = queue_.front().function; diff --git a/util/env_test.cc b/util/env_test.cc index 2abce6f3a..c0d00ce94 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -285,7 +285,7 @@ TEST(EnvPosixTest, DecreaseNumBgThreads) { // Increase to 5 threads. Task 0 and 2 running. env_->SetBackgroundThreads(5, Env::Priority::HIGH); Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(tasks[2].IsSleeping()); @@ -330,7 +330,7 @@ TEST(EnvPosixTest, DecreaseNumBgThreads) { tasks[4].WakeUp(); Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); for (size_t i = 5; i < 8; i++) { ASSERT_TRUE(tasks[i].IsSleeping()); } @@ -360,13 +360,13 @@ TEST(EnvPosixTest, DecreaseNumBgThreads) { env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9], Env::Priority::HIGH); Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), 0); + ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), (unsigned int)0); ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping()); // Increase to 4 threads. Task 5, 8, 9 running. env_->SetBackgroundThreads(4, Env::Priority::HIGH); Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_TRUE(tasks[8].IsSleeping()); ASSERT_TRUE(tasks[9].IsSleeping()); diff --git a/util/options.cc b/util/options.cc index 22952f587..4fe8b219e 100644 --- a/util/options.cc +++ b/util/options.cc @@ -135,6 +135,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) compaction_style(options.compaction_style), verify_checksums_in_compaction(options.verify_checksums_in_compaction), compaction_options_universal(options.compaction_options_universal), + compaction_options_fifo(options.compaction_options_fifo), filter_deletes(options.filter_deletes), max_sequential_skip_in_iterations( options.max_sequential_skip_in_iterations), @@ -413,6 +414,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { Log(log, "Options.compaction_options_universal.compression_size_percent: %u", compaction_options_universal.compression_size_percent); + Log(log, "Options.compaction_options_fifo.max_table_files_size: %" PRIu64, + compaction_options_fifo.max_table_files_size); std::string collector_names; for (const auto& collector_factory : table_properties_collector_factories) { collector_names.append(collector_factory->Name());
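For reference, a minimal sketch of how an application would opt into the FIFO compaction style introduced by this change; the database path and the size threshold are illustrative, and error handling is kept to the minimum.

```cpp
// Sketch: open a DB with FIFO compaction. Once the total size of the level-0
// table files exceeds max_table_files_size, the FIFO picker schedules a
// deletion compaction that drops the oldest files first.
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.compaction_style = rocksdb::kCompactionStyleFIFO;
  // Defaults to 1GB; lowered here only to make the effect easier to observe.
  options.compaction_options_fifo.max_table_files_size = 64 * 1024 * 1024;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/rocksdb_fifo_example", &db);
  if (!s.ok()) {
    return 1;
  }
  // Old keys disappear once their files age out, so treat the DB like a cache.
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  delete db;
  return s.ok() ? 0 : 1;
}
```

This mirrors the configuration exercised by FIFOCompactionTest in db/db_test.cc, where a 500KB limit with ~100KB memtable flushes leaves only the five newest files alive.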