Add checks to GetUpdatesSince (#9459)

Summary:
Make `DB::GetUpdatesSince` return early if told to scan WALs generated by transactions
with write-prepared or write-unprepared policies (`seq_per_batch` is true), as indicated by
API comment.

Also add checks to `TransactionLogIterator` to clarify some conditions.

No API change.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9459

Test Plan:
make check

Closing https://github.com/facebook/rocksdb/issues/1565

Reviewed By: akankshamahajan15

Differential Revision: D33821243

Pulled By: riversand963

fbshipit-source-id: c8b155d020ce0980e2d3b3b1da40b96e65b48d79
main
Yanqin Jin 2 years ago committed by Facebook GitHub Bot
parent 0bd4dcde6b
commit fe63899d1a
  1. 1
      HISTORY.md
  2. 5
      db/db_impl/db_impl.cc
  3. 81
      db/transaction_log_impl.cc
  4. 19
      db/transaction_log_impl.h
  5. 5
      db/wal_manager.cc
  6. 2
      db/wal_manager.h
  7. 9
      include/rocksdb/db.h

@ -23,6 +23,7 @@
### Behavior changes
* Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794).
* Make DB::GetUpdatesSince() return NotSupported early for write-prepared/write-unprepared transactions, as the API contract indicates.
### Public API changes
* Exposed APIs to examine results of block cache stats collections in a structured way. In particular, users of `GetMapProperty()` with property `kBlockCacheEntryStats` can now use the functions in `BlockCacheEntryStatsMapKeys` to find stats in the map.

@ -3682,6 +3682,11 @@ Status DBImpl::GetUpdatesSince(
SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options) {
RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
if (seq_per_batch_) {
return Status::NotSupported(
"This API is not yet compatible with write-prepared/write-unprepared "
"transactions");
}
if (seq > versions_->LastSequence()) {
return Status::NotFound("Requested sequence not yet written in the db");
}

@ -6,9 +6,12 @@
#ifndef ROCKSDB_LITE
#include "db/transaction_log_impl.h"
#include <cinttypes>
#include "db/write_batch_internal.h"
#include "file/sequence_file_reader.h"
#include "util/defer.h"
namespace ROCKSDB_NAMESPACE {
@ -24,16 +27,17 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
soptions_(soptions),
starting_sequence_number_(seq),
files_(std::move(files)),
versions_(versions),
seq_per_batch_(seq_per_batch),
io_tracer_(io_tracer),
started_(false),
is_valid_(false),
current_file_index_(0),
current_batch_seq_(0),
current_last_seq_(0),
versions_(versions),
seq_per_batch_(seq_per_batch),
io_tracer_(io_tracer) {
current_last_seq_(0) {
assert(files_ != nullptr);
assert(versions_ != nullptr);
assert(!seq_per_batch_);
current_status_.PermitUncheckedError(); // Clear on start
reporter_.env = options_->env;
reporter_.info_log = options_->info_log.get();
@ -94,8 +98,21 @@ void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
Slice record;
started_ = false;
is_valid_ = false;
// Check invariant of TransactionLogIterator when SeekToStartSequence()
// succeeds.
const Defer defer([this]() {
if (is_valid_) {
assert(current_status_.ok());
if (starting_sequence_number_ > current_batch_seq_) {
assert(current_batch_seq_ < current_last_seq_);
assert(current_last_seq_ >= starting_sequence_number_);
}
}
});
if (files_->size() <= start_file_index) {
return;
} else if (!current_status_.ok()) {
return;
}
Status s =
OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
@ -151,6 +168,9 @@ void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
}
void TransactionLogIteratorImpl::Next() {
if (!current_status_.ok()) {
return;
}
return NextImpl(false);
}
@ -159,7 +179,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
is_valid_ = false;
if (!internal && !started_) {
// Runs every time until we can seek to the start sequence
return SeekToStartSequence();
SeekToStartSequence();
}
while(true) {
assert(current_log_reader_);
@ -249,55 +269,10 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
return SeekToStartSequence(current_file_index_, !seq_per_batch_);
}
struct BatchCounter : public WriteBatch::Handler {
SequenceNumber sequence_;
BatchCounter(SequenceNumber sequence) : sequence_(sequence) {}
Status MarkNoop(bool empty_batch) override {
if (!empty_batch) {
sequence_++;
}
return Status::OK();
}
Status MarkEndPrepare(const Slice&) override {
sequence_++;
return Status::OK();
}
Status MarkCommit(const Slice&) override {
sequence_++;
return Status::OK();
}
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
++sequence_;
return Status::OK();
}
Status PutCF(uint32_t /*cf*/, const Slice& /*key*/,
const Slice& /*val*/) override {
return Status::OK();
}
Status DeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
return Status::OK();
}
Status SingleDeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
return Status::OK();
}
Status MergeCF(uint32_t /*cf*/, const Slice& /*key*/,
const Slice& /*val*/) override {
return Status::OK();
}
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
};
current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
if (seq_per_batch_) {
BatchCounter counter(current_batch_seq_);
batch->Iterate(&counter);
current_last_seq_ = counter.sequence_;
} else {
current_last_seq_ =
current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
}
assert(!seq_per_batch_);
current_last_seq_ =
current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
// currentBatchSeq_ can only change here
assert(current_last_seq_ <= versions_->LastSequence());

