diff --git a/HISTORY.md b/HISTORY.md index edeb54f99..059572293 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,6 +1,8 @@ # Rocksdb Change Log ## Unreleased ### New Features +* Introduced 'CommitWithTimestamp' as a new tag. Currently, there is no API for user to trigger a write with this tag to the WAL. This is part of the efforts to support write-commited transactions with user-defined timestamps. + ### Bug Fixes * Fixed a bug in rocksdb automatic implicit prefetching which got broken because of new feature adaptive_readahead and internal prefetching got disabled when iterator moves from one file to next. * Fixed a bug in TableOptions.prepopulate_block_cache which causes segmentation fault when used with TableOptions.partition_filters = true and TableOptions.cache_index_and_filter_blocks = true. diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index aff47896d..e4d709c8c 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -18,7 +18,7 @@ class DBBlobCompactionTest : public DBTestBase { #ifndef ROCKSDB_LITE const std::vector& GetCompactionStats() { - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); assert(versions->GetColumnFamilySet()); @@ -495,7 +495,7 @@ TEST_F(DBBlobCompactionTest, TrackGarbage) { ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); assert(versions->GetColumnFamilySet()); diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index b6075b6fb..0d8d1e33f 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2565,7 +2565,7 @@ TEST_F(DBBasicTest, DisableTrackWal) { ASSERT_OK(dbfull()->TEST_SwitchMemtable()); ASSERT_OK(db_->SyncWAL()); // Some WALs are tracked. - ASSERT_FALSE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty()); + ASSERT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty()); Close(); // Disable WAL tracking. @@ -2573,14 +2573,14 @@ TEST_F(DBBasicTest, DisableTrackWal) { options.create_if_missing = false; ASSERT_OK(TryReopen(options)); // Previously tracked WALs are cleared. - ASSERT_TRUE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty()); + ASSERT_TRUE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty()); Close(); // Re-enable WAL tracking again. options.track_and_verify_wals_in_manifest = true; options.create_if_missing = false; ASSERT_OK(TryReopen(options)); - ASSERT_TRUE(dbfull()->TEST_GetVersionSet()->GetWalSet().GetWals().empty()); + ASSERT_TRUE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty()); Close(); } #endif // !ROCKSDB_LITE diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index a2fe771d9..8a89a7cdf 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6127,7 +6127,7 @@ TEST_F(DBCompactionTest, CompactionWithBlob) { ASSERT_EQ(Get(first_key), third_value); ASSERT_EQ(Get(second_key), third_value); - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); @@ -6230,7 +6230,7 @@ TEST_P(DBCompactionTestBlobError, CompactionError) { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); @@ -6352,7 +6352,7 @@ TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) { ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]); } - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); assert(versions->GetColumnFamilySet()); diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 0326a2cc7..fcee2f1a6 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -1608,7 +1608,7 @@ TEST_F(DBFlushTest, FlushWithBlob) { ASSERT_EQ(Get("key1"), short_value); ASSERT_EQ(Get("key2"), long_value); - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); @@ -1938,7 +1938,7 @@ TEST_P(DBFlushTestBlobError, FlushError) { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); @@ -2149,7 +2149,7 @@ TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) { flush_edits.push_back({}); auto unflushed_cfh = static_cast(handles_[1]); - ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->TEST_GetVersionSet(), + ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(), flushed_cfds, flush_edits), unflushed_cfh->cfd()->GetLogNumber()); } @@ -2174,7 +2174,7 @@ TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) { std::min(min_log_number_to_keep, cfh->cfd()->GetLogNumber()); } ASSERT_EQ(min_log_number_to_keep, log_num_after_flush); - ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->TEST_GetVersionSet(), + ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(), flushed_cfds, flush_edits), min_log_number_to_keep); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 4b55fbfbf..6812a37d2 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -942,6 +942,8 @@ class DBImpl : public DB { int max_entries_to_print, std::string* out_str); + VersionSet* GetVersionSet() const { return versions_.get(); } + #ifndef NDEBUG // Compact any files in the named level that overlap [*begin, *end] Status TEST_CompactRange(int level, const Slice* begin, const Slice* end, @@ -1049,8 +1051,6 @@ class DBImpl : public DB { void TEST_WaitForStatsDumpRun(std::function callback) const; size_t TEST_EstimateInMemoryStatsHistorySize() const; - VersionSet* TEST_GetVersionSet() const { return versions_.get(); } - uint64_t TEST_GetCurrentLogNumber() const { InstrumentedMutexLock l(mutex()); assert(!logs_.empty()); diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 8c5f05029..1e5af2e13 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -290,6 +290,10 @@ class DBImplSecondary : public DBImpl { Status MarkCommit(const Slice&) override { return Status::OK(); } + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + return Status::OK(); + } + Status MarkNoop(bool) override { return Status::OK(); } const std::unordered_set& column_families() const { diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 4168217ee..130b16fda 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1130,7 +1130,7 @@ std::string DBTestBase::FilesPerLevel(int cf) { #endif // !ROCKSDB_LITE std::vector DBTestBase::GetBlobFileNumbers() { - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 85e4cd103..b24a5c633 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -382,7 +382,7 @@ TEST_F(DBWALTest, RecoverWithBlob) { // There should be no files just yet since we haven't flushed. { - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); ASSERT_NE(versions, nullptr); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); @@ -412,7 +412,7 @@ TEST_F(DBWALTest, RecoverWithBlob) { ASSERT_EQ(Get("key1"), short_value); ASSERT_EQ(Get("key2"), long_value); - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); ASSERT_NE(versions, nullptr); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); @@ -477,7 +477,7 @@ TEST_F(DBWALTest, RecoverWithBlobMultiSST) { // There should be no files just yet since we haven't flushed. { - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); ASSERT_NE(versions, nullptr); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); @@ -509,7 +509,7 @@ TEST_F(DBWALTest, RecoverWithBlobMultiSST) { ASSERT_EQ(Get(Key(i)), large_value); } - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); ASSERT_NE(versions, nullptr); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); diff --git a/db/dbformat.h b/db/dbformat.h index b67c68697..339ce9ff4 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -65,7 +65,8 @@ enum ValueType : unsigned char { // another. kTypeBeginUnprepareXID = 0x13, // WAL only. kTypeDeletionWithTimestamp = 0x14, - kMaxValue = 0x7F // Not used for storing records. + kTypeCommitXIDAndTimestamp = 0x15, // WAL only + kMaxValue = 0x7F // Not used for storing records. }; // Defined in dbformat.cc @@ -654,7 +655,7 @@ extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, // Read record from a write batch piece from input. // tag, column_family, key, value and blob are return values. Callers own the -// Slice they point to. +// slice they point to. // Tag is defined as ValueType. // input will be advanced to after the record. extern Status ReadRecordFromWriteBatch(Slice* input, char* tag, diff --git a/db/listener_test.cc b/db/listener_test.cc index 6e39e55a9..cbfa13b1e 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -1246,7 +1246,7 @@ class BlobDBJobLevelEventListenerTest : public EventListener { } const VersionStorageInfo::BlobFiles& GetBlobFiles() { - VersionSet* const versions = test_->dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = test_->dbfull()->GetVersionSet(); assert(versions); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); diff --git a/db/obsolete_files_test.cc b/db/obsolete_files_test.cc index 83dd2f03f..efd6a4316 100644 --- a/db/obsolete_files_test.cc +++ b/db/obsolete_files_test.cc @@ -189,7 +189,7 @@ TEST_F(ObsoleteFilesTest, DeleteObsoleteOptionsFile) { TEST_F(ObsoleteFilesTest, BlobFiles) { ReopenDB(); - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); assert(versions->GetColumnFamilySet()); diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index e1e168597..cc53b42ab 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -266,6 +266,10 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { sequence_++; return Status::OK(); } + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + ++sequence_; + return Status::OK(); + } Status PutCF(uint32_t /*cf*/, const Slice& /*key*/, const Slice& /*val*/) override { diff --git a/db/write_batch.cc b/db/write_batch.cc index 13ec947b7..c66bcb9ea 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -134,6 +134,11 @@ struct BatchContentClassifier : public WriteBatch::Handler { return Status::OK(); } + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_COMMIT; + return Status::OK(); + } + Status MarkRollback(const Slice&) override { content_flags |= ContentFlags::HAS_ROLLBACK; return Status::OK(); @@ -416,6 +421,11 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, return Status::Corruption("bad EndPrepare XID"); } break; + case kTypeCommitXIDAndTimestamp: + if (!GetLengthPrefixedSlice(input, key)) { + return Status::Corruption("bad commit timestamp"); + } + FALLTHROUGH_INTENDED; case kTypeCommitXID: if (!GetLengthPrefixedSlice(input, xid)) { return Status::Corruption("bad Commit XID"); @@ -625,6 +635,16 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb, assert(s.ok()); empty_batch = true; break; + case kTypeCommitXIDAndTimestamp: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT)); + // key stores the commit timestamp. + assert(!key.empty()); + s = handler->MarkCommitWithTimestamp(xid, key); + if (LIKELY(s.ok())) { + empty_batch = true; + } + break; case kTypeRollbackXID: assert(wb->content_flags_.load(std::memory_order_relaxed) & (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK)); @@ -824,6 +844,19 @@ Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { return Status::OK(); } +Status WriteBatchInternal::MarkCommitWithTimestamp(WriteBatch* b, + const Slice& xid, + const Slice& commit_ts) { + assert(!commit_ts.empty()); + b->rep_.push_back(static_cast(kTypeCommitXIDAndTimestamp)); + PutLengthPrefixedSlice(&b->rep_, commit_ts); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_COMMIT, + std::memory_order_relaxed); + return Status::OK(); +} + Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) { b->rep_.push_back(static_cast(kTypeRollbackXID)); PutLengthPrefixedSlice(&b->rep_, xid); @@ -2072,6 +2105,8 @@ class MemTableInserter : public WriteBatch::Handler { Status s; if (recovering_log_number_ != 0) { + // We must hold db mutex in recovery. + db_->mutex()->AssertHeld(); // in recovery when we encounter a commit marker // we lookup this transaction in our set of rebuilt transactions // and commit. @@ -2114,6 +2149,76 @@ class MemTableInserter : public WriteBatch::Handler { return s; } + Status MarkCommitWithTimestamp(const Slice& name, + const Slice& commit_ts) override { + assert(db_); + + Status s; + + if (recovering_log_number_ != 0) { + // In recovery, db mutex must be held. + db_->mutex()->AssertHeld(); + // in recovery when we encounter a commit marker + // we lookup this transaction in our set of rebuilt transactions + // and commit. + auto trx = db_->GetRecoveredTransaction(name.ToString()); + // the log containing the prepared section may have + // been released in the last incarnation because the + // data was flushed to L0 + if (trx) { + // at this point individual CF lognumbers will prevent + // duplicate re-insertion of values. + assert(0 == log_number_ref_); + if (write_after_commit_) { + // write_after_commit_ can only have one batch in trx. + assert(trx->batches_.size() == 1); + const auto& batch_info = trx->batches_.begin()->second; + // all inserts must reference this trx log number + log_number_ref_ = batch_info.log_number_; + const auto checker = [this](uint32_t cf, size_t& ts_sz) { + assert(db_); + VersionSet* const vset = db_->GetVersionSet(); + assert(vset); + ColumnFamilySet* const cf_set = vset->GetColumnFamilySet(); + assert(cf_set); + ColumnFamilyData* cfd = cf_set->GetColumnFamily(cf); + assert(cfd); + const auto* const ucmp = cfd->user_comparator(); + assert(ucmp); + if (ucmp->timestamp_size() == 0) { + ts_sz = 0; + } else if (ucmp->timestamp_size() != ts_sz) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + return Status::OK(); + }; + s = batch_info.batch_->AssignTimestamp(commit_ts, checker); + if (s.ok()) { + s = batch_info.batch_->Iterate(this); + log_number_ref_ = 0; + } + } + // else the values are already inserted before the commit + + if (s.ok()) { + db_->DeleteRecoveredTransaction(name.ToString()); + } + if (has_valid_writes_) { + *has_valid_writes_ = true; + } + } + } else { + // When writes are not delayed until commit, there is no connection + // between a memtable write and the WAL that supports it. So the commit + // need not reference any log as the only log to which it depends. + assert(!write_after_commit_ || log_number_ref_ > 0); + } + constexpr bool batch_boundary = true; + MaybeAdvanceSeq(batch_boundary); + + return s; + } + Status MarkRollback(const Slice& name) override { assert(db_); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 13afa822b..3e221be32 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -124,6 +124,9 @@ class WriteBatchInternal { static Status MarkCommit(WriteBatch* batch, const Slice& xid); + static Status MarkCommitWithTimestamp(WriteBatch* batch, const Slice& xid, + const Slice& commit_ts); + static Status InsertNoop(WriteBatch* batch); // Return the number of entries in the batch. @@ -302,8 +305,14 @@ class TimestampAssignerBase : public WriteBatch::Handler { Status MarkCommit(const Slice&) override { return Status::OK(); } + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + return Status::OK(); + } + Status MarkRollback(const Slice&) override { return Status::OK(); } + Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } + protected: Status AssignTimestamp(uint32_t cf, const Slice& key) { Status s = static_cast_with_check(this)->AssignTimestampImpl( diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index ea74e79a3..a777f0586 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -318,6 +318,11 @@ namespace { seen += "MarkCommit(" + xid.ToString() + ")"; return Status::OK(); } + Status MarkCommitWithTimestamp(const Slice& xid, const Slice& ts) override { + seen += "MarkCommitWithTimestamp(" + xid.ToString() + ", " + + ts.ToString(true) + ")"; + return Status::OK(); + } Status MarkRollback(const Slice& xid) override { seen += "MarkRollback(" + xid.ToString() + ")"; return Status::OK(); @@ -1057,6 +1062,20 @@ TEST_F(WriteBatchTest, AssignTimestamps) { batch, std::string(timestamp_size, '\xee'), cf_to_ucmps)); } +TEST_F(WriteBatchTest, CommitWithTimestamp) { + WriteBatch wb; + const std::string txn_name = "xid1"; + std::string ts; + constexpr uint64_t commit_ts = 23; + PutFixed64(&ts, commit_ts); + ASSERT_OK(WriteBatchInternal::MarkCommitWithTimestamp(&wb, txn_name, ts)); + TestHandler handler; + ASSERT_OK(wb.Iterate(&handler)); + ASSERT_EQ("MarkCommitWithTimestamp(" + txn_name + ", " + + Slice(ts).ToString(true) + ")", + handler.seen); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 8d03a74e3..888aaef63 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -288,6 +288,12 @@ class WriteBatch : public WriteBatchBase { return Status::InvalidArgument("MarkCommit() handler not defined."); } + virtual Status MarkCommitWithTimestamp(const Slice& /*xid*/, + const Slice& /*commit_ts*/) { + return Status::InvalidArgument( + "MarkCommitWithTimestamp() handler not defined."); + } + // Continue is called by WriteBatch::Iterate. If it returns false, // iteration is halted. Otherwise, it continues iterating. The default // implementation always returns true. diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index fd8c0ed1e..061ebfb2a 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -3240,6 +3240,27 @@ class WriteBatchHandlerJni return mid; } + /** + * Get the Java Method: WriteBatch.Handler#markCommitWithTimestamp + * + * @param env A pointer to the Java environment + * + * @return The Java Method ID or nullptr if the class or method id could not + * be retrieved + */ + static jmethodID getMarkCommitWithTimestampMethodId(JNIEnv* env) { + jclass jclazz = getJClass(env); + if (jclazz == nullptr) { + // exception occurred accessing class + return nullptr; + } + + static jmethodID mid = + env->GetMethodID(jclazz, "markCommitWithTimestamp", "([B[B)V"); + assert(mid != nullptr); + return mid; + } + /** * Get the Java Method: WriteBatch.Handler#shouldContinue * diff --git a/java/rocksjni/writebatchhandlerjnicallback.cc b/java/rocksjni/writebatchhandlerjnicallback.cc index 4ecb6b2d1..b9f42f904 100644 --- a/java/rocksjni/writebatchhandlerjnicallback.cc +++ b/java/rocksjni/writebatchhandlerjnicallback.cc @@ -108,7 +108,7 @@ WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback( // exception thrown return; } - + m_jMarkRollbackMethodId = WriteBatchHandlerJni::getMarkRollbackMethodId(env); if(m_jMarkRollbackMethodId == nullptr) { // exception thrown @@ -121,6 +121,13 @@ WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback( return; } + m_jMarkCommitWithTimestampMethodId = + WriteBatchHandlerJni::getMarkCommitWithTimestampMethodId(env); + if (m_jMarkCommitWithTimestampMethodId == nullptr) { + // exception thrown + return; + } + m_jContinueMethodId = WriteBatchHandlerJni::getContinueMethodId(env); if(m_jContinueMethodId == nullptr) { // exception thrown @@ -429,6 +436,23 @@ ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MarkCommit( } } +ROCKSDB_NAMESPACE::Status WriteBatchHandlerJniCallback::MarkCommitWithTimestamp( + const Slice& xid, const Slice& ts) { + auto markCommitWithTimestamp = [this](jbyteArray j_xid, jbyteArray j_ts) { + m_env->CallVoidMethod(m_jcallback_obj, m_jMarkCommitWithTimestampMethodId, + j_xid, j_ts); + }; + auto status = + WriteBatchHandlerJniCallback::kv_op(xid, ts, markCommitWithTimestamp); + if (status == nullptr) { + return ROCKSDB_NAMESPACE::Status::OK(); // TODO(AR) what to do if there is + // an Exception but we don't know + // the ROCKSDB_NAMESPACE::Status? + } else { + return ROCKSDB_NAMESPACE::Status(*status); + } +} + bool WriteBatchHandlerJniCallback::Continue() { jboolean jContinue = m_env->CallBooleanMethod( m_jcallback_obj, diff --git a/java/rocksjni/writebatchhandlerjnicallback.h b/java/rocksjni/writebatchhandlerjnicallback.h index a4c61f8bd..c12ffe0d9 100644 --- a/java/rocksjni/writebatchhandlerjnicallback.h +++ b/java/rocksjni/writebatchhandlerjnicallback.h @@ -48,6 +48,7 @@ class WriteBatchHandlerJniCallback : public JniCallback, public WriteBatch::Hand Status MarkNoop(bool empty_batch); Status MarkRollback(const Slice& xid); Status MarkCommit(const Slice& xid); + Status MarkCommitWithTimestamp(const Slice& xid, const Slice& commit_ts); bool Continue(); private: @@ -69,6 +70,7 @@ class WriteBatchHandlerJniCallback : public JniCallback, public WriteBatch::Hand jmethodID m_jMarkNoopMethodId; jmethodID m_jMarkRollbackMethodId; jmethodID m_jMarkCommitMethodId; + jmethodID m_jMarkCommitWithTimestampMethodId; jmethodID m_jContinueMethodId; /** * @return A pointer to a ROCKSDB_NAMESPACE::Status or nullptr if an diff --git a/java/src/main/java/org/rocksdb/WriteBatch.java b/java/src/main/java/org/rocksdb/WriteBatch.java index a0ee1424d..9b46108d0 100644 --- a/java/src/main/java/org/rocksdb/WriteBatch.java +++ b/java/src/main/java/org/rocksdb/WriteBatch.java @@ -321,6 +321,8 @@ public class WriteBatch extends AbstractWriteBatch { throws RocksDBException; public abstract void markCommit(final byte[] xid) throws RocksDBException; + public abstract void markCommitWithTimestamp(final byte[] xid, final byte[] ts) + throws RocksDBException; /** * shouldContinue is called by the underlying iterator diff --git a/java/src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java b/java/src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java index f80e69c1c..8ea104332 100644 --- a/java/src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java +++ b/java/src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java @@ -119,6 +119,11 @@ public class CapturingWriteBatchHandler extends WriteBatch.Handler { events.add(new Event(Action.MARK_COMMIT, (byte[])null, (byte[])null)); } + @Override + public void markCommitWithTimestamp(final byte[] xid, final byte[] ts) throws RocksDBException { + events.add(new Event(Action.MARK_COMMIT_WITH_TIMESTAMP, (byte[]) null, (byte[]) null)); + } + public static class Event { public final Action action; public final int columnFamilyId; @@ -168,7 +173,18 @@ public class CapturingWriteBatchHandler extends WriteBatch.Handler { * event actions */ public enum Action { - PUT, MERGE, DELETE, SINGLE_DELETE, DELETE_RANGE, LOG, PUT_BLOB_INDEX, - MARK_BEGIN_PREPARE, MARK_END_PREPARE, MARK_NOOP, MARK_COMMIT, - MARK_ROLLBACK } + PUT, + MERGE, + DELETE, + SINGLE_DELETE, + DELETE_RANGE, + LOG, + PUT_BLOB_INDEX, + MARK_BEGIN_PREPARE, + MARK_END_PREPARE, + MARK_NOOP, + MARK_COMMIT, + MARK_ROLLBACK, + MARK_COMMIT_WITH_TIMESTAMP + } } diff --git a/java/src/test/java/org/rocksdb/util/WriteBatchGetter.java b/java/src/test/java/org/rocksdb/util/WriteBatchGetter.java index 646e8b8f8..2efa16473 100644 --- a/java/src/test/java/org/rocksdb/util/WriteBatchGetter.java +++ b/java/src/test/java/org/rocksdb/util/WriteBatchGetter.java @@ -131,4 +131,9 @@ public class WriteBatchGetter extends WriteBatch.Handler { public void markCommit(final byte[] xid) throws RocksDBException { throw new UnsupportedOperationException(); } + + @Override + public void markCommitWithTimestamp(final byte[] xid, final byte[] ts) throws RocksDBException { + throw new UnsupportedOperationException(); + } } diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index f44f1bffd..3725cf3ba 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -2322,6 +2322,14 @@ class InMemoryHandler : public WriteBatch::Handler { return Status::OK(); } + Status MarkCommitWithTimestamp(const Slice& xid, + const Slice& commit_ts) override { + row_ << "COMMIT_WITH_TIMESTAMP("; + row_ << LDBCommand::StringToHex(xid.ToString()) << ", "; + row_ << LDBCommand::StringToHex(commit_ts.ToString()) << ") "; + return Status::OK(); + } + ~InMemoryHandler() override {} protected: diff --git a/tools/trace_analyzer_tool.h b/tools/trace_analyzer_tool.h index e27c7ed1b..4b885b18c 100644 --- a/tools/trace_analyzer_tool.h +++ b/tools/trace_analyzer_tool.h @@ -242,6 +242,12 @@ class TraceAnalyzer : private TraceRecord::Handler, using WriteBatch::Handler::MarkCommit; Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); } + using WriteBatch::Handler::MarkCommitWithTimestamp; + Status MarkCommitWithTimestamp(const Slice& /*xid*/, + const Slice& /*commit_ts*/) override { + return Status::OK(); + } + // Process each trace operation and output the analysis result to // stdout/files. Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp, diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index eae384b1b..9775a1c1b 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -663,6 +663,10 @@ Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) { return Status::InvalidArgument(); } + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + return Status::InvalidArgument(); + } + Status MarkRollback(const Slice&) override { return Status::InvalidArgument(); }