Optimistic Transactions

Summary: Optimistic transactions supporting begin/commit/rollback semantics.  Currently relies on checking the memtable to determine if there are any collisions at commit time.  Not yet implemented would be a way of enuring the memtable has some minimum amount of history so that we won't fail to commit when the memtable is empty.  You should probably start with transaction.h to get an overview of what is currently supported.

Test Plan: Added a new test, but still need to look into stress testing.

Reviewers: yhchiang, igor, rven, sdong

Reviewed By: sdong

Subscribers: adamretter, MarkCallaghan, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D33435
main
agiardullo 10 years ago
parent d5a0c0e69b
commit dc9d70de65
  1. 6
      HISTORY.md
  2. 12
      Makefile
  3. 1
      ROCKSDB_LITE.md
  4. 10
      db/column_family.cc
  5. 9
      db/column_family.h
  6. 266
      db/db_bench.cc
  7. 196
      db/db_impl.cc
  8. 72
      db/db_impl.h
  9. 1
      db/db_test.cc
  10. 8
      db/dbformat.cc
  11. 5
      db/dbformat.h
  12. 3
      db/flush_job_test.cc
  13. 39
      db/memtable.cc
  14. 35
      db/memtable.h
  15. 49
      db/memtable_list.cc
  16. 30
      db/memtable_list.h
  17. 27
      db/memtable_list_test.cc
  18. 5
      db/repair.cc
  19. 3
      db/version_set.cc
  20. 5
      db/write_batch_test.cc
  21. 24
      db/write_callback.h
  22. 120
      db/write_callback_test.cc
  23. 13
      db/write_thread.cc
  24. 2
      db/write_thread.h
  25. 1
      examples/.gitignore
  26. 7
      examples/Makefile
  27. 142
      examples/transaction_example.cc
  28. 3
      include/rocksdb/db.h
  29. 11
      include/rocksdb/status.h
  30. 233
      include/rocksdb/utilities/optimistic_transaction.h
  31. 64
      include/rocksdb/utilities/optimistic_transaction_db.h
  32. 2
      include/rocksdb/utilities/stackable_db.h
  33. 2
      java/rocksjni/write_batch_test.cc
  34. 7
      src.mk
  35. 8
      table/table_test.cc
  36. 17
      tools/benchmark.sh
  37. 3
      util/status.cc
  38. 105
      util/xfunc.cc
  39. 6
      util/xfunc.h
  40. 80
      utilities/transactions/optimistic_transaction_db_impl.cc
  41. 33
      utilities/transactions/optimistic_transaction_db_impl.h
  42. 339
      utilities/transactions/optimistic_transaction_impl.cc
  43. 196
      utilities/transactions/optimistic_transaction_impl.h
  44. 846
      utilities/transactions/optimistic_transaction_test.cc

@ -1,12 +1,14 @@
# Rocksdb Change Log
## Public API changes
### New Features
* Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info.
### Public API changes
* DB::GetDbIdentity() is now a const function. If this function is overridden in your application, be sure to also make GetDbIdentity() const to avoid compile error.
* Move listeners from ColumnFamilyOptions to DBOptions.
* Add max_write_buffer_number_to_maintain option
## 3.11.0 (5/19/2015)
### New Features
* Added a new API Cache::SetCapacity(size_t capacity) to dynamically change the maximum configured capacity of the cache. If the new capacity is less than the existing cache usage, the implementation will try to lower the usage by evicting the necessary number of elements following a strict LRU policy.
* Added an experimental API for handling flashcache devices (blacklists background threads from caching their reads) -- NewFlashcacheAwareEnv

@ -30,7 +30,7 @@ quoted_perl_command = $(subst ','\'',$(perl_command))
# We use this debug level when developing RocksDB.
# * DEBUG_LEVEL=0; this is the debug level we use for release. If you're
# running rocksdb in production you most definitely want to compile RocksDB
# with debug level 0. To compile with level 0, run `make shared_lib`,
# with debug level 0. To compile with level 0, run `make shared_lib`,
# `make install-shared`, `make static_lib`, `make install-static` or
# `make install`
DEBUG_LEVEL=1
@ -287,7 +287,9 @@ TESTS = \
thread_list_test \
sst_dump_test \
compact_files_test \
perf_context_test
perf_context_test \
optimistic_transaction_test \
write_callback_test
SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/)
@ -817,6 +819,9 @@ sst_dump_test: util/sst_dump_test.o $(LIBOBJECTS) $(TESTHARNESS)
memenv_test : util/memenv_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
optimistic_transaction_test: utilities/transactions/optimistic_transaction_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
mock_env_test : util/mock_env_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
@ -832,6 +837,9 @@ auto_roll_logger_test: util/auto_roll_logger_test.o $(LIBOBJECTS) $(TESTHARNESS)
memtable_list_test: db/memtable_list_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
write_callback_test: db/write_callback_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
sst_dump: tools/sst_dump.o $(LIBOBJECTS)
$(AM_LINK)

@ -8,6 +8,7 @@ Some examples of the features disabled by ROCKSDB_LITE:
* No support for replication (which we provide in form of TrasactionalIterator)
* No advanced monitoring tools
* No special-purpose memtables that are highly optimized for specific use cases
* No Transactions
When adding a new big feature to RocksDB, please add ROCKSDB_LITE compile guard if:
* Nobody from mobile really needs your feature,

@ -527,18 +527,18 @@ uint64_t ColumnFamilyData::GetNumLiveVersions() const {
}
MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
assert(current_ != nullptr);
return new MemTable(internal_comparator_, ioptions_,
mutable_cf_options, write_buffer_);
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
write_buffer_, earliest_seq);
}
void ColumnFamilyData::CreateNewMemtable(
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
if (mem_ != nullptr) {
delete mem_->Unref();
}
SetMemtable(ConstructNewMemtable(mutable_cf_options));
SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
mem_->Ref();
}

@ -223,10 +223,13 @@ class ColumnFamilyData {
Version* dummy_versions() { return dummy_versions_; }
void SetCurrent(Version* current);
uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options);
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options);
// See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
TableCache* table_cache() const { return table_cache_.get(); }