@ -81,6 +81,13 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const EnvOptions& soptions_;
SequenceNumber starting_sequence_number_;
std::unique_ptr<VectorLogPtr> files_;
// Used only to get latest seq. num
// TODO(icanadi) can this be just a callback?
VersionSet const* const versions_;
const bool seq_per_batch_;
std::shared_ptr<IOTracer> io_tracer_;
// State variables
bool started_;
bool is_valid_; // not valid when it starts of.
Status current_status_;
@ -104,14 +111,11 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
SequenceNumber
current_batch_seq_; // sequence number at start of current batch
SequenceNumber current_last_seq_; // last sequence in the current batch
// Used only to get latest seq. num
// TODO(icanadi) can this be just a callback?
VersionSet const* const versions_;
const bool seq_per_batch_;
// Reads from transaction log only if the writebatch record has been written
bool RestrictedRead(Slice* record);
// Seeks to startingSequenceNumber reading from startFileIndex in files_.
// If strict is set,then must get a batch starting with startingSequenceNumber
// Seeks to starting_sequence_number_ reading from start_file_index in files_.
// If strict is set, then must get a batch starting with
// starting_sequence_number_.
void SeekToStartSequence(uint64_t start_file_index = 0, bool strict = false);
// Implementation of Next. SeekToStartSequence calls it internally with
// internal=true to let it find next entry even if it has to jump gaps because
@ -120,10 +124,9 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
void NextImpl(bool internal = false);
// Check if batch is expected, else return false
bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq);
// Update current batch if a continuous batch is found, else return false
// Update current batch if a continuous batch is found.
void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile* file);
std::shared_ptr<IOTracer> io_tracer_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -105,6 +105,11 @@ Status WalManager::GetUpdatesSince(
SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options,
VersionSet* version_set) {
if (seq_per_batch_) {
return Status::NotSupported();
}
assert(!seq_per_batch_);
// Get all sorted Wal Files.
// Do binary search and open files and find the seq number.

@ -113,7 +113,7 @@ class WalManager {
// obsolete files will be deleted every this seconds if ttl deletion is
// enabled and archive size_limit is disabled.
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;
static constexpr uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;
std::shared_ptr<IOTracer> io_tracer_;
};

@ -1484,9 +1484,12 @@ class DB {
virtual Status GetCreationTimeOfOldestFile(uint64_t* creation_time) = 0;
// Note: this API is not yet consistent with WritePrepared transactions.
// Sets iter to an iterator that is positioned at a write-batch containing
// seq_number. If the sequence number is non existent, it returns an iterator
// at the first available seq_no after the requested seq_no
//
// Sets iter to an iterator that is positioned at a write-batch whose
// sequence number range [start_seq, end_seq] covers seq_number. If no such
// write-batch exists, then iter is positioned at the next write-batch whose
// start_seq > seq_number.
//
// Returns Status::OK if iterator is valid
// Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to
// use this api, else the WAL files will get

Loading…
Cancel
Save