DeleteRange flush support

Summary:
Changed BuildTable() (used for flush) to (1) add range
tombstones to the aggregator, which is used by CompactionIterator to
determine which keys can be removed; and (2) add the aggregator's range
tombstones to the table that is output for the flush.
Closes https://github.com/facebook/rocksdb/pull/1438

Differential Revision: D4100025

Pulled By: ajkr

fbshipit-source-id: cb01a70
main
Andrew Kryczka, committed by Facebook Github Bot
parent d5555d95a3
commit 40a2e406f8
Changed files:
  db/builder.cc        | 18
  db/builder.h         |  4
  db/db_impl.cc        |  4
  db/flush_job.cc      | 16
  db/flush_job_test.cc | 20
  db/repair.cc         | 11
  table/mock_table.cc  |  2
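
For context, a minimal usage sketch (not part of this commit) of the behavior the change enables: a range tombstone written with the DeleteRange() API from this patch series keeps covering keys after the memtable is flushed to an SST file. The database path is arbitrary and error handling is reduced to asserts.

#include <cassert>
#include <string>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/delete_range_example", &db);
  assert(s.ok());

  // Write point keys "key0" .. "key9", then delete the range ["key3", "key7").
  for (int i = 0; i < 10; i++) {
    s = db->Put(rocksdb::WriteOptions(), "key" + std::to_string(i), "value");
    assert(s.ok());
  }
  s = db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(),
                      "key3", "key7");
  assert(s.ok());

  // Force a flush; with this commit the tombstone is written into the new
  // level-0 table, so covered keys stay deleted after the memtable is gone.
  s = db->Flush(rocksdb::FlushOptions());
  assert(s.ok());

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "key5", &value);
  assert(s.IsNotFound());

  delete db;
  return 0;
}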

db/builder.cc

@@ -61,7 +61,8 @@ TableBuilder* NewTableBuilder(
 Status BuildTable(
     const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
     const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
-    TableCache* table_cache, InternalIterator* iter, FileMetaData* meta,
+    TableCache* table_cache, InternalIterator* iter,
+    ScopedArenaIterator&& range_del_iter, FileMetaData* meta,
     const InternalKeyComparator& internal_comparator,
     const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
         int_tbl_prop_collector_factories,
@@ -81,6 +82,14 @@ Status BuildTable(
   Status s;
   meta->fd.file_size = 0;
   iter->SeekToFirst();
+  range_del_iter->SeekToFirst();
+  std::unique_ptr<RangeDelAggregator> range_del_agg(
+      new RangeDelAggregator(internal_comparator, snapshots));
+  s = range_del_agg->AddTombstones(std::move(range_del_iter));
+  if (!s.ok()) {
+    // may be non-ok if a range tombstone key is unparsable
+    return s;
+  }

   std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
                                     meta->fd.GetPathId());
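
To make the hunk above concrete, here is a deliberately simplified model of the aggregator's role. All names are illustrative, not the actual internal API; the real RangeDelAggregator additionally partitions tombstones into snapshot stripes so that keys still visible to a snapshot are not dropped.

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

struct RangeTombstoneModel {
  std::string start_key;  // inclusive
  std::string end_key;    // exclusive
  uint64_t seqno;
};

class RangeDelAggregatorModel {
 public:
  void AddTombstone(RangeTombstoneModel t) {
    tombstones_.push_back(std::move(t));
  }

  // A point key is obsolete if some tombstone spans it with a higher seqno;
  // this is the question CompactionIterator asks when deciding what to drop.
  bool ShouldDelete(const std::string& user_key, uint64_t key_seqno) const {
    for (const auto& t : tombstones_) {
      if (t.start_key <= user_key && user_key < t.end_key &&
          t.seqno > key_seqno) {
        return true;
      }
    }
    return false;
  }

  // Counterpart of ShouldAddTombstones() in the next hunk: an output table
  // is still worth writing when there are tombstones to carry, even if every
  // point key was dropped.
  bool ShouldAddTombstones() const { return !tombstones_.empty(); }

 private:
  std::vector<RangeTombstoneModel> tombstones_;
};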
@@ -90,7 +99,7 @@ Status BuildTable(
 #endif  // !ROCKSDB_LITE
   TableProperties tp;

-  if (iter->Valid()) {
+  if (iter->Valid() || range_del_agg->ShouldAddTombstones()) {
     TableBuilder* builder;
     unique_ptr<WritableFileWriter> file_writer;
     {
@@ -112,8 +121,6 @@ Status BuildTable(
                                     compression_opts, level);
     }
-    std::unique_ptr<RangeDelAggregator> range_del_agg;
-    range_del_agg.reset(new RangeDelAggregator(internal_comparator, snapshots));
     MergeHelper merge(env, internal_comparator.user_comparator(),
                       ioptions.merge_operator, nullptr, ioptions.info_log,
                       mutable_cf_options.min_partial_merge_operands,
@@ -138,6 +145,9 @@ Status BuildTable(
             ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
       }
     }
+    // nullptr for table_{min,max} so all range tombstones will be flushed
+    range_del_agg->AddToBuilder(builder, true /* extend_before_min_key */,
+                                nullptr /* next_table_min_key */, meta);

     // Finish and check for builder errors
     bool empty = builder->NumEntries() == 0;
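
The flush_job_test.cc hunk further down shows the on-disk shape this step produces: a tombstone becomes an ordinary table entry whose internal key carries the start key, seqno, and kTypeRangeDeletion, and whose value carries the exclusive end key. A hedged sketch of that single step, reusing the RangeTombstoneModel above; the real AddToBuilder() also handles overlapping tombstones and table boundaries, which this ignores.

#include "db/dbformat.h"          // InternalKey, kTypeRangeDeletion
#include "table/table_builder.h"  // TableBuilder

void AddTombstoneToBuilder(rocksdb::TableBuilder* builder,
                           const RangeTombstoneModel& t) {
  rocksdb::InternalKey start(t.start_key, t.seqno,
                             rocksdb::kTypeRangeDeletion);
  builder->Add(start.Encode(), t.end_key);  // value = exclusive end key
}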

db/builder.h

@@ -17,6 +17,7 @@
 #include "rocksdb/status.h"
 #include "rocksdb/table_properties.h"
 #include "rocksdb/types.h"
+#include "table/scoped_arena_iterator.h"
 #include "util/cf_options.h"
 #include "util/event_logger.h"
@@ -63,7 +64,8 @@ TableBuilder* NewTableBuilder(
 extern Status BuildTable(
     const std::string& dbname, Env* env, const ImmutableCFOptions& options,
     const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
-    TableCache* table_cache, InternalIterator* iter, FileMetaData* meta,
+    TableCache* table_cache, InternalIterator* iter,
+    ScopedArenaIterator&& range_del_iter, FileMetaData* meta,
     const InternalKeyComparator& internal_comparator,
     const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
         int_tbl_prop_collector_factories,

db/db_impl.cc

@@ -1794,7 +1794,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
     s = BuildTable(
         dbname_, env_, *cfd->ioptions(), mutable_cf_options, env_options_,
-        cfd->table_cache(), iter.get(), &meta, cfd->internal_comparator(),
+        cfd->table_cache(), iter.get(),
+        ScopedArenaIterator(mem->NewRangeTombstoneIterator(ro, &arena)),
+        &meta, cfd->internal_comparator(),
         cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
         snapshot_seqs, earliest_write_conflict_snapshot,
         GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
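
Why the new parameter is ScopedArenaIterator&& rather than a raw pointer: the range tombstone iterator is allocated inside an Arena, so its backing memory is reclaimed when the arena dies, and only the destructor may be run explicitly. A simplified model of such a wrapper (the real one lives in table/scoped_arena_iterator.h):

#include "table/internal_iterator.h"

class ScopedArenaIteratorModel {
 public:
  explicit ScopedArenaIteratorModel(rocksdb::InternalIterator* iter)
      : iter_(iter) {}
  ScopedArenaIteratorModel(ScopedArenaIteratorModel&& o) noexcept
      : iter_(o.iter_) {
    o.iter_ = nullptr;  // moved-from wrapper gives up ownership
  }
  rocksdb::InternalIterator* get() { return iter_; }
  rocksdb::InternalIterator* operator->() { return iter_; }
  ~ScopedArenaIteratorModel() {
    if (iter_ != nullptr) {
      // Destroy only; the Arena owns and frees the memory itself.
      iter_->~InternalIterator();
    }
  }

 private:
  rocksdb::InternalIterator* iter_;  // arena-allocated; never delete'd
};

Passing by rvalue reference lets BuildTable() take ownership and hand the iterator to the aggregator via std::move(), as the first builder.cc hunk shows.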

db/flush_job.cc

@@ -241,7 +241,11 @@ Status FlushJob::WriteLevel0Table() {
   if (log_buffer_) {
     log_buffer_->FlushBufferToLog();
   }
+  // memtables and range_del_iters store internal iterators over each data
+  // memtable and its associated range deletion memtable, respectively, at
+  // corresponding indexes.
   std::vector<InternalIterator*> memtables;
+  std::vector<InternalIterator*> range_del_iters;
   ReadOptions ro;
   ro.total_order_seek = true;
   Arena arena;
@@ -252,10 +256,12 @@
         "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
         cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
     memtables.push_back(m->NewIterator(ro, &arena));
+    range_del_iters.push_back(m->NewRangeTombstoneIterator(ro, &arena));
     total_num_entries += m->num_entries();
     total_num_deletes += m->num_deletes();
     total_memory_usage += m->ApproximateMemoryUsage();
   }
+  assert(memtables.size() == range_del_iters.size());

   event_logger_->Log() << "job" << job_context_->job_id << "event"
                       << "flush_started"
@@ -268,6 +274,9 @@
     ScopedArenaIterator iter(
         NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                            static_cast<int>(memtables.size()), &arena));
+    ScopedArenaIterator range_del_iter(NewMergingIterator(
+        &cfd_->internal_comparator(), &range_del_iters[0],
+        static_cast<int>(range_del_iters.size()), &arena));
     Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
         "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
         cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber());
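
NewMergingIterator() collapses the per-memtable tombstone iterators into one globally sorted stream. The same k-way merge idea over plain sorted vectors, for illustration only; the real merging iterator operates on InternalIterator and compares with the internal key comparator.

#include <functional>
#include <queue>
#include <string>
#include <tuple>
#include <vector>

std::vector<std::string> MergeSorted(
    const std::vector<std::vector<std::string>>& inputs) {
  // Min-heap of (current element, source index, position within source).
  using Entry = std::tuple<std::string, size_t, size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
  for (size_t i = 0; i < inputs.size(); i++) {
    if (!inputs[i].empty()) heap.emplace(inputs[i][0], i, 0);
  }
  std::vector<std::string> out;
  while (!heap.empty()) {
    auto [value, src, pos] = heap.top();
    heap.pop();
    out.push_back(value);
    if (pos + 1 < inputs[src].size()) {
      heap.emplace(inputs[src][pos + 1], src, pos + 1);
    }
  }
  return out;
}

The heap keeps the merge at O(N log k) for N total entries across k memtables, which is why a single merged iterator is handed to BuildTable() instead of the raw per-memtable list.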
@@ -276,9 +285,10 @@
                     &output_compression_);
     s = BuildTable(
         dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
-        env_options_, cfd_->table_cache(), iter.get(), &meta_,
-        cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(),
-        cfd_->GetID(), cfd_->GetName(), existing_snapshots_,
+        env_options_, cfd_->table_cache(), iter.get(),
+        std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
+        cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
+        cfd_->GetName(), existing_snapshots_,
         earliest_write_conflict_snapshot_, output_compression_,
         cfd_->ioptions()->compression_opts,
         mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),

db/flush_job_test.cc

@@ -115,18 +115,19 @@ TEST_F(FlushJobTest, NonEmpty) {
   // Test data:
   //   seqno [    1,    2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
   //   key   [ 1001, 1002 ... 9998, 9999,    0,    1,    2 ...  999 ]
   // Expected:
   //   smallest_key   = "0"
   //   largest_key    = "9999"
   //   smallest_seqno = 1
-  //   largest_seqno  = 9999
+  //   largest_seqno  = 10000
+  //   range-delete "9995" -> "9999" at seqno 10000
   for (int i = 1; i < 10000; ++i) {
     std::string key(ToString((i + 1000) % 10000));
     std::string value("value" + key);
     new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
-    InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
-    inserted_keys.insert({internal_key.Encode().ToString(), value});
+    if ((i + 1000) % 10000 < 9995) {
+      InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
+      inserted_keys.insert({internal_key.Encode().ToString(), value});
+    }
   }
+  new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999");
+  InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion);
+  inserted_keys.insert({internal_key.Encode().ToString(), "9999"});

   autovector<MemTable*> to_delete;
   cfd->imm()->Add(new_mem, &to_delete);
@@ -146,9 +147,10 @@
   ASSERT_OK(flush_job.Run(&fd));
   mutex_.Unlock();
   ASSERT_EQ(ToString(0), fd.smallest.user_key().ToString());
-  ASSERT_EQ(ToString(9999), fd.largest.user_key().ToString());
+  ASSERT_EQ(ToString(9999),
+            fd.largest.user_key().ToString());  // range tombstone end key
   ASSERT_EQ(1, fd.smallest_seqno);
-  ASSERT_EQ(9999, fd.largest_seqno);
+  ASSERT_EQ(10000, fd.largest_seqno);  // range tombstone seqnum 10000
   mock_table_factory_->AssertSingleFile(inserted_keys);
   job_context.Clean();
 }
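
The inserted_keys map above is keyed by InternalKey::Encode(): the user key followed by eight little-endian bytes packing (seqno << 8) | type. A standalone encoder for illustration; 0xF is, to my understanding, the kTypeRangeDeletion value from db/dbformat.h.

#include <cstdint>
#include <string>

std::string EncodeInternalKey(const std::string& user_key, uint64_t seqno,
                              uint8_t type) {
  std::string result = user_key;
  uint64_t packed = (seqno << 8) | type;
  for (int i = 0; i < 8; i++) {  // PutFixed64-style little-endian append
    result.push_back(static_cast<char>((packed >> (8 * i)) & 0xff));
  }
  return result;
}

// e.g. the tombstone entry in the test: EncodeInternalKey("9995", 10000, 0xF)
// maps to the value "9999" in the expected file contents.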

db/repair.cc

@@ -380,11 +380,12 @@ class Repairer {
     ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
     status = BuildTable(
         dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
-        env_options_, table_cache_, iter.get(), &meta,
-        cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
-        cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression,
-        CompressionOptions(), false, nullptr /* internal_stats */,
-        TableFileCreationReason::kRecovery);
+        env_options_, table_cache_, iter.get(),
+        ScopedArenaIterator(mem->NewRangeTombstoneIterator(ro, &arena)),
+        &meta, cfd->internal_comparator(),
+        cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
+        {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false,
+        nullptr /* internal_stats */, TableFileCreationReason::kRecovery);
     Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
         "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log,
         counter, meta.fd.GetNumber(), status.ToString().c_str());

table/mock_table.cc

@@ -117,7 +117,7 @@ uint32_t MockTableFactory::GetIDFromFile(RandomAccessFileReader* file) const {
 void MockTableFactory::AssertSingleFile(
     const stl_wrappers::KVMap& file_contents) {
   ASSERT_EQ(file_system_.files.size(), 1U);
-  ASSERT_TRUE(file_contents == file_system_.files.begin()->second);
+  ASSERT_EQ(file_contents, file_system_.files.begin()->second);
 }

 void MockTableFactory::AssertLatestFile(