@ -46,6 +46,8 @@ int main() {
#include "rocksdb/slice_transform.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/utilities/flashcache.h"
#include "rocksdb/utilities/optimistic_transaction.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "util/crc32c.h"
@ -94,7 +96,8 @@ DEFINE_string(benchmarks,
"compress,"
"uncompress,"
"acquireload,"
"fillseekseq,",
"fillseekseq,"
"randomtransaction",
"Comma-separated list of operations to run in the specified order"
"Actual benchmarks:\n"
@ -145,6 +148,8 @@ DEFINE_string(benchmarks,
"\tacquireload -- load N*1000 times\n"
"\tfillseekseq -- write N values in sequential key, then read "
"them by seeking to each key\n"
"\trandomtransaction -- execute N random transactions and "
"verify correctness\n"
"Meta operations:\n"
"\tcompact -- Compact the entire DB\n"
"\tstats -- Print DB stats\n"
@ -423,6 +428,18 @@ DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
DEFINE_uint64(delete_obsolete_files_period_micros, 0,
"Ignored. Left here for backward compatibility");
DEFINE_bool(transaction_db, false,
"Open a OptimisticTransactionDB instance. "
"Required for randomtransaction benchmark.");
DEFINE_uint64(transaction_sets, 2,
"Number of keys each transaction will "
"modify (use in RandomTransaction only). Max: 9999");
DEFINE_int32(transaction_sleep, 0,
"Max microseconds to sleep in between "
"reading and writing a value (used in RandomTransaction only). ");
namespace {
enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
assert(ctype);
@ -866,6 +883,7 @@ static void AppendWithSpace(std::string* str, Slice msg) {
struct DBWithColumnFamilies {
std::vector<ColumnFamilyHandle*> cfh;
DB* db;
OptimisticTransactionDB* txn_db;
std::atomic<size_t> num_created; // Need to be updated after all the
// new entries in cfh are set.
size_t num_hot; // Number of column families to be queried at each moment.
@ -873,7 +891,7 @@ struct DBWithColumnFamilies {
// Column families will be created and used to be queried.
port::Mutex create_cf_mutex; // Only one thread can execute CreateNewCf()
DBWithColumnFamilies() : db(nullptr) {
DBWithColumnFamilies() : db(nullptr), txn_db(nullptr) {
cfh.clear();
num_created = 0;
num_hot = 0;
@ -882,9 +900,23 @@ struct DBWithColumnFamilies {
DBWithColumnFamilies(const DBWithColumnFamilies& other)
: cfh(other.cfh),
db(other.db),
txn_db(other.txn_db),
num_created(other.num_created.load()),
num_hot(other.num_hot) {}
void DeleteDBs() {
std::for_each(cfh.begin(), cfh.end(),
[](ColumnFamilyHandle* cfhi) { delete cfhi; });
cfh.clear();
if (txn_db) {
delete txn_db;
txn_db = nullptr;
} else {
delete db;
}
db = nullptr;
}
ColumnFamilyHandle* GetCfh(int64_t rand_num) {
assert(num_hot > 0);
return cfh[num_created.load(std::memory_order_acquire) - num_hot +
@ -1487,9 +1519,7 @@ class Benchmark {
}
~Benchmark() {
std::for_each(db_.cfh.begin(), db_.cfh.end(),
[](ColumnFamilyHandle* cfh) { delete cfh; });
delete db_.db;
db_.DeleteDBs();
delete prefix_extractor_;
if (cache_.get() != nullptr) {
// this will leak, but we're shutting down so nobody cares
@ -1593,6 +1623,8 @@ class Benchmark {
write_options_.disableWAL = FLAGS_disable_wal;
void (Benchmark::*method)(ThreadState*) = nullptr;
void (Benchmark::*post_process_method)() = nullptr;
bool fresh_db = false;
int num_threads = FLAGS_threads;
@ -1708,6 +1740,9 @@ class Benchmark {
method = &Benchmark::Compress;
} else if (name == Slice("uncompress")) {
method = &Benchmark::Uncompress;
} else if (name == Slice("randomtransaction")) {
method = &Benchmark::RandomTransaction;
post_process_method = &Benchmark::RandomTransactionVerify;
} else if (name == Slice("stats")) {
PrintStats("rocksdb.stats");
} else if (name == Slice("levelstats")) {
@ -1728,11 +1763,7 @@ class Benchmark {
method = nullptr;
} else {
if (db_.db != nullptr) {
std::for_each(db_.cfh.begin(), db_.cfh.end(),
[](ColumnFamilyHandle* cfh) { delete cfh; });
delete db_.db;
db_.db = nullptr;
db_.cfh.clear();
db_.DeleteDBs();
DestroyDB(FLAGS_db, open_options_);
}
for (size_t i = 0; i < multi_dbs_.size(); i++) {
@ -1748,6 +1779,9 @@ class Benchmark {
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
RunBenchmark(num_threads, name, method);
}
if (post_process_method != nullptr) {
(this->*post_process_method)();
}
}
if (FLAGS_statistics) {
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
@ -2293,6 +2327,11 @@ class Benchmark {
NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec));
}
if (FLAGS_readonly && FLAGS_transaction_db) {
fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
exit(1);
}
if (FLAGS_num_multi_db <= 1) {
OpenDb(options, FLAGS_db, &db_);
} else {
@ -2327,15 +2366,25 @@ class Benchmark {
if (FLAGS_readonly) {
s = DB::OpenForReadOnly(options, db_name, column_families,
&db->cfh, &db->db);
} else if (FLAGS_transaction_db) {
s = OptimisticTransactionDB::Open(options, db_name, column_families,
&db->cfh, &db->txn_db);
if (s.ok()) {
db->db = db->txn_db->GetBaseDB();
}
} else {
s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
}
db->cfh.resize(FLAGS_num_column_families);
db->num_created = num_hot;
db->num_hot = num_hot;
} else if (FLAGS_readonly) {
s = DB::OpenForReadOnly(options, db_name, &db->db);
} else if (FLAGS_transaction_db) {
s = OptimisticTransactionDB::Open(options, db_name, &db->txn_db);
if (s.ok()) {
db->db = db->txn_db->GetBaseDB();
}
} else {
s = DB::Open(options, db_name, &db->db);
}
@ -3376,6 +3425,201 @@ class Benchmark {
}
}
// This benchmark stress tests Transactions. For a given --duration (or
// total number of --writes, a Transaction will perform a read-modify-write
// to increment the value of a key in each of N(--transaction-sets) sets of
// keys (where each set has --num keys). If --threads is set, this will be
// done in parallel.
//
// To test transactions, use --transaction_db=true. Not setting this
// parameter
// will run the same benchmark without transactions.
//
// RandomTransactionVerify() will then validate the correctness of the results
// by checking if the sum of all keys in each set is the same.
void RandomTransaction(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
Duration duration(FLAGS_duration, readwrites_);
ReadOptions read_options(FLAGS_verify_checksum, true);
std::string value;
DB* db = db_.db;
uint64_t transactions_done = 0;
uint64_t transactions_aborted = 0;
Status s;
uint64_t num_prefix_ranges = FLAGS_transaction_sets;
bool use_txn = FLAGS_transaction_db;
if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) {
fprintf(stderr, "invalid value for transaction_sets\n");
abort();
}
if (FLAGS_num_multi_db > 1) {
fprintf(stderr,
"Cannot run RandomTransaction benchmark with "
"FLAGS_multi_db > 1.");
abort();
}
while (!duration.Done(1)) {
OptimisticTransaction* txn = nullptr;
WriteBatch* batch = nullptr;
if (use_txn) {
txn = db_.txn_db->BeginTransaction(write_options_);
assert(txn);
} else {
batch = new WriteBatch();
}
// pick a random number to use to increment a key in each set
uint64_t incr = (thread->rand.Next() % 100) + 1;
// For each set, pick a key at random and increment it
for (uint8_t i = 0; i < num_prefix_ranges; i++) {
uint64_t int_value;
char prefix_buf[5];
// key format: [SET#][random#]
std::string rand_key = ToString(thread->rand.Next() % FLAGS_num);
Slice base_key(rand_key);
// Pad prefix appropriately so we can iterate over each set
snprintf(prefix_buf, sizeof(prefix_buf), "%04d", i + 1);
std::string full_key = std::string(prefix_buf) + base_key.ToString();
Slice key(full_key);
if (use_txn) {
s = txn->Get(read_options, key, &value);
} else {
s = db->Get(read_options, key, &value);
}
if (s.ok()) {
int_value = std::stoull(value);
if (int_value == 0 || int_value == ULONG_MAX) {
fprintf(stderr, "Get returned unexpected value: %s\n",
value.c_str());
abort();
}
} else if (s.IsNotFound()) {
int_value = 0;
} else {
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
abort();
}
if (FLAGS_transaction_sleep > 0) {
FLAGS_env->SleepForMicroseconds(thread->rand.Next() %
FLAGS_transaction_sleep);
}
std::string sum = ToString(int_value + incr);
if (use_txn) {
txn->Put(key, sum);
} else {
batch->Put(key, sum);
}
}
if (use_txn) {
s = txn->Commit();
} else {
s = db->Write(write_options_, batch);
}
if (!s.ok()) {
// Ideally, we'd want to run this stress test with enough concurrency
// on a small enough set of keys that we get some failed transactions
// due to conflicts.
if (use_txn && s.IsBusy()) {
transactions_aborted++;
} else {
fprintf(stderr, "Unexpected write error: %s\n", s.ToString().c_str());
abort();
}
}
if (txn) {
delete txn;
}
if (batch) {
delete batch;
}
transactions_done++;
}
char msg[100];
if (use_txn) {
snprintf(msg, sizeof(msg),
"( transactions:%" PRIu64 " aborts:%" PRIu64 ")",
transactions_done, transactions_aborted);
} else {
snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done);
}
thread->stats.AddMessage(msg);
if (FLAGS_perf_level > 0) {
thread->stats.AddMessage(perf_context.ToString());
}
}
// Verifies consistency of data after RandomTransaction() has been run.
// Since each iteration of RandomTransaction() incremented a key in each set
// by the same value, the sum of the keys in each set should be the same.
void RandomTransactionVerify() {
if (!FLAGS_transaction_db) {
// transactions not used, nothing to verify.
return;
}
uint64_t prev_total = 0;
// For each set of keys with the same prefix, sum all the values
for (uint32_t i = 0; i < FLAGS_transaction_sets; i++) {
char prefix_buf[5];
snprintf(prefix_buf, sizeof(prefix_buf), "%04u", i + 1);
uint64_t total = 0;
Iterator* iter = db_.db->NewIterator(ReadOptions());
for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) {
Slice key = iter->key();
// stop when we reach a different prefix
if (key.ToString().compare(0, 4, prefix_buf) != 0) {
break;
}
Slice value = iter->value();
uint64_t int_value = std::stoull(value.ToString());
if (int_value == 0 || int_value == ULONG_MAX) {
fprintf(stderr, "Iter returned unexpected value: %s\n",
value.ToString().c_str());
abort();
}
total += int_value;
}
delete iter;
if (i > 0) {
if (total != prev_total) {
fprintf(stderr,
"RandomTransactionVerify found inconsistent totals. "
"Set[%u]: %lu, Set[%u]: %lu \n",
i - 1, prev_total, i, total);
abort();
}
}
prev_total = total;
}
fprintf(stdout, "RandomTransactionVerify Success! Total:%lu\n", prev_total);
}
void Compact(ThreadState* thread) {
DB* db = SelectDB(thread);
db->CompactRange(nullptr, nullptr);

@ -48,6 +48,7 @@
#include "db/version_set.h"
#include "db/writebuffer.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "port/likely.h"
@ -866,7 +867,7 @@ Status DBImpl::Recover(
s = CheckConsistency();
}
if (s.ok()) {
SequenceNumber max_sequence(0);
SequenceNumber max_sequence(kMaxSequenceNumber);
default_cf_handle_ = new ColumnFamilyHandleImpl(
versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
@ -917,7 +918,8 @@ Status DBImpl::Recover(
if (!s.ok()) {
// Clear memtables if recovery failed
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
kMaxSequenceNumber);
}
}
}
@ -1035,7 +1037,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
if ((*max_sequence == kMaxSequenceNumber) || (last_seq > *max_sequence)) {
*max_sequence = last_seq;
}
@ -1058,7 +1060,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// file-systems cause the DB::Open() to fail.
return status;
}
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
*max_sequence);
}
}
}
@ -1068,7 +1072,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
flush_scheduler_.Clear();
if (versions_->LastSequence() < *max_sequence) {
if ((*max_sequence != kMaxSequenceNumber) &&
(versions_->LastSequence() < *max_sequence)) {
versions_->SetLastSequence(*max_sequence);
}
}
@ -1099,7 +1104,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// Recovery failed
break;
}
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
*max_sequence);
}
// write MANIFEST with update
@ -2657,10 +2664,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
} else {
snapshot = versions_->LastSequence();
}
// Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd);
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
@ -3156,9 +3161,32 @@ Status DBImpl::Delete(const WriteOptions& write_options,
}
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
return WriteImpl(write_options, my_batch, nullptr);
}
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback) {
return WriteImpl(write_options, my_batch, callback);
}
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback) {
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
}
Status status;
bool xfunc_attempted_write = false;
XFUNC_TEST("transaction", "transaction_xftest_write_impl",
xf_transaction_write1, xf_transaction_write, write_options,
db_options_, my_batch, callback, this, &status,
&xfunc_attempted_write);
if (xfunc_attempted_write) {
// Test already did the write
return status;
}
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(&mutex_);
w.batch = my_batch;
@ -3166,6 +3194,7 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
w.disableWAL = write_options.disableWAL;
w.in_batch_group = false;
w.done = false;
w.has_callback = (callback != nullptr) ? true : false;
w.timeout_hint_us = write_options.timeout_hint_us;
uint64_t expiration_time = 0;
@ -3188,7 +3217,7 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
}
Status status = write_thread_.EnterWriteThread(&w, expiration_time);
status = write_thread_.EnterWriteThread(&w, expiration_time);
assert(status.ok() || status.IsTimedOut());
if (status.IsTimedOut()) {
mutex_.Unlock();
@ -3290,16 +3319,30 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
uint64_t last_sequence = versions_->LastSequence();
WriteThread::Writer* last_writer = &w;
autovector<WriteBatch*> write_batch_group;
if (status.ok()) {
autovector<WriteBatch*> write_batch_group;
write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into memtables
{
mutex_.Unlock();
mutex_.Unlock();
if (callback != nullptr) {
// If this write has a validation callback, check to see if this write
// is able to be written. Must be called on the write thread.
status = callback->Callback(this);
}
} else {
mutex_.Unlock();
}
// At this point the mutex is unlocked
if (status.ok()) {
WriteBatch* updates = nullptr;
if (write_batch_group.size() == 1) {
updates = write_batch_group[0];
@ -3371,6 +3414,7 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
tmp_batch_.Clear();
}
mutex_.Lock();
// internal stats
default_cf_internal_stats_->AddDBStats(
InternalStats::BYTES_WRITTEN, batch_size);
@ -3385,13 +3429,17 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
if (status.ok()) {
versions_->SetLastSequence(last_sequence);
}
} else {
// Operation failed. Make sure sure mutex is held for cleanup code below.
mutex_.Lock();
}
}
if (db_options_.paranoid_checks && !status.ok() &&
!status.IsTimedOut() && bg_error_.ok()) {
if (db_options_.paranoid_checks && !status.ok() && !status.IsTimedOut() &&
!status.IsBusy() && bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
}
mutex_.AssertHeld();
write_thread_.ExitWriteThread(&w, last_writer, status);
if (context.schedule_bg_work_) {
@ -3503,7 +3551,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
}
if (s.ok()) {
new_mem = cfd->ConstructNewMemtable(mutable_cf_options);
SequenceNumber seq = versions_->LastSequence();
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
new_superversion = new SuperVersion();
}
}
@ -3647,6 +3696,18 @@ SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
return cfd->GetThreadLocalSuperVersion(&mutex_);
}
// REQUIRED: this function should only be called on the write thread or if the
// mutex is held.
SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
auto column_family_set = versions_->GetColumnFamilySet();
auto cfd = column_family_set->GetColumnFamily(column_family_id);
if (!cfd) {
return nullptr;
}
return GetAndRefSuperVersion(cfd);
}
void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
SuperVersion* sv) {
bool unref_sv = !cfd->ReturnThreadLocalSuperVersion(sv);
@ -3665,6 +3726,30 @@ void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
}
}
// REQUIRED: this function should only be called on the write thread.
void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
SuperVersion* sv) {
auto column_family_set = versions_->GetColumnFamilySet();
auto cfd = column_family_set->GetColumnFamily(column_family_id);
// If SuperVersion is held, and we successfully fetched a cfd using
// GetAndRefSuperVersion(), it must still exist.
assert(cfd != nullptr);
ReturnAndCleanupSuperVersion(cfd, sv);
}
// REQUIRED: this function should only be called on the write thread or if the
// mutex is held.
ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
if (!cf_memtables->Seek(column_family_id)) {
return nullptr;
}
return cf_memtables->GetColumnFamilyHandle();
}
void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* range, int n, uint64_t* sizes) {
Version* v;
@ -4235,4 +4320,83 @@ void DumpRocksDBBuildVersion(Logger * log) {
#endif
}
#ifndef ROCKSDB_LITE
SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
bool include_history) {
// Find the earliest sequence number that we know we can rely on reading
// from the memtable without needing to check sst files.
SequenceNumber earliest_seq =
sv->imm->GetEarliestSequenceNumber(include_history);
if (earliest_seq == kMaxSequenceNumber) {
earliest_seq = sv->mem->GetEarliestSequenceNumber();
}
assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);
return earliest_seq;
}
#endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE
Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
const Slice& key,
SequenceNumber* seq) {
Status s;
std::string value;
MergeContext merge_context;
SequenceNumber current_seq = versions_->LastSequence();
LookupKey lkey(key, current_seq);
*seq = kMaxSequenceNumber;
// Check if there is a record for this key in the latest memtable
sv->mem->Get(lkey, &value, &s, &merge_context, seq);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Unexpected status returned from MemTable::Get: %s\n",
s.ToString().c_str());
return s;
}
if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check immutable memtables
return Status::OK();
}
// Check if there is a record for this key in the immutable memtables
sv->imm->Get(lkey, &value, &s, &merge_context, seq);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Unexpected status returned from MemTableList::Get: %s\n",
s.ToString().c_str());
return s;
}
if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check memtable history
return Status::OK();
}
// Check if there is a record for this key in the immutable memtables
sv->imm->GetFromHistory(lkey, &value, &s, &merge_context, seq);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Unexpected status returned from MemTableList::GetFromHistory: %s\n",
s.ToString().c_str());
return s;
}
return Status::OK();
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

