Add commit marker with timestamp (#9266)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9266

This diff adds a new tag `CommitWithTimestamp`. Currently, there is no API to trigger writing
this tag to WAL, thus it is unavailable to users.
This is an ongoing effort to add user-defined timestamp support to write-committed transactions.
This diff also indicates all column families that may potentially participate in the same
transaction must either disable timestamp or have the same timestamp format, since
`CommitWithTimestamp` tag is followed by a single byte-array denoting the commit
timestamp of the transaction. We will enforce this checking in a future diff. We keep this
diff small.

Reviewed By: ltamasi

Differential Revision: D31721350

fbshipit-source-id: e1450811443647feb6ca01adec4c8aaae270ffc6
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent c39a808cb6
commit bd513fd075
  1. 2
      HISTORY.md
  2. 4
      db/blob/db_blob_compaction_test.cc
  3. 6
      db/db_basic_test.cc
  4. 6
      db/db_compaction_test.cc
  5. 8
      db/db_flush_test.cc
  6. 4
      db/db_impl/db_impl.h
  7. 4
      db/db_impl/db_impl_secondary.h
  8. 2
      db/db_test_util.cc
  9. 8
      db/db_wal_test.cc
  10. 5
      db/dbformat.h
  11. 2
      db/listener_test.cc
  12. 2
      db/obsolete_files_test.cc
  13. 4
      db/transaction_log_impl.cc
  14. 105
      db/write_batch.cc
  15. 9
      db/write_batch_internal.h
  16. 19
      db/write_batch_test.cc
  17. 6
      include/rocksdb/write_batch.h
  18. 21
      java/rocksjni/portal.h
  19. 24
      java/rocksjni/writebatchhandlerjnicallback.cc
  20. 2
      java/rocksjni/writebatchhandlerjnicallback.h
  21. 2
      java/src/main/java/org/rocksdb/WriteBatch.java
  22. 22
      java/src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java
  23. 5
      java/src/test/java/org/rocksdb/util/WriteBatchGetter.java
  24. 8
      tools/ldb_cmd.cc
  25. 6
      tools/trace_analyzer_tool.h
  26. 4
      utilities/transactions/transaction_base.cc

@ -1,6 +1,8 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### New Features ### New Features
* Introduced 'CommitWithTimestamp' as a new tag. Currently, there is no API for user to trigger a write with this tag to the WAL. This is part of the efforts to support write-commited transactions with user-defined timestamps.
### Bug Fixes ### Bug Fixes
* Fixed a bug in rocksdb automatic implicit prefetching which got broken because of new feature adaptive_readahead and internal prefetching got disabled when iterator moves from one file to next. * Fixed a bug in rocksdb automatic implicit prefetching which got broken because of new feature adaptive_readahead and internal prefetching got disabled when iterator moves from one file to next.
* Fixed a bug in TableOptions.prepopulate_block_cache which causes segmentation fault when used with TableOptions.partition_filters = true and TableOptions.cache_index_and_filter_blocks = true. * Fixed a bug in TableOptions.prepopulate_block_cache which causes segmentation fault when used with TableOptions.partition_filters = true and TableOptions.cache_index_and_filter_blocks = true.

@ -18,7 +18,7 @@ class DBBlobCompactionTest : public DBTestBase {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
const std::vector<InternalStats::CompactionStats>& GetCompactionStats() { const std::vector<InternalStats::CompactionStats>& GetCompactionStats() {
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions); assert(versions);
assert(versions->GetColumnFamilySet()); assert(versions->GetColumnFamilySet());
@ -495,7 +495,7 @@ TEST_F(DBBlobCompactionTest, TrackGarbage) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions); assert(versions);
assert(versions->GetColumnFamilySet()); assert(versions->GetColumnFamilySet());

@ -2565,7 +2565,7 @@ TEST_F(DBBasicTest, DisableTrackWal) {
ASSERT_OK(dbfull()->TEST_SwitchMemtable()); ASSERT_OK(dbfull()->TEST_SwitchMemtable());
ASSERT_OK(db_->SyncWAL()); ASSERT_OK(db_->SyncWAL());
// Some WALs are tracked. // Some WALs are tracked.
ASSERT_FALSE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty()); ASSERT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
Close(); Close();
// Disable WAL tracking. // Disable WAL tracking.
@ -2573,14 +2573,14 @@ TEST_F(DBBasicTest, DisableTrackWal) {
options.create_if_missing = false; options.create_if_missing = false;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
// Previously tracked WALs are cleared. // Previously tracked WALs are cleared.
ASSERT_TRUE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty()); ASSERT_TRUE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
Close(); Close();
// Re-enable WAL tracking again. // Re-enable WAL tracking again.
options.track_and_verify_wals_in_manifest = true; options.track_and_verify_wals_in_manifest = true;
options.create_if_missing = false; options.create_if_missing = false;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
ASSERT_TRUE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty()); ASSERT_TRUE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
Close(); Close();
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE

@ -6127,7 +6127,7 @@ TEST_F(DBCompactionTest, CompactionWithBlob) {
ASSERT_EQ(Get(first_key), third_value); ASSERT_EQ(Get(first_key), third_value);
ASSERT_EQ(Get(second_key), third_value); ASSERT_EQ(Get(second_key), third_value);
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions); assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
@ -6230,7 +6230,7 @@ TEST_P(DBCompactionTestBlobError, CompactionError) {
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions); assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
@ -6352,7 +6352,7 @@ TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) {
ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]); ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
} }
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions); assert(versions);
assert(versions->GetColumnFamilySet()); assert(versions->GetColumnFamilySet());