@ -53,6 +53,7 @@ class VersionEdit;
class VersionSet;
class CompactionFilterV2;
class Arena;
class WriteCallback;
struct JobContext;
class DBImpl : public DB {
@ -76,6 +77,7 @@ class DBImpl : public DB {
using DB::Write;
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;
using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
@ -191,6 +193,34 @@ class DBImpl : public DB {
Status PromoteL0(ColumnFamilyHandle* column_family, int target_level);
// Similar to Write() but will call the callback once on the single write
// thread to determine whether it is safe to perform the write.
virtual Status WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback);
// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into the current
// memtables. It can then be assumed that any write with a larger(or equal)
// sequence number will be present in this memtable or a later memtable.
//
// If the earliest sequence number could not be determined,
// kMaxSequenceNumber will be returned.
//
// If include_history=true, will also search Memtables in MemTableList
// History.
SequenceNumber GetEarliestMemTableSequenceNumber(SuperVersion* sv,
bool include_history);
// For a given key, check to see if there are any records for this key
// in the memtables, including memtable history.
// On success, *seq will contain the sequence number for the
// latest such change or kMaxSequenceNumber if no records were present.
// Returns OK on success, other status on error reading memtables.
Status GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, const Slice& key,
SequenceNumber* seq);
#endif // ROCKSDB_LITE
// checks if all live files exist on file system and that their file sizes
@ -279,6 +309,32 @@ class DBImpl : public DB {
void CancelAllBackgroundWork(bool wait);
// Find Super version and reference it. Based on options, it might return
// the thread local cached one.
// Call ReturnAndCleanupSuperVersion() when it is no longer needed.
SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd);
// Similar to the previous function but looks up based on a column family id.
// nullptr will be returned if this column family no longer exists.
// REQUIRED: this function should only be called on the write thread or if the
// mutex is held.
SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id);
// Un-reference the super version and return it to thread local cache if
// needed. If it is the last reference of the super version. Clean it up
// after un-referencing it.
void ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, SuperVersion* sv);
// Similar to the previous function but looks up based on a column family id.
// nullptr will be returned if this column family no longer exists.
// REQUIRED: this function should only be called on the write thread.
void ReturnAndCleanupSuperVersion(uint32_t colun_family_id, SuperVersion* sv);
// REQUIRED: this function should only be called on the write thread or if the
// mutex is held. Return value only valid until next call to this function or
// mutex is released.
ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id);
protected:
Env* const env_;
const std::string dbname_;
@ -301,6 +357,9 @@ class DBImpl : public DB {
void EraseThreadStatusDbInfo() const;
Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback);
private:
friend class DB;
friend class InternalStats;
@ -309,6 +368,9 @@ class DBImpl : public DB {
#endif
friend struct SuperVersion;
friend class CompactedDBImpl;
#ifndef NDEBUG
friend class XFTransactionWriteHandler;
#endif
struct CompactionState;
struct WriteContext;
@ -660,16 +722,6 @@ class DBImpl : public DB {
const MutableCFOptions& mutable_cf_options,
bool dont_schedule_bg_work = false);
// Find Super version and reference it. Based on options, it might return
// the thread local cached one.
inline SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd);
// Un-reference the super version and return it to thread local cache if
// needed. If it is the last reference of the super version. Clean it up
// after un-referencing it.
inline void ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
SuperVersion* sv);
#ifndef ROCKSDB_LITE
using DB::GetPropertiesOfAllTables;
virtual Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,

@ -39,6 +39,7 @@
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/convenience.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "table/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain_table_factory.h"

@ -21,6 +21,14 @@ uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
return (seq << 8) | t;
}
void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t) {
*seq = packed >> 8;
*t = static_cast<ValueType>(packed & 0xff);
assert(*seq <= kMaxSequenceNumber);
assert(*t <= kValueTypeForSeek);
}
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));

@ -71,8 +71,13 @@ inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
return key.user_key.size() + 8;
}
// Pack a sequence number and a ValueType into a uint64_t
extern uint64_t PackSequenceAndType(uint64_t seq, ValueType t);
// Given the result of PackSequenceAndType, store the sequence number in *seq
// and the ValueType in *t.
extern void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t);
// Append the serialization of "key" to *result.
extern void AppendInternalKey(std::string* result,
const ParsedInternalKey& key);

@ -97,7 +97,8 @@ TEST_F(FlushJobTest, Empty) {
TEST_F(FlushJobTest, NonEmpty) {
JobContext job_context(0);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions());
auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
kMaxSequenceNumber);
new_mem->Ref();
std::map<std::string, std::string> inserted_keys;
for (int i = 1; i < 10000; ++i) {

@ -54,7 +54,7 @@ MemTableOptions::MemTableOptions(
MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBuffer* write_buffer)
WriteBuffer* write_buffer, SequenceNumber earliest_seq)
: comparator_(cmp),
moptions_(ioptions, mutable_cf_options),
refs_(0),
@ -70,6 +70,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
flush_completed_(false),
file_number_(0),
first_seqno_(0),
earliest_seqno_(earliest_seq),
mem_next_logfile_number_(0),
locks_(moptions_.inplace_update_support
? moptions_.inplace_update_num_locks
@ -309,7 +310,8 @@ void MemTable::Add(SequenceNumber s, ValueType type,
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
uint64_t packed = PackSequenceAndType(s, type);
EncodeFixed64(p, packed);
p += 8;
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
@ -329,6 +331,11 @@ void MemTable::Add(SequenceNumber s, ValueType type,
assert(first_seqno_ == 0 || s > first_seqno_);
if (first_seqno_ == 0) {
first_seqno_ = s;
if (earliest_seqno_ == kMaxSequenceNumber) {
earliest_seqno_ = first_seqno_;
}
assert(first_seqno_ >= earliest_seqno_);
}
should_flush_ = ShouldFlushNow();
@ -343,6 +350,7 @@ struct Saver {
bool* found_final_value; // Is value set correctly? Used by KeyMayExist
bool* merge_in_progress;
std::string* value;
SequenceNumber seq;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
@ -376,7 +384,10 @@ static bool SaveValue(void* arg, const char* entry) {
Slice(key_ptr, key_length - 8), s->key->user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
ValueType type;
UnPackSequenceAndType(tag, &s->seq, &type);
switch (type) {
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
@ -461,7 +472,7 @@ static bool SaveValue(void* arg, const char* entry) {
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context) {
MergeContext* merge_context, SequenceNumber* seq) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
@ -476,6 +487,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
if (prefix_bloom_ &&
!prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
// iter is null if prefix bloom says the key does not exist
*seq = kMaxSequenceNumber;
} else {
Saver saver;
saver.status = s;
@ -483,7 +495,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
saver.merge_in_progress = &merge_in_progress;
saver.key = &key;
saver.value = value;
saver.status = s;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.merge_operator = moptions_.merge_operator;
@ -492,6 +504,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
saver.statistics = moptions_.statistics;
saver.env_ = env_;
table_->Get(key, &saver, SaveValue);
*seq = saver.seq;
}
// No change to value, since we have not yet found a Put/Delete
@ -529,7 +543,10 @@ void MemTable::Update(SequenceNumber seq,
Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
ValueType type;
SequenceNumber unused;
UnPackSequenceAndType(tag, &unused, &type);
switch (type) {
case kTypeValue: {
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
@ -587,7 +604,10 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
ValueType type;
uint64_t unused;
UnPackSequenceAndType(tag, &unused, &type);
switch (type) {
case kTypeValue: {
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
@ -657,7 +677,10 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
}
const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
if (static_cast<ValueType>(tag & 0xff) != kTypeMerge) {
ValueType type;
uint64_t unused;
UnPackSequenceAndType(tag, &unused, &type);
if (type != kTypeMerge) {
break;
}

@ -80,10 +80,17 @@ class MemTable {
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
//
// earliest_seq should be the current SequenceNumber in the db such that any
// key inserted into this memtable will have an equal or larger seq number.
// (When a db is first created, the earliest sequence number will be 0).
// If the earliest sequence number is not known, kMaxSequenceNumber may be
// used, but this may prevent some transactions from succeeding until the
// first key is inserted into the memtable.
explicit MemTable(const InternalKeyComparator& comparator,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBuffer* write_buffer);
WriteBuffer* write_buffer, SequenceNumber earliest_seq);
// Do not delete this MemTable unless Unref() indicates it not in use.
~MemTable();
@ -153,8 +160,19 @@ class MemTable {
// prepend the current merge operand to *operands.
// store MergeInProgress in s, and return false.
// Else, return false.
// If any operation was found, its most recent sequence number
// will be stored in *seq on success (regardless of whether true/false is
// returned). Otherwise, *seq will be set to kMaxSequenceNumber.
// On success, *s may be set to OK, NotFound, or MergeInProgress. Any other
// status returned indicates a corruption or other unexpected error.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* seq);
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context);
MergeContext* merge_context) {
SequenceNumber seq;
return Get(key, value, s, merge_context, &seq);
}
// Attempts to update the new_value inplace, else does normal Add
// Pseudocode
@ -215,6 +233,15 @@ class MemTable {
// operations on the same MemTable (unless this Memtable is immutable).
SequenceNumber GetFirstSequenceNumber() { return first_seqno_; }
// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into this
// memtable. It can then be assumed that any write with a larger(or equal)
// sequence number will be present in this memtable or a later memtable.
//
// If the earliest sequence number could not be determined,
// kMaxSequenceNumber will be returned.
SequenceNumber GetEarliestSequenceNumber() { return earliest_seqno_; }
// Returns the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
@ -288,6 +315,10 @@ class MemTable {
// The sequence number of the kv that was inserted first
SequenceNumber first_seqno_;
// The db sequence number at the time of creation or kMaxSequenceNumber
// if not set.
SequenceNumber earliest_seqno_;
// The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_;

@ -87,20 +87,38 @@ int MemTableList::NumFlushed() const {
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context) {
for (auto& memtable : memlist_) {
if (memtable->Get(key, value, s, merge_context)) {
return true;
}
}
return false;
Status* s, MergeContext* merge_context,
SequenceNumber* seq) {
return GetFromList(&memlist_, key, value, s, merge_context, seq);
}
bool MemTableListVersion::GetFromHistory(const LookupKey& key,
std::string* value, Status* s,
MergeContext* merge_context) {
for (auto& memtable : memlist_history_) {
if (memtable->Get(key, value, s, merge_context)) {
MergeContext* merge_context,
SequenceNumber* seq) {
return GetFromList(&memlist_history_, key, value, s, merge_context, seq);
}
bool MemTableListVersion::GetFromList(std::list<MemTable*>* list,
const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context,
SequenceNumber* seq) {
*seq = kMaxSequenceNumber;
for (auto& memtable : *list) {
SequenceNumber current_seq = kMaxSequenceNumber;
bool done = memtable->Get(key, value, s, merge_context, &current_seq);
if (*seq == kMaxSequenceNumber) {
// Store the most recent sequence number of any operation on this key.
// Since we only care about the most recent change, we only need to
// return the first operation found when searching memtables in
// reverse-chronological order.
*seq = current_seq;
}
if (done) {
assert(*seq != kMaxSequenceNumber);
return true;
}
}
@ -139,6 +157,17 @@ uint64_t MemTableListVersion::GetTotalNumDeletes() const {
return total_num;
}
SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
bool include_history) const {
if (include_history && !memlist_history_.empty()) {
return memlist_history_.back()->GetEarliestSequenceNumber();
} else if (!memlist_.empty()) {
return memlist_.back()->GetEarliestSequenceNumber();
} else {
return kMaxSequenceNumber;
}
}
// caller is responsible for referencing m
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable

@ -21,6 +21,7 @@
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "util/autovector.h"
#include "util/instrumented_mutex.h"
#include "util/log_buffer.h"
@ -48,15 +49,30 @@ class MemTableListVersion {
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
//
// If any operation was found for this key, its most recent sequence number
// will be stored in *seq on success (regardless of whether true/false is
// returned). Otherwise, *seq will be set to kMaxSequenceNumber.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context);
MergeContext* merge_context, SequenceNumber* seq);
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context) {
SequenceNumber seq;
return Get(key, value, s, merge_context, &seq);
}
// Similar to Get(), but searches the Memtable history of memtables that
// have already been flushed. Should only be used from in-memory only
// queries (such as Transaction validation) as the history may contain
// writes that are also present in the SST files.
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context);
MergeContext* merge_context, SequenceNumber* seq);
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context) {
SequenceNumber seq;
return GetFromHistory(key, value, s, merge_context, &seq);
}
void AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iterator_list, Arena* arena);
@ -68,6 +84,12 @@ class MemTableListVersion {
uint64_t GetTotalNumDeletes() const;
// Returns the value of MemTable::GetEarliestSequenceNumber() on the most
// recent MemTable in this list or kMaxSequenceNumber if the list is empty.
// If include_history=true, will also search Memtables in MemTableList
// History.
SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const;
private:
// REQUIRE: m is an immutable memtable
void Add(MemTable* m, autovector<MemTable*>* to_delete);
@ -76,6 +98,10 @@ class MemTableListVersion {
void TrimHistory(autovector<MemTable*>* to_delete);
bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
std::string* value, Status* s, MergeContext* merge_context,
SequenceNumber* seq);
friend class MemTableList;
// Immutable MemTables that have not yet been flushed.

@ -133,7 +133,8 @@ TEST_F(MemTableListTest, GetTest) {
WriteBuffer wb(options.db_write_buffer_size);
MemTable* mem =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb);
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb,
kMaxSequenceNumber);
mem->Ref();
// Write some keys to this memtable.
@ -169,7 +170,8 @@ TEST_F(MemTableListTest, GetTest) {
// Create another memtable and write some keys to it
WriteBuffer wb2(options.db_write_buffer_size);
MemTable* mem2 =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb2);
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb2,
kMaxSequenceNumber);
mem2->Ref();
mem2->Add(++seq, kTypeDeletion, "key1", "");
@ -233,7 +235,8 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
WriteBuffer wb(options.db_write_buffer_size);
MemTable* mem =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb);
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb,
kMaxSequenceNumber);
mem->Ref();
// Write some keys to this memtable.
@ -307,7 +310,8 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Create another memtable and write some keys to it
WriteBuffer wb2(options.db_write_buffer_size);
MemTable* mem2 =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb2);
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb2,
kMaxSequenceNumber);
mem2->Ref();
mem2->Add(++seq, kTypeDeletion, "key1", "");
@ -332,7 +336,8 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Add a third memtable to push the first memtable out of the history
WriteBuffer wb3(options.db_write_buffer_size);
MemTable* mem3 =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb3);
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb3,
kMaxSequenceNumber);
mem3->Ref();
list.Add(mem3, &to_delete);
ASSERT_EQ(1, list.NumNotFlushed());
@ -403,7 +408,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
std::vector<MemTable*> tables;
MutableCFOptions mutable_cf_options(options, ioptions);
for (int i = 0; i < num_tables; i++) {
MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb);
MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
kMaxSequenceNumber);
mem->Ref();
std::string value;
@ -581,6 +587,15 @@ TEST_F(MemTableListTest, FlushPendingTest) {
list.current()->Unref(&to_delete);
int to_delete_size = std::min(5, max_write_buffer_number_to_maintain);
ASSERT_EQ(to_delete_size, to_delete.size());
for (const auto& m : to_delete) {
// Refcount should be 0 after calling InstallMemtableFlushResults.
// Verify this, by Ref'ing then UnRef'ing:
m->Ref();
ASSERT_EQ(m, m->Unref());
delete m;
}
to_delete.clear();
}
} // namespace rocksdb

@ -254,8 +254,9 @@ class Repairer {
Slice record;
WriteBatch batch;
WriteBuffer wb(options_.db_write_buffer_size);
MemTable* mem = new MemTable(icmp_, ioptions_,
MutableCFOptions(options_, ioptions_), &wb);
MemTable* mem =
new MemTable(icmp_, ioptions_, MutableCFOptions(options_, ioptions_),
&wb, kMaxSequenceNumber);
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem);
mem->Ref();
int counter = 0;

@ -3115,7 +3115,8 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
AppendVersion(new_cfd, v);
// GetLatestMutableCFOptions() is safe here without mutex since the
// cfd is not available to client
new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions());
new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
LastSequence());
new_cfd->SetLogNumber(edit->log_number_);
return new_cfd;
}

@ -31,8 +31,9 @@ static std::string PrintContents(WriteBatch* b) {
options.memtable_factory = factory;
ImmutableCFOptions ioptions(options);
WriteBuffer wb(options.db_write_buffer_size);
MemTable* mem = new MemTable(cmp, ioptions,
MutableCFOptions(options, ioptions), &wb);
MemTable* mem =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb,
kMaxSequenceNumber);
mem->Ref();
std::string state;
ColumnFamilyMemTablesDefault cf_mems_default(mem);

@ -0,0 +1,24 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include "rocksdb/status.h"
namespace rocksdb {
class DB;
class WriteCallback {
public:
virtual ~WriteCallback() {}
// Will be called while on the write thread before the write executes. If
// this function returns a non-OK status, the write will be aborted and this
// status will be returned to the caller of DB::Write().
virtual Status Callback(DB* db) = 0;
};
} // namespace rocksdb