@ -1608,7 +1608,7 @@ TEST_F(DBFlushTest, FlushWithBlob) {
ASSERT_EQ(Get("key1"), short_value); ASSERT_EQ(Get("key1"), short_value);
ASSERT_EQ(Get("key2"), long_value); ASSERT_EQ(Get("key2"), long_value);
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions); assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
@ -1938,7 +1938,7 @@ TEST_P(DBFlushTestBlobError, FlushError) {
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions); assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
@ -2149,7 +2149,7 @@ TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) {
flush_edits.push_back({}); flush_edits.push_back({});
auto unflushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[1]); auto unflushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[1]);
ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->TEST_GetVersionSet(), ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
flushed_cfds, flush_edits), flushed_cfds, flush_edits),
unflushed_cfh->cfd()->GetLogNumber()); unflushed_cfh->cfd()->GetLogNumber());
} }
@ -2174,7 +2174,7 @@ TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) {
std::min(min_log_number_to_keep, cfh->cfd()->GetLogNumber()); std::min(min_log_number_to_keep, cfh->cfd()->GetLogNumber());
} }
ASSERT_EQ(min_log_number_to_keep, log_num_after_flush); ASSERT_EQ(min_log_number_to_keep, log_num_after_flush);
ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->TEST_GetVersionSet(), ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
flushed_cfds, flush_edits), flushed_cfds, flush_edits),
min_log_number_to_keep); min_log_number_to_keep);
} }