@ -0,0 +1,120 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include <string>
#include "db/db_impl.h"
#include "db/write_callback.h"
#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"
#include "util/logging.h"
#include "util/testharness.h"
using std::string;
namespace rocksdb {
class WriteCallbackTest : public testing::Test {
public:
string dbname;
WriteCallbackTest() {
dbname = test::TmpDir() + "/write_callback_testdb";
}
};
class WriteCallbackTestWriteCallback1 : public WriteCallback {
public:
bool was_called = false;
Status Callback(DB *db) override {
was_called = true;
// Make sure db is a DBImpl
DBImpl* db_impl = dynamic_cast<DBImpl*> (db);
if (db_impl == nullptr) {
return Status::InvalidArgument("");
}
return Status::OK();
}
};
class WriteCallbackTestWriteCallback2 : public WriteCallback {
public:
Status Callback(DB *db) override {
return Status::Busy();
}
};
TEST_F(WriteCallbackTest, WriteCallBackTest) {
Options options;
WriteOptions write_options;
ReadOptions read_options;
string value;
DB* db;
DBImpl* db_impl;
options.create_if_missing = true;
Status s = DB::Open(options, dbname, &db);
ASSERT_OK(s);
db_impl = dynamic_cast<DBImpl*> (db);
ASSERT_TRUE(db_impl);
WriteBatch wb;
wb.Put("a", "value.a");
wb.Delete("x");
// Test a simple Write
s = db->Write(write_options, &wb);
ASSERT_OK(s);
s = db->Get(read_options, "a", &value);
ASSERT_OK(s);
ASSERT_EQ("value.a", value);
// Test WriteWithCallback
WriteCallbackTestWriteCallback1 callback1;
WriteBatch wb2;
wb2.Put("a", "value.a2");
s = db_impl->WriteWithCallback(write_options, &wb2, &callback1);
ASSERT_OK(s);
ASSERT_TRUE(callback1.was_called);
s = db->Get(read_options, "a", &value);
ASSERT_OK(s);
ASSERT_EQ("value.a2", value);
// Test WriteWithCallback for a callback that fails
WriteCallbackTestWriteCallback2 callback2;
WriteBatch wb3;
wb3.Put("a", "value.a3");
s = db_impl->WriteWithCallback(write_options, &wb3, &callback2);
ASSERT_NOK(s);
s = db->Get(read_options, "a", &value);
ASSERT_OK(s);
ASSERT_EQ("value.a2", value);
delete db;
DestroyDB(dbname, options);
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif // ROCKSDB_LITE

@ -105,6 +105,13 @@ void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
}
*last_writer = first;
if (first->has_callback) {
// TODO(agiardullo:) Batching not currently supported as this write may
// fail if the callback function decides to abort this write.
return;
}
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
@ -126,6 +133,12 @@ void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
break;
}
if (w->has_callback) {
// Do not include writes which may be aborted if the callback does not
// succeed.
break;
}
if (w->batch == nullptr) {
// Do not include those writes with nullptr batch. Those are not writes,
// those are something else. They want to be alone

@ -27,6 +27,7 @@ class WriteThread {
bool disableWAL;
bool in_batch_group;
bool done;
bool has_callback;
uint64_t timeout_hint_us;
InstrumentedCondVar cv;
@ -36,6 +37,7 @@ class WriteThread {
disableWAL(false),
in_batch_group(false),
done(false),
has_callback(false),
timeout_hint_us(kNoTimeOut),
cv(mu) {}
};

@ -2,3 +2,4 @@ column_families_example
simple_example
c_simple_example
compact_files_example
transaction_example

@ -2,7 +2,7 @@ include ../make_config.mk
.PHONY: clean
all: simple_example column_families_example compact_files_example c_simple_example
all: simple_example column_families_example compact_files_example c_simple_example transaction_example
simple_example: simple_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
@ -19,5 +19,8 @@ compact_files_example: compact_files_example.cc
c_simple_example: c_simple_example.o
$(CXX) $@.o -o$@ ../librocksdb.a $(PLATFORM_LDFLAGS) $(EXEC_LDFLAGS)
transaction_example: transaction_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
clean:
rm -rf ./simple_example ./column_families_example ./compact_files_example ./c_simple_example c_simple_example.o
rm -rf ./simple_example ./column_families_example ./compact_files_example ./c_simple_example c_simple_example.o ./transaction_example

@ -0,0 +1,142 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/optimistic_transaction.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
using namespace rocksdb;
std::string kDBPath = "/tmp/rocksdb_transaction_example";
int main() {
// open DB
Options options;
options.create_if_missing = true;
DB* db;
OptimisticTransactionDB* txn_db;
Status s = OptimisticTransactionDB::Open(options, kDBPath, &txn_db);
assert(s.ok());
db = txn_db->GetBaseDB();
WriteOptions write_options;
ReadOptions read_options;
OptimisticTransactionOptions txn_options;
std::string value;
////////////////////////////////////////////////////////
//
// Simple OptimisticTransaction Example ("Read Committed")
//
////////////////////////////////////////////////////////
// Start a transaction
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
assert(txn);
// Read a key in this transaction
s = txn->Get(read_options, "abc", &value);
assert(s.IsNotFound());
// Write a key in this transaction
txn->Put("abc", "def");
// Read a key OUTSIDE this transaction. Does not affect txn.
s = db->Get(read_options, "abc", &value);
// Write a key OUTSIDE of this transaction.
// Does not affect txn since this is an unrelated key. If we wrote key 'abc'
// here, the transaction would fail to commit.
s = db->Put(write_options, "xyz", "zzz");
// Commit transaction
s = txn->Commit();
assert(s.ok());
delete txn;
////////////////////////////////////////////////////////
//
// "Repeatable Read" (Snapshot Isolation) Example
// -- Using a single Snapshot
//
////////////////////////////////////////////////////////
// Set a snapshot at start of transaction by setting set_snapshot=true
txn_options.set_snapshot = true;
txn = txn_db->BeginTransaction(write_options, txn_options);
const Snapshot* snapshot = txn->GetSnapshot();
// Write a key OUTSIDE of transaction
db->Put(write_options, "abc", "xyz");
// Read a key using the snapshot
read_options.snapshot = snapshot;
s = txn->GetForUpdate(read_options, "abc", &value);
assert(value == "def");
// Attempt to commit transaction
s = txn->Commit();
// Transaction could not commit since the write outside of the txn conflicted
// with the read!
assert(s.IsBusy());
delete txn;
// Clear snapshot from read options since it is no longer valid
read_options.snapshot = nullptr;
snapshot = nullptr;
////////////////////////////////////////////////////////
//
// "Read Committed" (Monotonic Atomic Views) Example
// --Using multiple Snapshots
//
////////////////////////////////////////////////////////
// In this example, we set the snapshot multiple times. This is probably
// only necessary if you have very strict isolation requirements to
// implement.
// Set a snapshot at start of transaction
txn_options.set_snapshot = true;
txn = txn_db->BeginTransaction(write_options, txn_options);
// Do some reads and writes to key "x"
read_options.snapshot = db->GetSnapshot();
s = txn->Get(read_options, "x", &value);
txn->Put("x", "x");
// Do a write outside of the transaction to key "y"
s = db->Put(write_options, "y", "y");
// Set a new snapshot in the transaction
txn->SetSnapshot();
read_options.snapshot = db->GetSnapshot();
// Do some reads and writes to key "y"
s = txn->GetForUpdate(read_options, "y", &value);
txn->Put("y", "y");
// Commit. Since the snapshot was advanced, the write done outside of the
// transaction does not prevent this transaction from Committing.
s = txn->Commit();
assert(s.ok());
delete txn;
// Clear snapshot from read options since it is no longer valid
read_options.snapshot = nullptr;
// Cleanup
delete txn_db;
DestroyDB(kDBPath, options);
return 0;
}
#endif // ROCKSDB_LITE

@ -603,6 +603,9 @@ class DB {
}
#endif // ROCKSDB_LITE
// Needed for StackableDB
virtual DB* GetRootDB() { return this; }
private:
// No copying allowed
DB(const DB&);

@ -80,6 +80,10 @@ class Status {
static Status Aborted(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kAborted, msg, msg2);
}
static Status Busy() { return Status(kBusy); }
static Status Busy(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kBusy, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return code() == kOk; }
@ -112,6 +116,10 @@ class Status {
bool IsAborted() const { return code() == kAborted; }
// Returns true iff the status indicates that a resource is Busy and
// temporarily could not be acquired.
bool IsBusy() const { return code() == kBusy; }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
@ -127,7 +135,8 @@ class Status {
kIncomplete = 7,
kShutdownInProgress = 8,
kTimedOut = 9,
kAborted = 10
kAborted = 10,
kBusy = 11,
};
Code code() const {

@ -0,0 +1,233 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
namespace rocksdb {
class OptimisticTransactionDB;
class WriteBatchWithIndex;
// Provides BEGIN/COMMIT/ROLLBACK transactions for batched writes.
//
// The current implementation provides optimistic concurrency control.
// Transactional reads/writes will not block other operations in the
// db. At commit time, the batch of writes will only be written if there have
// been no other writes to any keys read or written by this transaction.
// Otherwise, the commit will return an error.
//
// A new optimistic transaction is created by calling
// OptimisticTransactionDB::BeginTransaction().
// Only reads/writes done through this transaction object will be a part of the
// transaction. Any other reads/writes will not be tracked by this
// transaction.
//
// For example, reading data via OptimisticTransaction::GetForUpdate() will
// prevent the transaction from committing if this key is written to outside of
// this transaction. Any reads done via DB::Get() will not be checked for
// conflicts at commit time.
//
// It is up to the caller to synchronize access to this object.
//
// See examples/transaction_example.cc for some simple examples.
//
// TODO(agiardullo): Not yet implemented:
// -Transaction support for iterators
// -Ensuring memtable holds large enough history to check for conflicts
// -Support for using Transactions with DBWithTTL
// Options to use when starting an Optimistic Transaction
struct OptimisticTransactionOptions {
// Setting set_snapshot=true is the same as calling SetSnapshot().
bool set_snapshot = false;
// Should be set if the DB has a non-default comparator.
// See comment in WriteBatchWithIndex constructor.
const Comparator* cmp = BytewiseComparator();
};
class OptimisticTransaction {
public:
virtual ~OptimisticTransaction() {}
// If SetSnapshot() is not called, all keys read/written through this
// transaction will only be committed if there have been no writes to
// these keys outside of this transaction *since the time each key
// was first read/written* in this transaction.
//
// When SetSnapshot() is called, this transaction will create a Snapshot
// to use for conflict validation of all future operations in the transaction.
// All future keys read/written will only be committed if there have been
// no writes to these keys outside of this transaction *since SetSnapshot()
// was called.* Otherwise, Commit() will not succeed.
//
// It is not necessary to call SetSnapshot() if you only care about other
// writes happening on keys *after* they have first been read/written in this
// transaction. However, you should set a snapshot if you are concerned
// with any other writes happening since a particular time (such as
// the start of the transaction).
//
// SetSnapshot() may be called multiple times if you would like to change
// the snapshot used for different operations in this transaction.
//
// Calling SetSnapshot will not affect the version of Data returned by Get()
// methods. See OptimisticTransaction::Get() for more details.
//
// TODO(agiardullo): add better documentation here once memtable change are
// committed
virtual void SetSnapshot() = 0;
// Returns the Snapshot created by the last call to SetSnapshot().
//
// REQUIRED: The returned Snapshot is only valid up until the next time
// SetSnapshot() is called or the OptimisticTransaction is deleted.
virtual const Snapshot* GetSnapshot() const = 0;
// Write all batched keys to the db atomically if there have not been any
// other writes performed on the keys read/written by this transaction.
//
// Currently, Commit() only checks the memtables to verify that there are no
// other writes to these keys. If the memtable's history is not long
// enough to verify that there are no conflicts, Commit() will return
// a non-OK status.
//
// Returns OK on success, non-OK on failure.
virtual Status Commit() = 0;
// Discard all batched writes in this transaction.
virtual void Rollback() = 0;
// This function is similar to DB::Get() except it will also read pending
// changes in this transaction.
//
// If read_options.snapshot is not set, the current version of the key will
// be read. Calling SetSnapshot() does not affect the version of the data
// returned.
//
// Note that setting read_options.snapshot will affect what is read from the
// DB but will NOT change which keys are read from this transaction (the keys
// in this transaction do not yet belong to any snapshot and will be fetched
// regardless).
//
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) = 0;
virtual Status Get(const ReadOptions& options, const Slice& key,
std::string* value) = 0;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) = 0;
virtual std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) = 0;
// Read this key and ensure that this transaction will only
// be able to be committed if this key is not written outside this
// transaction after it has first been read (or after the snapshot if a
// snapshot is set in this transaction).
// This function is similar to OptimisticTransaction::Get() except it will
// affect whether this transaction will be able to be committed.
virtual Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) = 0;
virtual Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value) = 0;
virtual std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) = 0;
virtual std::vector<Status> MultiGetForUpdate(
const ReadOptions& options, const std::vector<Slice>& keys,
std::vector<std::string>* values) = 0;
// Put, Merge, and Delete behave similarly to their corresponding
// functions in WriteBatch. In addition, this transaction will only
// be able to be committed if these keys are not written outside of this
// transaction after they have been written by this transaction (or after the
// snapshot if a snapshot is set in this transaction).
virtual void Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) = 0;
virtual void Put(const SliceParts& key, const SliceParts& value) = 0;
virtual void Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
virtual void Merge(const Slice& key, const Slice& value) = 0;
virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key) = 0;
virtual void Delete(const Slice& key) = 0;
virtual void Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) = 0;
virtual void Delete(const SliceParts& key) = 0;
// PutUntracked() will write a Put to the batch of operations to be committed
// in this transaction. This write will only happen if this transaction
// gets committed successfully. But unlike OptimisticTransaction::Put(),
// no conflict checking will be done for this key. So any other writes to
// this key outside of this transaction will not prevent this transaction from
// committing.
virtual void PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
virtual void PutUntracked(const Slice& key, const Slice& value) = 0;
virtual void PutUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value) = 0;
virtual void PutUntracked(const SliceParts& key, const SliceParts& value) = 0;
virtual void MergeUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) = 0;
virtual void MergeUntracked(const Slice& key, const Slice& value) = 0;
virtual void DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) = 0;
virtual void DeleteUntracked(const Slice& key) = 0;
virtual void DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) = 0;
virtual void DeleteUntracked(const SliceParts& key) = 0;
// Similar to WriteBatch::PutLogData
virtual void PutLogData(const Slice& blob) = 0;
// Fetch the underlying write batch that contains all pending changes to be
// committed.
//
// Note: You should not write or delete anything from the batch directly and
// should only use the the functions in the OptimisticTransaction class to
// write to this transaction.
virtual WriteBatchWithIndex* GetWriteBatch() = 0;
protected:
// To begin a new transaction, see OptimisticTransactionDB::BeginTransaction()
explicit OptimisticTransaction(const OptimisticTransactionDB* db) {}
OptimisticTransaction() {}
private:
// No copying allowed
OptimisticTransaction(const OptimisticTransaction&);
void operator=(const OptimisticTransaction&);
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,64 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/optimistic_transaction.h"
namespace rocksdb {
class OptimisticTransaction;
// Database with Transaction support.
//
// See optimistic_transaction.h and examples/transaction_example.cc
class OptimisticTransactionDB {
public:
// Open an OptimisticTransactionDB similar to DB::Open().
static Status Open(const Options& options, const std::string& dbname,
OptimisticTransactionDB** dbptr);
static Status Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
OptimisticTransactionDB** dbptr);
virtual ~OptimisticTransactionDB() {}
// Starts a new OptimisticTransaction. Passing set_snapshot=true has the same
// effect
// as calling SetSnapshot().
//
// Caller should delete the returned transaction after calling
// Commit() or Rollback().
virtual OptimisticTransaction* BeginTransaction(
const WriteOptions& write_options,
const OptimisticTransactionOptions&
txn_options = OptimisticTransactionOptions()) = 0;
// Return the underlying Database that was opened
virtual DB* GetBaseDB() = 0;
protected:
// To Create an OptimisticTransactionDB, call Open()
explicit OptimisticTransactionDB(DB* db) {}
OptimisticTransactionDB() {}
private:
// No copying allowed
OptimisticTransactionDB(const OptimisticTransactionDB&);
void operator=(const OptimisticTransactionDB&);
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -22,6 +22,8 @@ class StackableDB : public DB {
return db_;
}
virtual DB* GetRootDB() override { return db_->GetRootDB(); }
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) override {

@ -47,7 +47,7 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(
rocksdb::MemTable* mem = new rocksdb::MemTable(
cmp, rocksdb::ImmutableCFOptions(options),
rocksdb::MutableCFOptions(options, rocksdb::ImmutableCFOptions(options)),
&wb);
&wb, rocksdb::kMaxSequenceNumber);
mem->Ref();
std::string state;
rocksdb::ColumnFamilyMemTablesDefault cf_mems_default(mem);

@ -109,6 +109,8 @@ LIB_SOURCES = \
utilities/merge_operators/uint64add.cc \
utilities/redis/redis_lists.cc \
utilities/spatialdb/spatial_db.cc \
utilities/transactions/optimistic_transaction_impl.cc \
utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/ttl/db_ttl_impl.cc \
utilities/write_batch_with_index/write_batch_with_index.cc \
utilities/write_batch_with_index/write_batch_with_index_internal.cc \
@ -179,6 +181,7 @@ TEST_BENCH_SOURCES = \
db/wal_manager_test.cc \
db/write_batch_test.cc \
db/write_controller_test.cc \
db/write_callback_test.cc \
table/block_based_filter_block_test.cc \
table/block_hash_index_test.cc \
table/block_test.cc \
@ -211,6 +214,7 @@ TEST_BENCH_SOURCES = \
utilities/merge_operators/string_append/stringappend_test.cc \
utilities/redis/redis_lists_test.cc \
utilities/spatialdb/spatial_db_test.cc \
utilities/transactions/optimistic_transaction_test.cc \
utilities/ttl/ttl_test.cc \
utilities/write_batch_with_index/write_batch_with_index_test.cc \
util/log_write_bench.cc \
@ -266,4 +270,5 @@ XFUNC_TESTS = \
"managed_new" \
"managed_xftest_dropold" \
"managed_xftest_release" \
"inplace_lock_test"
"inplace_lock_test" \
"transaction"

@ -444,7 +444,8 @@ class MemTableConstructor: public Constructor {
options_.memtable_factory = table_factory_;
ImmutableCFOptions ioptions(options_);
memtable_ = new MemTable(internal_comparator_, ioptions,
MutableCFOptions(options_, ioptions), wb);
MutableCFOptions(options_, ioptions), wb,
kMaxSequenceNumber);
memtable_->Ref();
}
~MemTableConstructor() {
@ -458,7 +459,7 @@ class MemTableConstructor: public Constructor {
ImmutableCFOptions mem_ioptions(ioptions);
memtable_ = new MemTable(internal_comparator_, mem_ioptions,
MutableCFOptions(options_, mem_ioptions),
write_buffer_);
write_buffer_, kMaxSequenceNumber);
memtable_->Ref();
int seq = 1;
for (const auto kv : kv_map) {
@ -1949,7 +1950,8 @@ TEST_F(MemTableTest, Simple) {
ImmutableCFOptions ioptions(options);
WriteBuffer wb(options.db_write_buffer_size);
MemTable* memtable =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb);
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb,
kMaxSequenceNumber);
memtable->Ref();
WriteBatch batch;
WriteBatchInternal::SetSequence(&batch, 100);

@ -3,7 +3,8 @@
if [ $# -ne 1 ]; then
echo -n "./benchmark.sh [bulkload/fillseq/overwrite/filluniquerandom/"
echo "readrandom/readwhilewriting/readwhilemerging/updaterandom/mergerandom]"
echo "readrandom/readwhilewriting/readwhilemerging/updaterandom/"
echo "mergerandom/randomtransaction]"
exit 0
fi
@ -278,6 +279,18 @@ function run_range {
summarize_result $output_dir/${out_name} ${full_name}.t${num_threads} seekrandom
}
function run_randomtransaction {
echo "..."
cmd="./db_bench $params_r --benchmarks=randomtransaction \
--num=$num_keys \
--transaction_db \
--threads=5 \
--transaction_sets=5 \
2>&1 | tee $output_dir/benchmark_randomtransaction.log"
echo $cmd | tee $output_dir/benchmark_rangescanwhilewriting.log
eval $cmd
}
function now() {
echo `date +"%s"`
}
@ -326,6 +339,8 @@ for job in ${jobs[@]}; do
run_rangewhile merging $job false
elif [ $job = revrangewhilemerging ]; then
run_rangewhile merging $job true
elif [ $job = randomtransaction ]; then
run_randomtransaction
elif [ $job = debug ]; then
num_keys=1000; # debug
echo "Setting num_keys to $num_keys"

@ -73,6 +73,9 @@ std::string Status::ToString() const {
case kAborted:
type = "Operation aborted: ";
break;
case kBusy:
type = "Resource busy: ";
break;
default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
static_cast<int>(code()));

@ -7,7 +7,12 @@
#include <string>
#include "db/db_impl.h"
#include "db/managed_iterator.h"
#include "db/write_callback.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/optimistic_transaction.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/write_batch.h"
#include "util/xfunc.h"
@ -74,6 +79,106 @@ void xf_transaction_clear_memtable_history(
*max_write_buffer_number_to_maintain = 0;
}
class XFTransactionWriteHandler : public WriteBatch::Handler {
public:
OptimisticTransaction* txn_;
DBImpl* db_impl_;
XFTransactionWriteHandler(OptimisticTransaction* txn, DBImpl* db_impl)
: txn_(txn), db_impl_(db_impl) {}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
InstrumentedMutexLock l(&db_impl_->mutex_);
ColumnFamilyHandle* cfh = db_impl_->GetColumnFamilyHandle(column_family_id);
if (cfh == nullptr) {
return Status::InvalidArgument(
"XFUNC test could not find column family "
"handle for id ",
ToString(column_family_id));
}
txn_->Put(cfh, key, value);
return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
InstrumentedMutexLock l(&db_impl_->mutex_);
ColumnFamilyHandle* cfh = db_impl_->GetColumnFamilyHandle(column_family_id);
if (cfh == nullptr) {
return Status::InvalidArgument(
"XFUNC test could not find column family "
"handle for id ",
ToString(column_family_id));
}
txn_->Merge(cfh, key, value);
return Status::OK();
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
InstrumentedMutexLock l(&db_impl_->mutex_);
ColumnFamilyHandle* cfh = db_impl_->GetColumnFamilyHandle(column_family_id);
if (cfh == nullptr) {
return Status::InvalidArgument(
"XFUNC test could not find column family "
"handle for id ",
ToString(column_family_id));
}
txn_->Delete(cfh, key);
return Status::OK();
}
virtual void LogData(const Slice& blob) override { txn_->PutLogData(blob); }
};
// Whenever DBImpl::Write is called, create a transaction and do the write via
// the transaction.
void xf_transaction_write(const WriteOptions& write_options,
const DBOptions& db_options, WriteBatch* my_batch,
WriteCallback* callback, DBImpl* db_impl, Status* s,
bool* write_attempted) {
if (callback != nullptr) {
// We may already be in a transaction, don't force a transaction
*write_attempted = false;
return;
}
OptimisticTransactionDB* txn_db = new OptimisticTransactionDB(db_impl);
OptimisticTransaction* txn =
OptimisticTransaction::BeginTransaction(txn_db, write_options);
XFTransactionWriteHandler handler(txn, db_impl);
*s = my_batch->Iterate(&handler);
if (!s->ok()) {
Log(InfoLogLevel::ERROR_LEVEL, db_options.info_log,
"XFUNC test could not iterate batch. status: $s\n",
s->ToString().c_str());
}
*s = txn->Commit();
if (!s->ok()) {
Log(InfoLogLevel::ERROR_LEVEL, db_options.info_log,
"XFUNC test could not commit transaction. status: $s\n",
s->ToString().c_str());
}
*write_attempted = true;
delete txn;
delete txn_db;
}
} // namespace rocksdb
#endif // XFUNC

@ -32,6 +32,7 @@ namespace rocksdb {
#else
struct Options;
struct WriteOptions;
class ManagedIterator;
class DBImpl;
void GetXFTestOptions(Options* options, int skip_policy);
@ -44,6 +45,11 @@ void xf_transaction_set_memtable_history(
int32_t* max_write_buffer_number_to_maintain);
void xf_transaction_clear_memtable_history(
int32_t* max_write_buffer_number_to_maintain);
void xf_transaction_write(const WriteOptions& write_options,
const DBOptions& db_options,
class WriteBatch* my_batch,
class WriteCallback* callback, DBImpl* db_impl,
Status* success, bool* write_attempted);
// This class provides the facility to run custom code to test a specific
// feature typically with all existing unit tests.

@ -0,0 +1,80 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include "utilities/transactions/optimistic_transaction_db_impl.h"
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "utilities/transactions/optimistic_transaction_impl.h"
namespace rocksdb {
OptimisticTransaction* OptimisticTransactionDBImpl::BeginTransaction(
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options) {
OptimisticTransaction* txn =
new OptimisticTransactionImpl(this, write_options, txn_options);
return txn;
}
Status OptimisticTransactionDB::Open(const Options& options,
const std::string& dbname,
OptimisticTransactionDB** dbptr) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<ColumnFamilyHandle*> handles;
Status s = Open(db_options, dbname, column_families, &handles, dbptr);
if (s.ok()) {
assert(handles.size() == 1);
// i can delete the handle since DBImpl is always holding a reference to
// default column family
delete handles[0];
}
return s;
}
Status OptimisticTransactionDB::Open(
const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
OptimisticTransactionDB** dbptr) {
Status s;
DB* db;
std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
// Enable MemTable History if not already enabled
for (auto& column_family : column_families_copy) {
ColumnFamilyOptions* options = &column_family.options;
if (options->max_write_buffer_number_to_maintain == 0) {
// Setting to -1 will set the History size to max_write_buffer_number.
options->max_write_buffer_number_to_maintain = -1;
}
}
s = DB::Open(db_options, dbname, column_families_copy, handles, &db);
if (s.ok()) {
*dbptr = new OptimisticTransactionDBImpl(db);
}
return s;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,33 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
namespace rocksdb {
class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
public:
explicit OptimisticTransactionDBImpl(DB* db)
: OptimisticTransactionDB(db), db_(db) {}
~OptimisticTransactionDBImpl() {}
OptimisticTransaction* BeginTransaction(
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options) override;
DB* GetBaseDB() override { return db_.get(); }
private:
std::unique_ptr<DB> db_;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,339 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "utilities/transactions/optimistic_transaction_impl.h"
#include <string>
#include <vector>
#include "db/column_family.h"
#include "db/db_impl.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "util/string_util.h"
namespace rocksdb {
struct WriteOptions;
OptimisticTransactionImpl::OptimisticTransactionImpl(
OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options)
: txn_db_(txn_db),
db_(txn_db->GetBaseDB()),
write_options_(write_options),
snapshot_(nullptr),
write_batch_(txn_options.cmp, 0, true) {
if (txn_options.set_snapshot) {
SetSnapshot();
} else {
start_sequence_number_ = db_->GetLatestSequenceNumber();
}
}
OptimisticTransactionImpl::~OptimisticTransactionImpl() {
tracked_keys_.clear();
if (snapshot_ != nullptr) {
db_->ReleaseSnapshot(snapshot_);
}
}
void OptimisticTransactionImpl::SetSnapshot() {
if (snapshot_ != nullptr) {
db_->ReleaseSnapshot(snapshot_);
}
snapshot_ = db_->GetSnapshot();
start_sequence_number_ = snapshot_->GetSequenceNumber();
}
Status OptimisticTransactionImpl::Commit() {
// Set up callback which will call CheckTransactionForConflicts() to
// check whether this transaction is safe to be committed.
OptimisticTransactionCallback callback(this);
DBImpl* db_impl = dynamic_cast<DBImpl*>(db_->GetRootDB());
if (db_impl == nullptr) {
// This should only happen if we support creating transactions from
// a StackableDB and someone overrides GetRootDB().
return Status::InvalidArgument(
"DB::GetRootDB() returned an unexpected DB class");
}
Status s = db_impl->WriteWithCallback(
write_options_, write_batch_.GetWriteBatch(), &callback);
if (s.ok()) {
tracked_keys_.clear();
write_batch_.Clear();
}
return s;
}
void OptimisticTransactionImpl::Rollback() {
tracked_keys_.clear();
write_batch_.Clear();
}
// Record this key so that we can check it for conflicts at commit time.
void OptimisticTransactionImpl::RecordOperation(
ColumnFamilyHandle* column_family, const Slice& key) {
uint32_t cfh_id = GetColumnFamilyID(column_family);
SequenceNumber seq;
if (snapshot_) {
seq = start_sequence_number_;
} else {
seq = db_->GetLatestSequenceNumber();
}
std::string key_str = key.ToString();
auto iter = tracked_keys_[cfh_id].find(key_str);
if (iter == tracked_keys_[cfh_id].end()) {
// key not yet seen, store it.
tracked_keys_[cfh_id].insert({std::move(key_str), seq});
} else {
SequenceNumber old_seq = iter->second;
if (seq < old_seq) {
// Snapshot has changed since we last saw this key, need to
// store the earliest seen sequence number.
tracked_keys_[cfh_id][key_str] = seq;
}
}
}
void OptimisticTransactionImpl::RecordOperation(
ColumnFamilyHandle* column_family, const SliceParts& key) {
size_t key_size = 0;
for (int i = 0; i < key.num_parts; ++i) {
key_size += key.parts[i].size();
}
std::string str;
str.reserve(key_size);
for (int i = 0; i < key.num_parts; ++i) {
str.append(key.parts[i].data(), key.parts[i].size());
}
RecordOperation(column_family, str);
}
Status OptimisticTransactionImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
value);
}
Status OptimisticTransactionImpl::GetForUpdate(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
// Regardless of whether the Get succeeded, track this key.
RecordOperation(column_family, key);
return Get(read_options, column_family, key, value);
}
std::vector<Status> OptimisticTransactionImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
// Regardless of whether the MultiGet succeeded, track these keys.
size_t num_keys = keys.size();
values->resize(num_keys);
// TODO(agiardullo): optimize multiget?
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
std::string* value = &(*values)[i];
stat_list[i] = Get(read_options, column_family[i], keys[i], value);
}
return stat_list;
}
std::vector<Status> OptimisticTransactionImpl::MultiGetForUpdate(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
// Regardless of whether the MultiGet succeeded, track these keys.
size_t num_keys = keys.size();
values->resize(num_keys);
// TODO(agiardullo): optimize multiget?
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
// Regardless of whether the Get succeeded, track this key.
RecordOperation(column_family[i], keys[i]);
std::string* value = &(*values)[i];
stat_list[i] = Get(read_options, column_family[i], keys[i], value);
}
return stat_list;
}
void OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
RecordOperation(column_family, key);
write_batch_.Put(column_family, key, value);
}
void OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
RecordOperation(column_family, key);
write_batch_.Put(column_family, key, value);
}
void OptimisticTransactionImpl::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
RecordOperation(column_family, key);
write_batch_.Merge(column_family, key, value);
}
void OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
RecordOperation(column_family, key);
write_batch_.Delete(column_family, key);
}
void OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
RecordOperation(column_family, key);
write_batch_.Delete(column_family, key);
}
void OptimisticTransactionImpl::PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key,
const Slice& value) {
write_batch_.Put(column_family, key, value);
}
void OptimisticTransactionImpl::PutUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
write_batch_.Put(column_family, key, value);
}
void OptimisticTransactionImpl::MergeUntracked(
ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) {
write_batch_.Merge(column_family, key, value);
}
void OptimisticTransactionImpl::DeleteUntracked(
ColumnFamilyHandle* column_family, const Slice& key) {
write_batch_.Delete(column_family, key);
}
void OptimisticTransactionImpl::DeleteUntracked(
ColumnFamilyHandle* column_family, const SliceParts& key) {
write_batch_.Delete(column_family, key);
}
void OptimisticTransactionImpl::PutLogData(const Slice& blob) {
write_batch_.PutLogData(blob);
}
WriteBatchWithIndex* OptimisticTransactionImpl::GetWriteBatch() {
return &write_batch_;
}
// Returns OK if it is safe to commit this transaction. Returns Status::Busy
// if there are read or write conflicts that would prevent us from committing OR
// if we can not determine whether there would be any such conflicts.
//
// Should only be called on writer thread.
Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) {
Status result;
assert(dynamic_cast<DBImpl*>(db) != nullptr);
auto db_impl = reinterpret_cast<DBImpl*>(db);
for (auto& tracked_keys_iter : tracked_keys_) {
uint32_t cf_id = tracked_keys_iter.first;
const auto& keys = tracked_keys_iter.second;
SuperVersion* sv = db_impl->GetAndRefSuperVersion(cf_id);
if (sv == nullptr) {
result =
Status::Busy("Could not access column family " + ToString(cf_id));
break;
}
SequenceNumber earliest_seq =
db_impl->GetEarliestMemTableSequenceNumber(sv, true);
// For each of the keys in this transaction, check to see if someone has
// written to this key since the start of the transaction.
for (const auto& key_iter : keys) {
const auto& key = key_iter.first;
const SequenceNumber key_seq = key_iter.second;
// Since it would be too slow to check the SST files, we will only use
// the memtables to check whether there have been any recent writes
// to this key after it was accessed in this transaction. But if the
// memtables have been flushed recently, we cannot rely on them to tell
// whether there have been any recent writes and must fail this
// transaction.
if (earliest_seq == kMaxSequenceNumber) {
// The age of this memtable is unknown. Cannot rely on it to check
// for recent writes.
result = Status::Busy(
"Could not commit transaction with as the MemTable does not "
"countain a long enough history to check write at SequenceNumber: ",
ToString(key_seq));
} else if (key_seq < earliest_seq) {
// The age of this memtable is too new to use to check for recent
// writes.
char msg[255];
snprintf(
msg, sizeof(msg),
"Could not commit transaction with write at SequenceNumber %lu "
"as the MemTable only contains changes newer than SequenceNumber "
"%lu.",
key_seq, earliest_seq);
result = Status::Busy(msg);
} else {
SequenceNumber seq = kMaxSequenceNumber;
Status s = db_impl->GetLatestSequenceForKeyFromMemtable(sv, key, &seq);
if (!s.ok()) {
result = s;
} else if (seq != kMaxSequenceNumber && seq > key_seq) {
result = Status::Busy();
}
}
if (!result.ok()) {
break;
}
}
db_impl->ReturnAndCleanupSuperVersion(cf_id, sv);
if (!result.ok()) {
break;
}
}
return result;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,196 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <unordered_map>
#include <vector>
#include "db/write_callback.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/optimistic_transaction.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
namespace rocksdb {
using TransactionKeyMap =
std::unordered_map<uint32_t,
std::unordered_map<std::string, SequenceNumber>>;
class OptimisticTransactionImpl : public OptimisticTransaction {
public:
OptimisticTransactionImpl(OptimisticTransactionDB* db,
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options);
virtual ~OptimisticTransactionImpl();
Status Commit() override;
void Rollback() override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override {
return Get(options, db_->DefaultColumnFamily(), key, value);
}
Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override;
Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value) override {
return GetForUpdate(options, db_->DefaultColumnFamily(), key, value);
}
std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override {
return MultiGet(options, std::vector<ColumnFamilyHandle*>(
keys.size(), db_->DefaultColumnFamily()),
keys, values);
}
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options, const std::vector<Slice>& keys,
std::vector<std::string>* values) override {
return MultiGetForUpdate(options,
std::vector<ColumnFamilyHandle*>(
keys.size(), db_->DefaultColumnFamily()),
keys, values);
}
void Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
void Put(const Slice& key, const Slice& value) override {
Put(nullptr, key, value);
}
void Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
void Put(const SliceParts& key, const SliceParts& value) override {
Put(nullptr, key, value);
}
void Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
void Merge(const Slice& key, const Slice& value) override {
Merge(nullptr, key, value);
}
void Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
void Delete(const Slice& key) override { Delete(nullptr, key); }
void Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
void Delete(const SliceParts& key) override { Delete(nullptr, key); }
void PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
void PutUntracked(const Slice& key, const Slice& value) override {
PutUntracked(nullptr, key, value);
}
void PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
void PutUntracked(const SliceParts& key, const SliceParts& value) override {
PutUntracked(nullptr, key, value);
}
void MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
void MergeUntracked(const Slice& key, const Slice& value) override {
MergeUntracked(nullptr, key, value);
}
void DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) override;
void DeleteUntracked(const Slice& key) override {
DeleteUntracked(nullptr, key);
}
void DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
void DeleteUntracked(const SliceParts& key) override {
DeleteUntracked(nullptr, key);
}
void PutLogData(const Slice& blob) override;
const TransactionKeyMap* GetTrackedKeys() const { return &tracked_keys_; }
const Snapshot* GetSnapshot() const override { return snapshot_; }
void SetSnapshot() override;
WriteBatchWithIndex* GetWriteBatch() override;
protected:
OptimisticTransactionDB* const txn_db_;
DB* db_;
const WriteOptions write_options_;
const Snapshot* snapshot_;
SequenceNumber start_sequence_number_;
WriteBatchWithIndex write_batch_;
private:
// Map of Column Family IDs to keys and their sequence numbers
TransactionKeyMap tracked_keys_;
friend class OptimisticTransactionCallback;
// Returns OK if it is safe to commit this transaction. Returns Status::Busy
// if there are read or write conflicts that would prevent us from committing
// OR if we can not determine whether there would be any such conflicts.
//
// Should only be called on writer thread.
Status CheckTransactionForConflicts(DB* db);
void RecordOperation(ColumnFamilyHandle* column_family, const Slice& key);
void RecordOperation(ColumnFamilyHandle* column_family,
const SliceParts& key);
// No copying allowed
OptimisticTransactionImpl(const OptimisticTransactionImpl&);
void operator=(const OptimisticTransactionImpl&);
};
// Used at commit time to trigger transaction validation
class OptimisticTransactionCallback : public WriteCallback {
public:
explicit OptimisticTransactionCallback(OptimisticTransactionImpl* txn)
: txn_(txn) {}
Status Callback(DB* db) override {
return txn_->CheckTransactionForConflicts(db);
}
private:
OptimisticTransactionImpl* txn_;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,846 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/utilities/optimistic_transaction.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "util/logging.h"
#include "util/testharness.h"
using std::string;
namespace rocksdb {
class OptimisticTransactionTest : public testing::Test {
public:
OptimisticTransactionDB* txn_db;
DB* db;
string dbname;
Options options;
OptimisticTransactionTest() {
options.create_if_missing = true;
options.max_write_buffer_number = 2;
dbname = test::TmpDir() + "/optimistic_transaction_testdb";
DestroyDB(dbname, options);
Status s = OptimisticTransactionDB::Open(options, dbname, &txn_db);
assert(s.ok());
db = txn_db->GetBaseDB();
}
~OptimisticTransactionTest() {
delete txn_db;
DestroyDB(dbname, options);
}
};
TEST_F(OptimisticTransactionTest, SuccessTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar"));
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
txn->GetForUpdate(read_options, "foo", &value);
ASSERT_EQ(value, "bar");
txn->Put(Slice("foo"), Slice("bar2"));
txn->GetForUpdate(read_options, "foo", &value);
ASSERT_EQ(value, "bar2");
s = txn->Commit();
ASSERT_OK(s);
db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "bar2");
delete txn;
}
TEST_F(OptimisticTransactionTest, WriteConflictTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
db->Put(write_options, "foo", "bar");
db->Put(write_options, "foo2", "bar");
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
txn->Put("foo", "bar2");
// This Put outside of a transaction will conflict with the previous write
s = db->Put(write_options, "foo", "barz");
ASSERT_OK(s);
s = db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
s = txn->Commit();
ASSERT_NOK(s); // Txn should not commit
// Verify that transaction did not write anything
db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
db->Get(read_options, "foo2", &value);
ASSERT_EQ(value, "bar");
delete txn;
}
TEST_F(OptimisticTransactionTest, WriteConflictTest2) {
WriteOptions write_options;
ReadOptions read_options;
OptimisticTransactionOptions txn_options;
string value;
Status s;
db->Put(write_options, "foo", "bar");
db->Put(write_options, "foo2", "bar");
txn_options.set_snapshot = true;
OptimisticTransaction* txn =
txn_db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn);
// This Put outside of a transaction will conflict with a later write
s = db->Put(write_options, "foo", "barz");
ASSERT_OK(s);
txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken
s = db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
s = txn->Commit();
ASSERT_NOK(s); // Txn should not commit
// Verify that transaction did not write anything
db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
db->Get(read_options, "foo2", &value);
ASSERT_EQ(value, "bar");
delete txn;
}
TEST_F(OptimisticTransactionTest, ReadConflictTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
OptimisticTransactionOptions txn_options;
string value;
Status s;
db->Put(write_options, "foo", "bar");
db->Put(write_options, "foo2", "bar");
txn_options.set_snapshot = true;
OptimisticTransaction* txn =
txn_db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn);
txn->SetSnapshot();
snapshot_read_options.snapshot = txn->GetSnapshot();
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_EQ(value, "bar");
// This Put outside of a transaction will conflict with the previous read
s = db->Put(write_options, "foo", "barz");
ASSERT_OK(s);
s = db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
s = txn->Commit();
ASSERT_NOK(s); // Txn should not commit
// Verify that transaction did not write anything
txn->GetForUpdate(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
txn->GetForUpdate(read_options, "foo2", &value);
ASSERT_EQ(value, "bar");
delete txn;
}
TEST_F(OptimisticTransactionTest, TxnOnlyTest) {
// Test to make sure transactions work when there are no other writes in an
// empty db.
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
txn->Put("x", "y");
s = txn->Commit();
ASSERT_OK(s);
delete txn;
}
TEST_F(OptimisticTransactionTest, FlushTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
Status s;
db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar"));
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
snapshot_read_options.snapshot = txn->GetSnapshot();
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_EQ(value, "bar");
txn->Put(Slice("foo"), Slice("bar2"));
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_EQ(value, "bar2");
// Put a random key so we have a memtable to flush
s = db->Put(write_options, "dummy", "dummy");
ASSERT_OK(s);
// force a memtable flush
FlushOptions flush_ops;
db->Flush(flush_ops);
s = txn->Commit();
// txn should commit since the flushed table is still in MemtableList History
ASSERT_OK(s);
db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "bar2");
delete txn;
}
TEST_F(OptimisticTransactionTest, FlushTest2) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
Status s;
db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar"));
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
snapshot_read_options.snapshot = txn->GetSnapshot();
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_EQ(value, "bar");
txn->Put(Slice("foo"), Slice("bar2"));
txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_EQ(value, "bar2");
// Put a random key so we have a MemTable to flush
s = db->Put(write_options, "dummy", "dummy");
ASSERT_OK(s);
// force a memtable flush
FlushOptions flush_ops;
db->Flush(flush_ops);
// Put a random key so we have a MemTable to flush
s = db->Put(write_options, "dummy", "dummy2");
ASSERT_OK(s);
// force a memtable flush
db->Flush(flush_ops);
s = db->Put(write_options, "dummy", "dummy3");
ASSERT_OK(s);
// force a memtable flush
// Since our test db has max_write_buffer_number=2, this flush will cause
// the first memtable to get purged from the MemtableList history.
db->Flush(flush_ops);
s = txn->Commit();
// txn should not commit since MemTableList History is not large enough
ASSERT_NOK(s);
db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "bar");
delete txn;
}
TEST_F(OptimisticTransactionTest, NoSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
db->Put(write_options, "AAA", "bar");
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
// Modify key after transaction start
db->Put(write_options, "AAA", "bar1");
// Read and write without a snapshot
txn->GetForUpdate(read_options, "AAA", &value);
ASSERT_EQ(value, "bar1");
txn->Put("AAA", "bar2");
// Should commit since read/write was done after data changed
s = txn->Commit();
ASSERT_OK(s);
txn->GetForUpdate(read_options, "AAA", &value);
ASSERT_EQ(value, "bar2");
delete txn;
}
TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
Status s;
db->Put(write_options, "AAA", "bar");
db->Put(write_options, "BBB", "bar");
db->Put(write_options, "CCC", "bar");
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
db->Put(write_options, "AAA", "bar1");
// Read and write without a snapshot
txn->GetForUpdate(read_options, "AAA", &value);
ASSERT_EQ(value, "bar1");
txn->Put("AAA", "bar2");
// Modify BBB before snapshot is taken
db->Put(write_options, "BBB", "bar1");
txn->SetSnapshot();
snapshot_read_options.snapshot = txn->GetSnapshot();
// Read and write with snapshot
txn->GetForUpdate(snapshot_read_options, "BBB", &value);
ASSERT_EQ(value, "bar1");
txn->Put("BBB", "bar2");
db->Put(write_options, "CCC", "bar1");
// Set a new snapshot
txn->SetSnapshot();
snapshot_read_options.snapshot = txn->GetSnapshot();
// Read and write with snapshot
txn->GetForUpdate(snapshot_read_options, "CCC", &value);
ASSERT_EQ(value, "bar1");
txn->Put("CCC", "bar2");
s = txn->GetForUpdate(read_options, "AAA", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
s = txn->GetForUpdate(read_options, "BBB", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
s = txn->GetForUpdate(read_options, "CCC", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
s = db->Get(read_options, "AAA", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar1");
s = db->Get(read_options, "BBB", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar1");
s = db->Get(read_options, "CCC", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar1");
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "AAA", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
s = db->Get(read_options, "BBB", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
s = db->Get(read_options, "CCC", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
// verify that we track multiple writes to the same key at different snapshots
delete txn;
txn = txn_db->BeginTransaction(write_options);
// Potentially conflicting writes
db->Put(write_options, "ZZZ", "zzz");
db->Put(write_options, "XXX", "xxx");
txn->SetSnapshot();
OptimisticTransactionOptions txn_options;
txn_options.set_snapshot = true;
OptimisticTransaction* txn2 =
txn_db->BeginTransaction(write_options, txn_options);
txn2->SetSnapshot();
// This should not conflict in txn since the snapshot is later than the
// previous write (spoiler alert: it will later conflict with txn2).
txn->Put("ZZZ", "zzzz");
s = txn->Commit();
ASSERT_OK(s);
delete txn;
// This will conflict since the snapshot is earlier than another write to ZZZ
txn2->Put("ZZZ", "xxxxx");
s = txn2->Commit();
ASSERT_NOK(s);
delete txn2;
}
TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
OptimisticTransactionOptions txn_options;
string value;
Status s;
ColumnFamilyHandle *cfa, *cfb;
ColumnFamilyOptions cf_options;
// Create 2 new column families
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
ASSERT_OK(s);
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
ASSERT_OK(s);
delete cfa;
delete cfb;
delete txn_db;
// open DB with three column families
std::vector<ColumnFamilyDescriptor> column_families;
// have to open default column family
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
// open the new column families
column_families.push_back(
ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
column_families.push_back(
ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
std::vector<ColumnFamilyHandle*> handles;
s = OptimisticTransactionDB::Open(options, dbname, column_families, &handles,
&txn_db);
ASSERT_OK(s);
db = txn_db->GetBaseDB();
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
txn->SetSnapshot();
snapshot_read_options.snapshot = txn->GetSnapshot();
txn_options.set_snapshot = true;
OptimisticTransaction* txn2 =
txn_db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn2);
// Write some data to the db
WriteBatch batch;
batch.Put("foo", "foo");
batch.Put(handles[1], "AAA", "bar");
batch.Put(handles[1], "AAAZZZ", "bar");
s = db->Write(write_options, &batch);
ASSERT_OK(s);
db->Delete(write_options, handles[1], "AAAZZZ");
// These keys do no conflict with existing writes since they're in
// different column families
txn->Delete("AAA");
txn->GetForUpdate(snapshot_read_options, handles[1], "foo", &value);
Slice key_slice("AAAZZZ");
Slice value_slices[2] = {Slice("bar"), Slice("bar")};
txn->Put(handles[2], SliceParts(&key_slice, 1), SliceParts(value_slices, 2));
// Txn should commit
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "AAA", &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(read_options, handles[2], "AAAZZZ", &value);
ASSERT_EQ(value, "barbar");
Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")};
Slice value_slice("barbarbar");
// This write will cause a conflict with the earlier batch write
txn2->Put(handles[1], SliceParts(key_slices, 3), SliceParts(&value_slice, 1));
txn2->Delete(handles[2], "XXX");
txn2->Delete(handles[1], "XXX");
s = txn2->GetForUpdate(snapshot_read_options, handles[1], "AAA", &value);
ASSERT_TRUE(s.IsNotFound());
// Verify txn did not commit
s = txn2->Commit();
ASSERT_NOK(s);
s = db->Get(read_options, handles[1], "AAAZZZ", &value);
ASSERT_EQ(value, "barbar");
delete txn;
delete txn2;
txn = txn_db->BeginTransaction(write_options, txn_options);
snapshot_read_options.snapshot = txn->GetSnapshot();
txn2 = txn_db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn);
std::vector<ColumnFamilyHandle*> multiget_cfh = {handles[1], handles[2],
handles[0], handles[2]};
std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
std::vector<std::string> values(4);
std::vector<Status> results = txn->MultiGetForUpdate(
snapshot_read_options, multiget_cfh, multiget_keys, &values);
ASSERT_OK(results[0]);
ASSERT_OK(results[1]);
ASSERT_OK(results[2]);
ASSERT_TRUE(results[3].IsNotFound());
ASSERT_EQ(values[0], "bar");
ASSERT_EQ(values[1], "barbar");
ASSERT_EQ(values[2], "foo");
txn->Delete(handles[2], "ZZZ");
txn->Put(handles[2], "ZZZ", "YYY");
txn->Put(handles[2], "ZZZ", "YYYY");
txn->Delete(handles[2], "ZZZ");
txn->Put(handles[2], "AAAZZZ", "barbarbar");
// Txn should commit
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, handles[2], "ZZZ", &value);
ASSERT_TRUE(s.IsNotFound());
// Put a key which will conflict with the next txn using the previous snapshot
db->Put(write_options, handles[2], "foo", "000");
results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh,
multiget_keys, &values);
ASSERT_OK(results[0]);
ASSERT_OK(results[1]);
ASSERT_OK(results[2]);
ASSERT_TRUE(results[3].IsNotFound());
ASSERT_EQ(values[0], "bar");
ASSERT_EQ(values[1], "barbar");
ASSERT_EQ(values[2], "foo");
// Verify Txn Did not Commit
s = txn2->Commit();
ASSERT_NOK(s);
s = db->DropColumnFamily(handles[1]);
ASSERT_OK(s);
s = db->DropColumnFamily(handles[2]);
ASSERT_OK(s);
delete txn;
delete txn2;
for (auto handle : handles) {
delete handle;
}
}
TEST_F(OptimisticTransactionTest, EmptyTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
s = db->Put(write_options, "aaa", "aaa");
ASSERT_OK(s);
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
s = txn->Commit();
ASSERT_OK(s);
delete txn;
txn = txn_db->BeginTransaction(write_options);
txn->Rollback();
delete txn;
txn = txn_db->BeginTransaction(write_options);
s = txn->GetForUpdate(read_options, "aaa", &value);
ASSERT_EQ(value, "aaa");
s = txn->Commit();
ASSERT_OK(s);
delete txn;
txn = txn_db->BeginTransaction(write_options);
txn->SetSnapshot();
s = txn->GetForUpdate(read_options, "aaa", &value);
ASSERT_EQ(value, "aaa");
s = db->Put(write_options, "aaa", "xxx");
s = txn->Commit();
ASSERT_NOK(s);
delete txn;
}
TEST_F(OptimisticTransactionTest, PredicateManyPreceders) {
WriteOptions write_options;
ReadOptions read_options1, read_options2;
OptimisticTransactionOptions txn_options;
string value;
Status s;
txn_options.set_snapshot = true;
OptimisticTransaction* txn1 =
txn_db->BeginTransaction(write_options, txn_options);
read_options1.snapshot = txn1->GetSnapshot();
OptimisticTransaction* txn2 = txn_db->BeginTransaction(write_options);
txn2->SetSnapshot();
read_options2.snapshot = txn2->GetSnapshot();
std::vector<Slice> multiget_keys = {"1", "2", "3"};
std::vector<std::string> multiget_values;
std::vector<Status> results =
txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
ASSERT_TRUE(results[1].IsNotFound());
txn2->Put("2", "x");
s = txn2->Commit();
ASSERT_OK(s);
multiget_values.clear();
results =
txn1->MultiGetForUpdate(read_options1, multiget_keys, &multiget_values);
ASSERT_TRUE(results[1].IsNotFound());
// should not commit since txn2 wrote a key txn has read
s = txn1->Commit();
ASSERT_NOK(s);
delete txn1;
delete txn2;
txn1 = txn_db->BeginTransaction(write_options, txn_options);
read_options1.snapshot = txn1->GetSnapshot();
txn2 = txn_db->BeginTransaction(write_options, txn_options);
read_options2.snapshot = txn2->GetSnapshot();
txn1->Put("4", "x");
txn2->Delete("4");
// txn1 can commit since txn2's delete hasn't happened yet (it's just batched)
s = txn1->Commit();
ASSERT_OK(s);
s = txn2->GetForUpdate(read_options2, "4", &value);
ASSERT_TRUE(s.IsNotFound());
// txn2 cannot commit since txn1 changed "4"
s = txn2->Commit();
ASSERT_NOK(s);
delete txn1;
delete txn2;
}
TEST_F(OptimisticTransactionTest, LostUpdate) {
WriteOptions write_options;
ReadOptions read_options, read_options1, read_options2;
OptimisticTransactionOptions txn_options;
string value;
Status s;
// Test 2 transactions writing to the same key in multiple orders and
// with/without snapshots
OptimisticTransaction* txn1 = txn_db->BeginTransaction(write_options);
OptimisticTransaction* txn2 = txn_db->BeginTransaction(write_options);
txn1->Put("1", "1");
txn2->Put("1", "2");
s = txn1->Commit();
ASSERT_OK(s);
s = txn2->Commit();
ASSERT_NOK(s);
delete txn1;
delete txn2;
txn_options.set_snapshot = true;
txn1 = txn_db->BeginTransaction(write_options, txn_options);
read_options1.snapshot = txn1->GetSnapshot();
txn2 = txn_db->BeginTransaction(write_options, txn_options);
read_options2.snapshot = txn2->GetSnapshot();
txn1->Put("1", "3");
txn2->Put("1", "4");
s = txn1->Commit();
ASSERT_OK(s);
s = txn2->Commit();
ASSERT_NOK(s);
delete txn1;
delete txn2;
txn1 = txn_db->BeginTransaction(write_options, txn_options);
read_options1.snapshot = txn1->GetSnapshot();
txn2 = txn_db->BeginTransaction(write_options, txn_options);
read_options2.snapshot = txn2->GetSnapshot();
txn1->Put("1", "5");
s = txn1->Commit();
ASSERT_OK(s);
txn2->Put("1", "6");
s = txn2->Commit();
ASSERT_NOK(s);
delete txn1;
delete txn2;
txn1 = txn_db->BeginTransaction(write_options, txn_options);
read_options1.snapshot = txn1->GetSnapshot();
txn2 = txn_db->BeginTransaction(write_options, txn_options);
read_options2.snapshot = txn2->GetSnapshot();
txn1->Put("1", "5");
s = txn1->Commit();
ASSERT_OK(s);
txn2->SetSnapshot();
txn2->Put("1", "6");
s = txn2->Commit();
ASSERT_OK(s);
delete txn1;
delete txn2;
txn1 = txn_db->BeginTransaction(write_options);
txn2 = txn_db->BeginTransaction(write_options);
txn1->Put("1", "7");
s = txn1->Commit();
ASSERT_OK(s);
txn2->Put("1", "8");
s = txn2->Commit();
ASSERT_OK(s);
delete txn1;
delete txn2;
s = db->Get(read_options, "1", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "8");
}
TEST_F(OptimisticTransactionTest, UntrackedWrites) {
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
// Verify transaction rollback works for untracked keys.
OptimisticTransaction* txn = txn_db->BeginTransaction(write_options);
txn->PutUntracked("untracked", "0");
txn->Rollback();
s = db->Get(read_options, "untracked", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn;
txn = txn_db->BeginTransaction(write_options);
txn->Put("tracked", "1");
txn->PutUntracked("untracked", "1");
txn->MergeUntracked("untracked", "2");
txn->DeleteUntracked("untracked");
// Write to the untracked key outside of the transaction and verify
// it doesn't prevent the transaction from committing.
s = db->Put(write_options, "untracked", "x");
ASSERT_OK(s);
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "untracked", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn;
txn = txn_db->BeginTransaction(write_options);
txn->Put("tracked", "10");
txn->PutUntracked("untracked", "A");
// Write to tracked key outside of the transaction and verify that the
// untracked keys are not written when the commit fails.
s = db->Delete(write_options, "tracked");
s = txn->Commit();
ASSERT_NOK(s);
s = db->Get(read_options, "untracked", &value);
ASSERT_TRUE(s.IsNotFound());
delete txn;
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif // ROCKSDB_LITE
Loading…
Cancel
Save