@ -942,6 +942,8 @@ class DBImpl : public DB {
int max_entries_to_print, int max_entries_to_print,
std::string* out_str); std::string* out_str);
VersionSet* GetVersionSet() const { return versions_.get(); }
#ifndef NDEBUG #ifndef NDEBUG
// Compact any files in the named level that overlap [*begin, *end] // Compact any files in the named level that overlap [*begin, *end]
Status TEST_CompactRange(int level, const Slice* begin, const Slice* end, Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
@ -1049,8 +1051,6 @@ class DBImpl : public DB {
void TEST_WaitForStatsDumpRun(std::function<void()> callback) const; void TEST_WaitForStatsDumpRun(std::function<void()> callback) const;
size_t TEST_EstimateInMemoryStatsHistorySize() const; size_t TEST_EstimateInMemoryStatsHistorySize() const;
VersionSet* TEST_GetVersionSet() const { return versions_.get(); }
uint64_t TEST_GetCurrentLogNumber() const { uint64_t TEST_GetCurrentLogNumber() const {
InstrumentedMutexLock l(mutex()); InstrumentedMutexLock l(mutex());
assert(!logs_.empty()); assert(!logs_.empty());

@ -290,6 +290,10 @@ class DBImplSecondary : public DBImpl {
Status MarkCommit(const Slice&) override { return Status::OK(); } Status MarkCommit(const Slice&) override { return Status::OK(); }
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
return Status::OK();
}
Status MarkNoop(bool) override { return Status::OK(); } Status MarkNoop(bool) override { return Status::OK(); }
const std::unordered_set<uint32_t>& column_families() const { const std::unordered_set<uint32_t>& column_families() const {

@ -1130,7 +1130,7 @@ std::string DBTestBase::FilesPerLevel(int cf) {
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
std::vector<uint64_t> DBTestBase::GetBlobFileNumbers() { std::vector<uint64_t> DBTestBase::GetBlobFileNumbers() {
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions); assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();

@ -382,7 +382,7 @@ TEST_F(DBWALTest, RecoverWithBlob) {
// There should be no files just yet since we haven't flushed. // There should be no files just yet since we haven't flushed.
{ {
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr); ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
@ -412,7 +412,7 @@ TEST_F(DBWALTest, RecoverWithBlob) {
ASSERT_EQ(Get("key1"), short_value); ASSERT_EQ(Get("key1"), short_value);
ASSERT_EQ(Get("key2"), long_value); ASSERT_EQ(Get("key2"), long_value);
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr); ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
@ -477,7 +477,7 @@ TEST_F(DBWALTest, RecoverWithBlobMultiSST) {
// There should be no files just yet since we haven't flushed. // There should be no files just yet since we haven't flushed.
{ {
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr); ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
@ -509,7 +509,7 @@ TEST_F(DBWALTest, RecoverWithBlobMultiSST) {
ASSERT_EQ(Get(Key(i)), large_value); ASSERT_EQ(Get(Key(i)), large_value);
} }
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
ASSERT_NE(versions, nullptr); ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();

@ -65,7 +65,8 @@ enum ValueType : unsigned char {
// another. // another.
kTypeBeginUnprepareXID = 0x13, // WAL only. kTypeBeginUnprepareXID = 0x13, // WAL only.
kTypeDeletionWithTimestamp = 0x14, kTypeDeletionWithTimestamp = 0x14,
kMaxValue = 0x7F // Not used for storing records. kTypeCommitXIDAndTimestamp = 0x15, // WAL only
kMaxValue = 0x7F // Not used for storing records.
}; };
// Defined in dbformat.cc // Defined in dbformat.cc
@ -654,7 +655,7 @@ extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key,
// Read record from a write batch piece from input. // Read record from a write batch piece from input.
// tag, column_family, key, value and blob are return values. Callers own the // tag, column_family, key, value and blob are return values. Callers own the
// Slice they point to. // slice they point to.
// Tag is defined as ValueType. // Tag is defined as ValueType.
// input will be advanced to after the record. // input will be advanced to after the record.
extern Status ReadRecordFromWriteBatch(Slice* input, char* tag, extern Status ReadRecordFromWriteBatch(Slice* input, char* tag,

@ -1246,7 +1246,7 @@ class BlobDBJobLevelEventListenerTest : public EventListener {
} }
const VersionStorageInfo::BlobFiles& GetBlobFiles() { const VersionStorageInfo::BlobFiles& GetBlobFiles() {
VersionSet* const versions = test_->dbfull()->TEST_GetVersionSet(); VersionSet* const versions = test_->dbfull()->GetVersionSet();
assert(versions); assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();

@ -189,7 +189,7 @@ TEST_F(ObsoleteFilesTest, DeleteObsoleteOptionsFile) {
TEST_F(ObsoleteFilesTest, BlobFiles) { TEST_F(ObsoleteFilesTest, BlobFiles) {
ReopenDB(); ReopenDB();
VersionSet* const versions = dbfull()->TEST_GetVersionSet(); VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions); assert(versions);
assert(versions->GetColumnFamilySet()); assert(versions->GetColumnFamilySet());

@ -266,6 +266,10 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
sequence_++; sequence_++;
return Status::OK(); return Status::OK();
} }
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
++sequence_;
return Status::OK();
}
Status PutCF(uint32_t /*cf*/, const Slice& /*key*/, Status PutCF(uint32_t /*cf*/, const Slice& /*key*/,
const Slice& /*val*/) override { const Slice& /*val*/) override {

@ -134,6 +134,11 @@ struct BatchContentClassifier : public WriteBatch::Handler {
return Status::OK(); return Status::OK();
} }
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_COMMIT;
return Status::OK();
}
Status MarkRollback(const Slice&) override { Status MarkRollback(const Slice&) override {
content_flags |= ContentFlags::HAS_ROLLBACK; content_flags |= ContentFlags::HAS_ROLLBACK;
return Status::OK(); return Status::OK();
@ -416,6 +421,11 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
return Status::Corruption("bad EndPrepare XID"); return Status::Corruption("bad EndPrepare XID");
} }
break; break;
case kTypeCommitXIDAndTimestamp:
if (!GetLengthPrefixedSlice(input, key)) {
return Status::Corruption("bad commit timestamp");
}
FALLTHROUGH_INTENDED;
case kTypeCommitXID: case kTypeCommitXID:
if (!GetLengthPrefixedSlice(input, xid)) { if (!GetLengthPrefixedSlice(input, xid)) {
return Status::Corruption("bad Commit XID"); return Status::Corruption("bad Commit XID");
@ -625,6 +635,16 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
assert(s.ok()); assert(s.ok());
empty_batch = true; empty_batch = true;
break; break;
case kTypeCommitXIDAndTimestamp:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
// key stores the commit timestamp.
assert(!key.empty());
s = handler->MarkCommitWithTimestamp(xid, key);
if (LIKELY(s.ok())) {
empty_batch = true;
}
break;
case kTypeRollbackXID: case kTypeRollbackXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) & assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK)); (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
@ -824,6 +844,19 @@ Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
return Status::OK(); return Status::OK();
} }
Status WriteBatchInternal::MarkCommitWithTimestamp(WriteBatch* b,
const Slice& xid,
const Slice& commit_ts) {
assert(!commit_ts.empty());
b->rep_.push_back(static_cast<char>(kTypeCommitXIDAndTimestamp));
PutLengthPrefixedSlice(&b->rep_, commit_ts);
PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_COMMIT,
std::memory_order_relaxed);
return Status::OK();
}
Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) { Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
b->rep_.push_back(static_cast<char>(kTypeRollbackXID)); b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
PutLengthPrefixedSlice(&b->rep_, xid); PutLengthPrefixedSlice(&b->rep_, xid);
@ -2072,6 +2105,8 @@ class MemTableInserter : public WriteBatch::Handler {
Status s; Status s;
if (recovering_log_number_ != 0) { if (recovering_log_number_ != 0) {
// We must hold db mutex in recovery.
db_->mutex()->AssertHeld();
// in recovery when we encounter a commit marker // in recovery when we encounter a commit marker
// we lookup this transaction in our set of rebuilt transactions // we lookup this transaction in our set of rebuilt transactions
// and commit. // and commit.
@ -2114,6 +2149,76 @@ class MemTableInserter : public WriteBatch::Handler {
return s; return s;
} }
Status MarkCommitWithTimestamp(const Slice& name,
const Slice& commit_ts) override {
assert(db_);
Status s;
if (recovering_log_number_ != 0) {
// In recovery, db mutex must be held.
db_->mutex()->AssertHeld();
// in recovery when we encounter a commit marker
// we lookup this transaction in our set of rebuilt transactions
// and commit.
auto trx = db_->GetRecoveredTransaction(name.ToString());
// the log containing the prepared section may have
// been released in the last incarnation because the
// data was flushed to L0
if (trx) {
// at this point individual CF lognumbers will prevent
// duplicate re-insertion of values.
assert(0 == log_number_ref_);
if (write_after_commit_) {
// write_after_commit_ can only have one batch in trx.
assert(trx->batches_.size() == 1);
const auto& batch_info = trx->batches_.begin()->second;
// all inserts must reference this trx log number
log_number_ref_ = batch_info.log_number_;
const auto checker = [this](uint32_t cf, size_t& ts_sz) {
assert(db_);
VersionSet* const vset = db_->GetVersionSet();
assert(vset);
ColumnFamilySet* const cf_set = vset->GetColumnFamilySet();
assert(cf_set);
ColumnFamilyData* cfd = cf_set->GetColumnFamily(cf);
assert(cfd);
const auto* const ucmp = cfd->user_comparator();
assert(ucmp);
if (ucmp->timestamp_size() == 0) {
ts_sz = 0;
} else if (ucmp->timestamp_size() != ts_sz) {
return Status::InvalidArgument("Timestamp size mismatch");
}
return Status::OK();
};
s = batch_info.batch_->AssignTimestamp(commit_ts, checker);
if (s.ok()) {
s = batch_info.batch_->Iterate(this);
log_number_ref_ = 0;
}
}
// else the values are already inserted before the commit
if (s.ok()) {
db_->DeleteRecoveredTransaction(name.ToString());
}
if (has_valid_writes_) {
*has_valid_writes_ = true;
}
}
} else {
// When writes are not delayed until commit, there is no connection
// between a memtable write and the WAL that supports it. So the commit
// need not reference any log as the only log to which it depends.
assert(!write_after_commit_ || log_number_ref_ > 0);
}
constexpr bool batch_boundary = true;
MaybeAdvanceSeq(batch_boundary);
return s;
}
Status MarkRollback(const Slice& name) override { Status MarkRollback(const Slice& name) override {
assert(db_); assert(db_);

@ -124,6 +124,9 @@ class WriteBatchInternal {
static Status MarkCommit(WriteBatch* batch, const Slice& xid); static Status MarkCommit(WriteBatch* batch, const Slice& xid);
static Status MarkCommitWithTimestamp(WriteBatch* batch, const Slice& xid,
const Slice& commit_ts);
static Status InsertNoop(WriteBatch* batch); static Status InsertNoop(WriteBatch* batch);
// Return the number of entries in the batch. // Return the number of entries in the batch.
@ -302,8 +305,14 @@ class TimestampAssignerBase : public WriteBatch::Handler {
Status MarkCommit(const Slice&) override { return Status::OK(); } Status MarkCommit(const Slice&) override { return Status::OK(); }
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
return Status::OK();
}
Status MarkRollback(const Slice&) override { return Status::OK(); } Status MarkRollback(const Slice&) override { return Status::OK(); }
Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
protected: protected:
Status AssignTimestamp(uint32_t cf, const Slice& key) { Status AssignTimestamp(uint32_t cf, const Slice& key) {
Status s = static_cast_with_check<Derived>(this)->AssignTimestampImpl( Status s = static_cast_with_check<Derived>(this)->AssignTimestampImpl(

@ -318,6 +318,11 @@ namespace {
seen += "MarkCommit(" + xid.ToString() + ")"; seen += "MarkCommit(" + xid.ToString() + ")";
return Status::OK(); return Status::OK();
} }
Status MarkCommitWithTimestamp(const Slice& xid, const Slice& ts) override {
seen += "MarkCommitWithTimestamp(" + xid.ToString() + ", " +
ts.ToString(true) + ")";
return Status::OK();
}
Status MarkRollback(const Slice& xid) override { Status MarkRollback(const Slice& xid) override {
seen += "MarkRollback(" + xid.ToString() + ")"; seen += "MarkRollback(" + xid.ToString() + ")";
return Status::OK(); return Status::OK();
@ -1057,6 +1062,20 @@ TEST_F(WriteBatchTest, AssignTimestamps) {
batch, std::string(timestamp_size, '\xee'), cf_to_ucmps)); batch, std::string(timestamp_size, '\xee'), cf_to_ucmps));
} }
TEST_F(WriteBatchTest, CommitWithTimestamp) {
WriteBatch wb;
const std::string txn_name = "xid1";
std::string ts;
constexpr uint64_t commit_ts = 23;
PutFixed64(&ts, commit_ts);
ASSERT_OK(WriteBatchInternal::MarkCommitWithTimestamp(&wb, txn_name, ts));
TestHandler handler;
ASSERT_OK(wb.Iterate(&handler));
ASSERT_EQ("MarkCommitWithTimestamp(" + txn_name + ", " +
Slice(ts).ToString(true) + ")",
handler.seen);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -288,6 +288,12 @@ class WriteBatch : public WriteBatchBase {
return Status::InvalidArgument("MarkCommit() handler not defined."); return Status::InvalidArgument("MarkCommit() handler not defined.");
} }
virtual Status MarkCommitWithTimestamp(const Slice& /*xid*/,
const Slice& /*commit_ts*/) {
return Status::InvalidArgument(
"MarkCommitWithTimestamp() handler not defined.");
}
// Continue is called by WriteBatch::Iterate. If it returns false, // Continue is called by WriteBatch::Iterate. If it returns false,
// iteration is halted. Otherwise, it continues iterating. The default // iteration is halted. Otherwise, it continues iterating. The default
// implementation always returns true. // implementation always returns true.

@ -3240,6 +3240,27 @@ class WriteBatchHandlerJni
return mid; return mid;
} }
/**
* Get the Java Method: WriteBatch.Handler#markCommitWithTimestamp
*
* @param env A pointer to the Java environment
*
* @return The Java Method ID or nullptr if the class or method id could not
* be retrieved
*/
static jmethodID getMarkCommitWithTimestampMethodId(JNIEnv* env) {
jclass jclazz = getJClass(env);
if (jclazz == nullptr) {
// exception occurred accessing class
return nullptr;
}
static jmethodID mid =
env->GetMethodID(jclazz, "markCommitWithTimestamp", "([B[B)V");
assert(mid != nullptr);
return mid;
}
/** /**
* Get the Java Method: WriteBatch.Handler#shouldContinue * Get the Java Method: WriteBatch.Handler#shouldContinue
* *

@ -121,6 +121,13 @@ WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback(
return; return;
} }
m_jMarkCommitWithTimestampMethodId =
WriteBatchHandlerJni::getMarkCommitWithTimestampMethodId(env);
if (m_jMarkCommitWithTimestampMethodId == nullptr) {
// exception thrown
return;
}
m_jContinueMethodId = WriteBatchHandlerJni::getContinueMethodId(env); m_jContinueMethodId = WriteBatchHandlerJni::getContinueMethodId(env);
if(m_jContinueMethodId == nullptr) { if(m_jContinueMethodId == nullptr) {
// exception thrown // exception thrown
@ -429,6 +436,23 @@ ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MarkCommit(
} }
} }
ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MarkCommitWithTimestamp(
const Slice& xid, const Slice& ts) {
auto markCommitWithTimestamp = [this](jbyteArray j_xid, jbyteArray j_ts) {
m_env->CallVoidMethod(m_jcallback_obj, m_jMarkCommitWithTimestampMethodId,
j_xid, j_ts);
};
auto status =
WriteBatchHandlerJniCallback::kv_op(xid, ts, markCommitWithTimestamp);
if (status == nullptr) {
return ROCKSDB_NAMESPACE::Status::OK(); // TODO(AR) what to do if there is
// an Exception but we don't know
// the ROCKSDB_NAMESPACE::Status?
} else {
return ROCKSDB_NAMESPACE::Status(*status);
}
}
bool WriteBatchHandlerJniCallback::Continue() { bool WriteBatchHandlerJniCallback::Continue() {
jboolean jContinue = m_env->CallBooleanMethod( jboolean jContinue = m_env->CallBooleanMethod(
m_jcallback_obj, m_jcallback_obj,

@ -48,6 +48,7 @@ class WriteBatchHandlerJniCallback : public JniCallback, public WriteBatch::Hand
Status MarkNoop(bool empty_batch); Status MarkNoop(bool empty_batch);
Status MarkRollback(const Slice& xid); Status MarkRollback(const Slice& xid);
Status MarkCommit(const Slice& xid); Status MarkCommit(const Slice& xid);
Status MarkCommitWithTimestamp(const Slice& xid, const Slice& commit_ts);
bool Continue(); bool Continue();
private: private:
@ -69,6 +70,7 @@ class WriteBatchHandlerJniCallback : public JniCallback, public WriteBatch::Hand
jmethodID m_jMarkNoopMethodId; jmethodID m_jMarkNoopMethodId;
jmethodID m_jMarkRollbackMethodId; jmethodID m_jMarkRollbackMethodId;
jmethodID m_jMarkCommitMethodId; jmethodID m_jMarkCommitMethodId;
jmethodID m_jMarkCommitWithTimestampMethodId;
jmethodID m_jContinueMethodId; jmethodID m_jContinueMethodId;
/** /**
* @return A pointer to a ROCKSDB_NAMESPACE::Status or nullptr if an * @return A pointer to a ROCKSDB_NAMESPACE::Status or nullptr if an

@ -321,6 +321,8 @@ public class WriteBatch extends AbstractWriteBatch {
throws RocksDBException; throws RocksDBException;
public abstract void markCommit(final byte[] xid) public abstract void markCommit(final byte[] xid)
throws RocksDBException; throws RocksDBException;
public abstract void markCommitWithTimestamp(final byte[] xid, final byte[] ts)
throws RocksDBException;
/** /**
* shouldContinue is called by the underlying iterator * shouldContinue is called by the underlying iterator

@ -119,6 +119,11 @@ public class CapturingWriteBatchHandler extends WriteBatch.Handler {
events.add(new Event(Action.MARK_COMMIT, (byte[])null, (byte[])null)); events.add(new Event(Action.MARK_COMMIT, (byte[])null, (byte[])null));
} }
@Override
public void markCommitWithTimestamp(final byte[] xid, final byte[] ts) throws RocksDBException {
events.add(new Event(Action.MARK_COMMIT_WITH_TIMESTAMP, (byte[]) null, (byte[]) null));
}
public static class Event { public static class Event {
public final Action action; public final Action action;
public final int columnFamilyId; public final int columnFamilyId;
@ -168,7 +173,18 @@ public class CapturingWriteBatchHandler extends WriteBatch.Handler {
* event actions * event actions
*/ */
public enum Action { public enum Action {
PUT, MERGE, DELETE, SINGLE_DELETE, DELETE_RANGE, LOG, PUT_BLOB_INDEX, PUT,
MARK_BEGIN_PREPARE, MARK_END_PREPARE, MARK_NOOP, MARK_COMMIT, MERGE,
MARK_ROLLBACK } DELETE,
SINGLE_DELETE,
DELETE_RANGE,
LOG,
PUT_BLOB_INDEX,
MARK_BEGIN_PREPARE,
MARK_END_PREPARE,
MARK_NOOP,
MARK_COMMIT,
MARK_ROLLBACK,
MARK_COMMIT_WITH_TIMESTAMP
}
} }

@ -131,4 +131,9 @@ public class WriteBatchGetter extends WriteBatch.Handler {
public void markCommit(final byte[] xid) throws RocksDBException { public void markCommit(final byte[] xid) throws RocksDBException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public void markCommitWithTimestamp(final byte[] xid, final byte[] ts) throws RocksDBException {
throw new UnsupportedOperationException();
}
} }

@ -2322,6 +2322,14 @@ class InMemoryHandler : public WriteBatch::Handler {
return Status::OK(); return Status::OK();
} }
Status MarkCommitWithTimestamp(const Slice& xid,
const Slice& commit_ts) override {
row_ << "COMMIT_WITH_TIMESTAMP(";
row_ << LDBCommand::StringToHex(xid.ToString()) << ", ";
row_ << LDBCommand::StringToHex(commit_ts.ToString()) << ") ";
return Status::OK();
}
~InMemoryHandler() override {} ~InMemoryHandler() override {}
protected: protected:

@ -242,6 +242,12 @@ class TraceAnalyzer : private TraceRecord::Handler,
using WriteBatch::Handler::MarkCommit; using WriteBatch::Handler::MarkCommit;
Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); } Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); }
using WriteBatch::Handler::MarkCommitWithTimestamp;
Status MarkCommitWithTimestamp(const Slice& /*xid*/,
const Slice& /*commit_ts*/) override {
return Status::OK();
}
// Process each trace operation and output the analysis result to // Process each trace operation and output the analysis result to
// stdout/files. // stdout/files.
Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp, Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp,

@ -663,6 +663,10 @@ Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
return Status::InvalidArgument(); return Status::InvalidArgument();
} }
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
return Status::InvalidArgument();
}
Status MarkRollback(const Slice&) override { Status MarkRollback(const Slice&) override {
return Status::InvalidArgument(); return Status::InvalidArgument();
} }

Loading…
Cancel
Save