From d2f0912bd33273a830e1d4d28e863b7fb91c360e Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Thu, 2 Jul 2015 17:23:41 -0700 Subject: [PATCH] Merge the latest changes from github/master --- db/c.cc | 50 +++++ db/column_family_test.cc | 4 + db/db_bench.cc | 284 +++++++++++++++++++++++++- db/version_set.cc | 2 +- include/rocksdb/env.h | 43 ++++ include/rocksdb/table_properties.h | 2 +- include/rocksdb/thread_status.h | 5 - java/rocksjni/options.cc | 48 +++++ java/rocksjni/write_batch_test.cc | 2 +- port/win/env_win.cc | 121 +++++++---- port/win/win_logger.cc | 2 + util/arena.cc | 2 +- util/env_posix.cc | 83 ++++++-- util/env_test.cc | 81 ++++++++ util/options_test.cc | 2 + util/posix_logger.h | 2 + util/xfunc.cc | 115 +++++++++++ util/xfunc.h | 10 + utilities/backupable/backupable_db.cc | 49 ++++- 19 files changed, 823 insertions(+), 84 deletions(-) diff --git a/db/c.cc b/db/c.cc index 96eecb06d..96a5882d8 100644 --- a/db/c.cc +++ b/db/c.cc @@ -608,6 +608,10 @@ void rocksdb_close(rocksdb_t* db) { delete db; } +void rocksdb_options_set_uint64add_merge_operator(rocksdb_options_t* opt) { + opt->rep.merge_operator = rocksdb::MergeOperators::CreateUInt64AddOperator(); +} + rocksdb_t* rocksdb_open_column_families( const rocksdb_options_t* db_options, const char* name, @@ -1359,6 +1363,26 @@ void rocksdb_block_based_options_set_whole_key_filtering( options->rep.whole_key_filtering = v; } +void rocksdb_block_based_options_set_format_version( + rocksdb_block_based_table_options_t* options, int v) { + options->rep.format_version = v; +} + +void rocksdb_block_based_options_set_index_type( + rocksdb_block_based_table_options_t* options, int v) { + options->rep.index_type = static_cast(v); +} + +void rocksdb_block_based_options_set_hash_index_allow_collision( + rocksdb_block_based_table_options_t* options, unsigned char v) { + options->rep.hash_index_allow_collision = v; +} + +void rocksdb_block_based_options_set_cache_index_and_filter_blocks( + rocksdb_block_based_table_options_t* options, unsigned char v) { + options->rep.cache_index_and_filter_blocks = v; +} + void rocksdb_options_set_block_based_table_factory( rocksdb_options_t *opt, rocksdb_block_based_table_options_t* table_options) { @@ -1741,6 +1765,11 @@ void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t* opt opt->rep.min_write_buffer_number_to_merge = n; } +void rocksdb_options_set_max_write_buffer_number_to_maintain( + rocksdb_options_t* opt, int n) { + opt->rep.max_write_buffer_number_to_maintain = n; +} + void rocksdb_options_set_max_background_compactions(rocksdb_options_t* opt, int n) { opt->rep.max_background_compactions = n; } @@ -2284,6 +2313,27 @@ rocksdb_slicetransform_t* rocksdb_slicetransform_create_fixed_prefix(size_t pref return wrapper; } +rocksdb_slicetransform_t* rocksdb_slicetransform_create_noop() { + struct Wrapper : public rocksdb_slicetransform_t { + const SliceTransform* rep_; + ~Wrapper() { delete rep_; } + const char* Name() const override { return rep_->Name(); } + Slice Transform(const Slice& src) const override { + return rep_->Transform(src); + } + bool InDomain(const Slice& src) const override { + return rep_->InDomain(src); + } + bool InRange(const Slice& src) const override { return rep_->InRange(src); } + static void DoNothing(void*) { } + }; + Wrapper* wrapper = new Wrapper; + wrapper->rep_ = rocksdb::NewNoopTransform(); + wrapper->state_ = nullptr; + wrapper->destructor_ = &Wrapper::DoNothing; + return wrapper; +} + rocksdb_universal_compaction_options_t* rocksdb_universal_compaction_options_create() { rocksdb_universal_compaction_options_t* result = new rocksdb_universal_compaction_options_t; result->rep = new rocksdb::CompactionOptionsUniversal; diff --git a/db/column_family_test.cc b/db/column_family_test.cc index e70f916f3..c564103bd 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -711,15 +711,19 @@ TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) { default_cf.write_buffer_size = 100000; default_cf.max_write_buffer_number = 10; default_cf.min_write_buffer_number_to_merge = 1; + default_cf.max_write_buffer_number_to_maintain = 0; one.write_buffer_size = 200000; one.max_write_buffer_number = 10; one.min_write_buffer_number_to_merge = 2; + one.max_write_buffer_number_to_maintain = 1; two.write_buffer_size = 1000000; two.max_write_buffer_number = 10; two.min_write_buffer_number_to_merge = 3; + two.max_write_buffer_number_to_maintain = 2; three.write_buffer_size = 90000; three.max_write_buffer_number = 10; three.min_write_buffer_number_to_merge = 4; + three.max_write_buffer_number_to_maintain = -1; Reopen({default_cf, one, two, three}); diff --git a/db/db_bench.cc b/db/db_bench.cc index 425964a75..20e8dbc59 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -54,6 +54,8 @@ int main() { #include "rocksdb/slice_transform.h" #include "rocksdb/perf_context.h" #include "rocksdb/utilities/flashcache.h" +#include "rocksdb/utilities/optimistic_transaction.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" #include "port/port.h" #include "port/stack_trace.h" #include "util/crc32c.h" @@ -106,7 +108,8 @@ DEFINE_string(benchmarks, "compress," "uncompress," "acquireload," - "fillseekseq,", + "fillseekseq," + "randomtransaction", "Comma-separated list of operations to run in the specified order" "Actual benchmarks:\n" @@ -157,6 +160,8 @@ DEFINE_string(benchmarks, "\tacquireload -- load N*1000 times\n" "\tfillseekseq -- write N values in sequential key, then read " "them by seeking to each key\n" + "\trandomtransaction -- execute N random transactions and " + "verify correctness\n" "Meta operations:\n" "\tcompact -- Compact the entire DB\n" "\tstats -- Print DB stats\n" @@ -263,6 +268,20 @@ DEFINE_int32(min_write_buffer_number_to_merge, " writing less data to storage if there are duplicate records " " in each of these individual write buffers."); +DEFINE_int32(max_write_buffer_number_to_maintain, + rocksdb::Options().max_write_buffer_number_to_maintain, + "The total maximum number of write buffers to maintain in memory " + "including copies of buffers that have already been flushed. " + "Unlike max_write_buffer_number, this parameter does not affect " + "flushing. This controls the minimum amount of write history " + "that will be available in memory for conflict checking when " + "Transactions are used. If this value is too low, some " + "transactions may fail at commit time due to not being able to " + "determine whether there were any write conflicts. Setting this " + "value to 0 will cause write buffers to be freed immediately " + "after they are flushed. If this value is set to -1, " + "'max_write_buffer_number' will be used."); + DEFINE_int32(max_background_compactions, rocksdb::Options().max_background_compactions, "The maximum number of concurrent background compactions" @@ -425,6 +444,18 @@ DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/" DEFINE_uint64(delete_obsolete_files_period_micros, 0, "Ignored. Left here for backward compatibility"); +DEFINE_bool(transaction_db, false, + "Open a OptimisticTransactionDB instance. " + "Required for randomtransaction benchmark."); + +DEFINE_uint64(transaction_sets, 2, + "Number of keys each transaction will " + "modify (use in RandomTransaction only). Max: 9999"); + +DEFINE_int32(transaction_sleep, 0, + "Max microseconds to sleep in between " + "reading and writing a value (used in RandomTransaction only). "); + namespace { enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { assert(ctype); @@ -884,6 +915,7 @@ static void AppendWithSpace(std::string* str, Slice msg) { struct DBWithColumnFamilies { std::vector cfh; DB* db; + OptimisticTransactionDB* txn_db; std::atomic num_created; // Need to be updated after all the // new entries in cfh are set. size_t num_hot; // Number of column families to be queried at each moment. @@ -891,7 +923,7 @@ struct DBWithColumnFamilies { // Column families will be created and used to be queried. port::Mutex create_cf_mutex; // Only one thread can execute CreateNewCf() - DBWithColumnFamilies() : db(nullptr) { + DBWithColumnFamilies() : db(nullptr), txn_db(nullptr) { cfh.clear(); num_created = 0; num_hot = 0; @@ -900,9 +932,23 @@ struct DBWithColumnFamilies { DBWithColumnFamilies(const DBWithColumnFamilies& other) : cfh(other.cfh), db(other.db), + txn_db(other.txn_db), num_created(other.num_created.load()), num_hot(other.num_hot) {} + void DeleteDBs() { + std::for_each(cfh.begin(), cfh.end(), + [](ColumnFamilyHandle* cfhi) { delete cfhi; }); + cfh.clear(); + if (txn_db) { + delete txn_db; + txn_db = nullptr; + } else { + delete db; + } + db = nullptr; + } + ColumnFamilyHandle* GetCfh(int64_t rand_num) { assert(num_hot > 0); return cfh[num_created.load(std::memory_order_acquire) - num_hot + @@ -1604,9 +1650,7 @@ class Benchmark { } ~Benchmark() { - std::for_each(db_.cfh.begin(), db_.cfh.end(), - [](ColumnFamilyHandle* cfh) { delete cfh; }); - delete db_.db; + db_.DeleteDBs(); delete prefix_extractor_; if (cache_.get() != nullptr) { // this will leak, but we're shutting down so nobody cares @@ -1710,6 +1754,8 @@ class Benchmark { write_options_.disableWAL = FLAGS_disable_wal; void (Benchmark::*method)(ThreadState*) = nullptr; + void (Benchmark::*post_process_method)() = nullptr; + bool fresh_db = false; int num_threads = FLAGS_threads; @@ -1825,6 +1871,9 @@ class Benchmark { method = &Benchmark::Compress; } else if (name == Slice("uncompress")) { method = &Benchmark::Uncompress; + } else if (name == Slice("randomtransaction")) { + method = &Benchmark::RandomTransaction; + post_process_method = &Benchmark::RandomTransactionVerify; } else if (name == Slice("stats")) { PrintStats("rocksdb.stats"); } else if (name == Slice("levelstats")) { @@ -1845,11 +1894,7 @@ class Benchmark { method = nullptr; } else { if (db_.db != nullptr) { - std::for_each(db_.cfh.begin(), db_.cfh.end(), - [](ColumnFamilyHandle* cfh) { delete cfh; }); - delete db_.db; - db_.db = nullptr; - db_.cfh.clear(); + db_.DeleteDBs(); DestroyDB(FLAGS_db, open_options_); } for (size_t i = 0; i < multi_dbs_.size(); i++) { @@ -1865,6 +1910,9 @@ class Benchmark { fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); RunBenchmark(num_threads, name, method); } + if (post_process_method != nullptr) { + (this->*post_process_method)(); + } } if (FLAGS_statistics) { fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); @@ -2175,6 +2223,8 @@ class Benchmark { options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.min_write_buffer_number_to_merge = FLAGS_min_write_buffer_number_to_merge; + options.max_write_buffer_number_to_maintain = + FLAGS_max_write_buffer_number_to_maintain; options.max_background_compactions = FLAGS_max_background_compactions; options.max_background_flushes = FLAGS_max_background_flushes; options.compaction_style = FLAGS_compaction_style_e; @@ -2428,6 +2478,11 @@ class Benchmark { NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec)); } + if (FLAGS_readonly && FLAGS_transaction_db) { + fprintf(stderr, "Cannot use readonly flag with transaction_db\n"); + exit(1); + } + if (FLAGS_num_multi_db <= 1) { OpenDb(options, FLAGS_db, &db_); } else { @@ -2462,15 +2517,25 @@ class Benchmark { if (FLAGS_readonly) { s = DB::OpenForReadOnly(options, db_name, column_families, &db->cfh, &db->db); + } else if (FLAGS_transaction_db) { + s = OptimisticTransactionDB::Open(options, db_name, column_families, + &db->cfh, &db->txn_db); + if (s.ok()) { + db->db = db->txn_db->GetBaseDB(); + } } else { s = DB::Open(options, db_name, column_families, &db->cfh, &db->db); } db->cfh.resize(FLAGS_num_column_families); db->num_created = num_hot; db->num_hot = num_hot; - } else if (FLAGS_readonly) { s = DB::OpenForReadOnly(options, db_name, &db->db); + } else if (FLAGS_transaction_db) { + s = OptimisticTransactionDB::Open(options, db_name, &db->txn_db); + if (s.ok()) { + db->db = db->txn_db->GetBaseDB(); + } } else { s = DB::Open(options, db_name, &db->db); } @@ -3515,6 +3580,203 @@ class Benchmark { } } + // This benchmark stress tests Transactions. For a given --duration (or + // total number of --writes, a Transaction will perform a read-modify-write + // to increment the value of a key in each of N(--transaction-sets) sets of + // keys (where each set has --num keys). If --threads is set, this will be + // done in parallel. + // + // To test transactions, use --transaction_db=true. Not setting this + // parameter + // will run the same benchmark without transactions. + // + // RandomTransactionVerify() will then validate the correctness of the results + // by checking if the sum of all keys in each set is the same. + void RandomTransaction(ThreadState* thread) { + ReadOptions options(FLAGS_verify_checksum, true); + Duration duration(FLAGS_duration, readwrites_); + ReadOptions read_options(FLAGS_verify_checksum, true); + std::string value; + DB* db = db_.db; + uint64_t transactions_done = 0; + uint64_t transactions_aborted = 0; + Status s; + uint64_t num_prefix_ranges = FLAGS_transaction_sets; + bool use_txn = FLAGS_transaction_db; + + if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) { + fprintf(stderr, "invalid value for transaction_sets\n"); + abort(); + } + + if (FLAGS_num_multi_db > 1) { + fprintf(stderr, + "Cannot run RandomTransaction benchmark with " + "FLAGS_multi_db > 1."); + abort(); + } + + while (!duration.Done(1)) { + OptimisticTransaction* txn = nullptr; + WriteBatch* batch = nullptr; + + if (use_txn) { + txn = db_.txn_db->BeginTransaction(write_options_); + assert(txn); + } else { + batch = new WriteBatch(); + } + + // pick a random number to use to increment a key in each set + uint64_t incr = (thread->rand.Next() % 100) + 1; + + // For each set, pick a key at random and increment it + for (uint8_t i = 0; i < num_prefix_ranges; i++) { + uint64_t int_value; + char prefix_buf[5]; + + // key format: [SET#][random#] + std::string rand_key = ToString(thread->rand.Next() % FLAGS_num); + Slice base_key(rand_key); + + // Pad prefix appropriately so we can iterate over each set + snprintf(prefix_buf, sizeof(prefix_buf), "%04d", i + 1); + std::string full_key = std::string(prefix_buf) + base_key.ToString(); + Slice key(full_key); + + if (use_txn) { + s = txn->Get(read_options, key, &value); + } else { + s = db->Get(read_options, key, &value); + } + + if (s.ok()) { + int_value = std::stoull(value); + + if (int_value == 0 || int_value == ULONG_MAX) { + fprintf(stderr, "Get returned unexpected value: %s\n", + value.c_str()); + abort(); + } + } else if (s.IsNotFound()) { + int_value = 0; + } else { + fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); + abort(); + } + + if (FLAGS_transaction_sleep > 0) { + FLAGS_env->SleepForMicroseconds(thread->rand.Next() % + FLAGS_transaction_sleep); + } + + std::string sum = ToString(int_value + incr); + if (use_txn) { + txn->Put(key, sum); + } else { + batch->Put(key, sum); + } + } + + if (use_txn) { + s = txn->Commit(); + } else { + s = db->Write(write_options_, batch); + } + + if (!s.ok()) { + // Ideally, we'd want to run this stress test with enough concurrency + // on a small enough set of keys that we get some failed transactions + // due to conflicts. + if (use_txn && s.IsBusy()) { + transactions_aborted++; + } else { + fprintf(stderr, "Unexpected write error: %s\n", s.ToString().c_str()); + abort(); + } + } + + if (txn) { + delete txn; + } + if (batch) { + delete batch; + } + + transactions_done++; + } + + char msg[100]; + if (use_txn) { + snprintf(msg, sizeof(msg), + "( transactions:%" PRIu64 " aborts:%" PRIu64 ")", + transactions_done, transactions_aborted); + } else { + snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done); + } + thread->stats.AddMessage(msg); + + if (FLAGS_perf_level > 0) { + thread->stats.AddMessage(perf_context.ToString()); + } + } + + // Verifies consistency of data after RandomTransaction() has been run. + // Since each iteration of RandomTransaction() incremented a key in each set + // by the same value, the sum of the keys in each set should be the same. + void RandomTransactionVerify() { + if (!FLAGS_transaction_db) { + // transactions not used, nothing to verify. + return; + } + + uint64_t prev_total = 0; + + // For each set of keys with the same prefix, sum all the values + for (uint32_t i = 0; i < FLAGS_transaction_sets; i++) { + char prefix_buf[5]; + snprintf(prefix_buf, sizeof(prefix_buf), "%04u", i + 1); + uint64_t total = 0; + + Iterator* iter = db_.db->NewIterator(ReadOptions()); + + for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { + Slice key = iter->key(); + + // stop when we reach a different prefix + if (key.ToString().compare(0, 4, prefix_buf) != 0) { + break; + } + + Slice value = iter->value(); + uint64_t int_value = std::stoull(value.ToString()); + if (int_value == 0 || int_value == ULONG_MAX) { + fprintf(stderr, "Iter returned unexpected value: %s\n", + value.ToString().c_str()); + abort(); + } + + total += int_value; + } + delete iter; + + if (i > 0) { + if (total != prev_total) { + fprintf(stderr, + "RandomTransactionVerify found inconsistent totals. " + "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 + " \n", + i - 1, prev_total, i, total); + abort(); + } + } + prev_total = total; + } + + fprintf(stdout, "RandomTransactionVerify Success! Total:%" PRIu64 "\n", + prev_total); + } + void Compact(ThreadState* thread) { DB* db = SelectDB(thread); db->CompactRange(CompactRangeOptions(), nullptr, nullptr); diff --git a/db/version_set.cc b/db/version_set.cc index f7c2fc5cd..3527d2cb0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1561,7 +1561,7 @@ const char* VersionStorageInfo::LevelSummary( if (!files_marked_for_compaction_.empty()) { snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, - " (%zu files need compaction)", + " (%" ROCKSDB_PRIszt " files need compaction)", files_marked_for_compaction_.size()); } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index f25098921..6a62111a2 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -553,6 +553,8 @@ class WritableFile { void operator=(const WritableFile&); protected: + friend class WritableFileWrapper; + Env::IOPriority io_priority_; }; @@ -892,6 +894,47 @@ class EnvWrapper : public Env { Env* target_; }; +// An implementation of WritableFile that forwards all calls to another +// WritableFile. May be useful to clients who wish to override just part of the +// functionality of another WritableFile. +// It's declared as friend of WritableFile to allow forwarding calls to +// protected virtual methods. +class WritableFileWrapper : public WritableFile { + public: + explicit WritableFileWrapper(WritableFile* t) : target_(t) { } + + Status Append(const Slice& data) override { return target_->Append(data); } + Status Close() override { return target_->Close(); } + Status Flush() override { return target_->Flush(); } + Status Sync() override { return target_->Sync(); } + Status Fsync() override { return target_->Fsync(); } + void SetIOPriority(Env::IOPriority pri) override { + target_->SetIOPriority(pri); + } + uint64_t GetFileSize() override { return target_->GetFileSize(); } + void GetPreallocationStatus(size_t* block_size, + size_t* last_allocated_block) override { + target_->GetPreallocationStatus(block_size, last_allocated_block); + } + size_t GetUniqueId(char* id, size_t max_size) const override { + return target_->GetUniqueId(id, max_size); + } + Status InvalidateCache(size_t offset, size_t length) override { + return target_->InvalidateCache(offset, length); + } + + protected: + Status Allocate(off_t offset, off_t len) override { + return target_->Allocate(offset, len); + } + Status RangeSync(off_t offset, off_t nbytes) override { + return target_->RangeSync(offset, nbytes); + } + + private: + WritableFile* target_; +}; + // Returns a new environment that stores its data in memory and delegates // all non-file-storage tasks to base_env. The caller must delete the result // when it is no longer needed. diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index b44d5a0bd..892cfb26e 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -25,7 +25,7 @@ namespace rocksdb { // ++pos) { // ... // } -typedef std::map UserCollectedProperties; +typedef std::map UserCollectedProperties; // TableProperties contains a bunch of read-only properties of its associated // table. diff --git a/include/rocksdb/thread_status.h b/include/rocksdb/thread_status.h index 7aecb39ee..282c9eb2e 100644 --- a/include/rocksdb/thread_status.h +++ b/include/rocksdb/thread_status.h @@ -13,9 +13,6 @@ #pragma once -#ifndef STORAGE_ROCKSDB_INCLUDE_THREAD_STATUS_H_ -#define STORAGE_ROCKSDB_INCLUDE_THREAD_STATUS_H_ - #include #include #include @@ -205,5 +202,3 @@ struct ThreadStatus { } // namespace rocksdb - -#endif // STORAGE_ROCKSDB_INCLUDE_THREAD_STATUS_H_ diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 2186d0ce7..6f4ee5418 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -993,6 +993,30 @@ void Java_org_rocksdb_Options_setMinWriteBufferNumberToMerge( jhandle)->min_write_buffer_number_to_merge = static_cast(jmin_write_buffer_number_to_merge); } +/* + * Class: org_rocksdb_Options + * Method: maxWriteBufferNumberToMaintain + * Signature: (J)I + */ +jint Java_org_rocksdb_Options_maxWriteBufferNumberToMaintain(JNIEnv* env, + jobject jobj, + jlong jhandle) { + return reinterpret_cast(jhandle) + ->max_write_buffer_number_to_maintain; +} + +/* + * Class: org_rocksdb_Options + * Method: setMaxWriteBufferNumberToMaintain + * Signature: (JI)V + */ +void Java_org_rocksdb_Options_setMaxWriteBufferNumberToMaintain( + JNIEnv* env, jobject jobj, jlong jhandle, + jint jmax_write_buffer_number_to_maintain) { + reinterpret_cast(jhandle) + ->max_write_buffer_number_to_maintain = + static_cast(jmax_write_buffer_number_to_maintain); +} /* * Class: org_rocksdb_Options @@ -2153,6 +2177,30 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMinWriteBufferNumberToMerge( static_cast(jmin_write_buffer_number_to_merge); } +/* + * Class: org_rocksdb_ColumnFamilyOptions + * Method: maxWriteBufferNumberToMaintain + * Signature: (J)I + */ +jint Java_org_rocksdb_ColumnFamilyOptions_maxWriteBufferNumberToMaintain( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle) + ->max_write_buffer_number_to_maintain; +} + +/* + * Class: org_rocksdb_ColumnFamilyOptions + * Method: setMaxWriteBufferNumberToMaintain + * Signature: (JI)V + */ +void Java_org_rocksdb_ColumnFamilyOptions_setMaxWriteBufferNumberToMaintain( + JNIEnv* env, jobject jobj, jlong jhandle, + jint jmax_write_buffer_number_to_maintain) { + reinterpret_cast(jhandle) + ->max_write_buffer_number_to_maintain = + static_cast(jmax_write_buffer_number_to_maintain); +} + /* * Class: org_rocksdb_ColumnFamilyOptions * Method: setCompressionType diff --git a/java/rocksjni/write_batch_test.cc b/java/rocksjni/write_batch_test.cc index ff7aff836..d54029141 100644 --- a/java/rocksjni/write_batch_test.cc +++ b/java/rocksjni/write_batch_test.cc @@ -47,7 +47,7 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents( rocksdb::MemTable* mem = new rocksdb::MemTable( cmp, rocksdb::ImmutableCFOptions(options), rocksdb::MutableCFOptions(options, rocksdb::ImmutableCFOptions(options)), - &wb); + &wb, rocksdb::kMaxSequenceNumber); mem->Ref(); std::string state; rocksdb::ColumnFamilyMemTablesDefault cf_mems_default(mem); diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 31cbb197c..c56873ce2 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -296,7 +296,11 @@ public: pending_fsync_ = true; - SSIZE_T done = pwrite(hFile_, src, left, offset); + SSIZE_T done = 0; + { + IOSTATS_TIMER_GUARD(write_nanos); + done = pwrite(hFile_, src, left, offset); + } if (done < 0) { return IOErrorFromWindowsError("pwrite failed to: " + filename_, GetLastError()); @@ -371,6 +375,11 @@ public: pending_fsync_ = false; return Status::OK(); } + + virtual Status Allocate(off_t offset, off_t len) override { + IOSTATS_TIMER_GUARD(allocate_nanos); + return fallocate(filename_, hFile_, len); + } }; @@ -459,6 +468,7 @@ private: // Normally it does not present a problem since in memory mapped files // we do not disable buffering Status ReserveFileSpace(uint64_t toSize) { + IOSTATS_TIMER_GUARD(allocate_nanos); return fallocate(filename_, hFile_, toSize); } @@ -1281,6 +1291,7 @@ public: return status; } + IOSTATS_TIMER_GUARD(allocate_nanos); status = fallocate(filename_, hFile_, spaceToReserve); if (status.ok()) { reservedsize_ = spaceToReserve; @@ -1500,13 +1511,17 @@ public: // Corruption test needs to rename and delete files of these kind // while they are still open with another handle. For that reason we // allow share_write and delete(allows rename). - HANDLE hFile = CreateFileA(fname.c_str(), - GENERIC_READ, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - NULL, - OPEN_EXISTING, // Original fopen mode is "rb" - FILE_ATTRIBUTE_NORMAL, - NULL); + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA(fname.c_str(), + GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, + OPEN_EXISTING, // Original fopen mode is "rb" + FILE_ATTRIBUTE_NORMAL, + NULL); + } if (hFile == INVALID_HANDLE_VALUE) { auto lastError = GetLastError(); @@ -1549,15 +1564,19 @@ public: } /// Shared access is necessary for corruption test to pass - // almost all tests wwould work with a possible exception of fault_injection - HANDLE hFile = CreateFileA( - fname.c_str(), - GENERIC_READ, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - NULL, - OPEN_EXISTING, - fileFlags, - NULL); + // almost all tests would work with a possible exception of fault_injection + HANDLE hFile; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA( + fname.c_str(), + GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, + OPEN_EXISTING, + fileFlags, + NULL); + } if (INVALID_HANDLE_VALUE == hFile) { auto lastError = GetLastError(); @@ -1649,14 +1668,18 @@ public: shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE); } - HANDLE hFile = CreateFileA(fname.c_str(), - desired_access, // Access desired - shared_mode, - NULL, // Security attributes - CREATE_ALWAYS, // Posix env says O_CREAT | O_RDWR | O_TRUNC - fileFlags, // Flags - NULL); // Template File - + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA(fname.c_str(), + desired_access, // Access desired + shared_mode, + NULL, // Security attributes + CREATE_ALWAYS, // Posix env says O_CREAT | O_RDWR | O_TRUNC + fileFlags, // Flags + NULL); // Template File + } + if (INVALID_HANDLE_VALUE == hFile) { auto lastError = GetLastError(); return IOErrorFromWindowsError("Failed to create a NewWriteableFile: " + fname, lastError); @@ -1683,14 +1706,18 @@ public: Status s; - HANDLE hFile = CreateFileA(fname.c_str(), - GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ, - NULL, - OPEN_ALWAYS, // Posix env specifies O_CREAT, it will open existing file or create new - FILE_ATTRIBUTE_NORMAL, - NULL); - + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA(fname.c_str(), + GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ, + NULL, + OPEN_ALWAYS, // Posix env specifies O_CREAT, it will open existing file or create new + FILE_ATTRIBUTE_NORMAL, + NULL); + } + if (hFile == INVALID_HANDLE_VALUE) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("Failed to Open/Create NewRandomRWFile" + fname, lastError); @@ -1710,6 +1737,7 @@ public: if (!DirExists(name)) { s = IOError("Directory does not exist: " + name, EEXIST); } else { + IOSTATS_TIMER_GUARD(open_nanos); result->reset(new WinDirectory); } return s; @@ -1889,9 +1917,12 @@ public: // Obtain exclusive access to the LOCK file // Previously, instead of NORMAL attr we set DELETE on close and that worked // well except with fault_injection test that insists on deleting it. - HANDLE hFile = CreateFileA(lockFname.c_str(), (GENERIC_READ | GENERIC_WRITE), - ExclusiveAccessON, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); - + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA(lockFname.c_str(), (GENERIC_READ | GENERIC_WRITE), + ExclusiveAccessON, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); + } if (INVALID_HANDLE_VALUE == hFile) { auto lastError = GetLastError(); @@ -1975,13 +2006,17 @@ public: result->reset(); - HANDLE hFile = CreateFileA(fname.c_str(), - GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_DELETE, // In RocksDb log files are renamed and deleted before they are closed. This enables doing so. - NULL, - CREATE_ALWAYS, // Original fopen mode is "w" - FILE_ATTRIBUTE_NORMAL, - NULL); + HANDLE hFile = 0; + { + IOSTATS_TIMER_GUARD(open_nanos); + hFile = CreateFileA(fname.c_str(), + GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_DELETE, // In RocksDb log files are renamed and deleted before they are closed. This enables doing so. + NULL, + CREATE_ALWAYS, // Original fopen mode is "w" + FILE_ATTRIBUTE_NORMAL, + NULL); + } if (hFile == INVALID_HANDLE_VALUE) { auto lastError = GetLastError(); diff --git a/port/win/win_logger.cc b/port/win/win_logger.cc index c4eab7082..1ef58e21b 100644 --- a/port/win/win_logger.cc +++ b/port/win/win_logger.cc @@ -20,6 +20,7 @@ #include "rocksdb/env.h" #include "port/win/win_logger.h" #include "port/sys_time.h" +#include "util/iostats_context_imp.h" namespace rocksdb { @@ -62,6 +63,7 @@ void WinLogger::Flush() { void WinLogger::Logv(const char* format, va_list ap) { const uint64_t thread_id = (*gettid_)(); + IOSTATS_TIMER_GUARD(logger_nanos); // We try twice: the first time with a fixed-size stack allocated buffer, // and the second time with a much larger dynamically allocated buffer. char buffer[500]; diff --git a/util/arena.cc b/util/arena.cc index 4c243527e..38b3c5f5c 100644 --- a/util/arena.cc +++ b/util/arena.cc @@ -51,7 +51,7 @@ Arena::~Arena() { for (const auto& block : blocks_) { delete[] block; } -#ifdef MAP_HUGETLB +#ifndef OS_WIN for (const auto& mmap_info : huge_blocks_) { auto ret = munmap(mmap_info.addr_, mmap_info.length_); if (ret != 0) { diff --git a/util/env_posix.cc b/util/env_posix.cc index 65c0e848c..bb14c5af5 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -350,7 +350,7 @@ class PosixMmapReadableFile: public RandomAccessFile { virtual ~PosixMmapReadableFile() { int ret = munmap(mmapped_region_, length_); if (ret != 0) { - fprintf(stdout, "failed to munmap %p length %zu \n", + fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n", mmapped_region_, length_); } } @@ -443,14 +443,17 @@ class PosixMmapFile : public WritableFile { TEST_KILL_RANDOM(rocksdb_kill_odds); // we can't fallocate with FALLOC_FL_KEEP_SIZE here - int alloc_status = fallocate(fd_, 0, file_offset_, map_size_); - if (alloc_status != 0) { - // fallback to posix_fallocate - alloc_status = posix_fallocate(fd_, file_offset_, map_size_); - } - if (alloc_status != 0) { - return Status::IOError("Error allocating space to file : " + filename_ + - "Error : " + strerror(alloc_status)); + { + IOSTATS_TIMER_GUARD(allocate_nanos); + int alloc_status = fallocate(fd_, 0, file_offset_, map_size_); + if (alloc_status != 0) { + // fallback to posix_fallocate + alloc_status = posix_fallocate(fd_, file_offset_, map_size_); + } + if (alloc_status != 0) { + return Status::IOError("Error allocating space to file : " + filename_ + + "Error : " + strerror(alloc_status)); + } } TEST_KILL_RANDOM(rocksdb_kill_odds); @@ -639,6 +642,7 @@ class PosixMmapFile : public WritableFile { #ifdef ROCKSDB_FALLOCATE_PRESENT virtual Status Allocate(off_t offset, off_t len) override { TEST_KILL_RANDOM(rocksdb_kill_odds); + IOSTATS_TIMER_GUARD(allocate_nanos); int alloc_status = fallocate( fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); if (alloc_status == 0) { @@ -725,7 +729,12 @@ class PosixWritableFile : public WritableFile { cursize_ += left; } else { while (left != 0) { - ssize_t done = write(fd_, src, RequestToken(left)); + ssize_t done; + size_t size = RequestToken(left); + { + IOSTATS_TIMER_GUARD(write_nanos); + done = write(fd_, src, size); + } if (done < 0) { if (errno == EINTR) { continue; @@ -773,6 +782,7 @@ class PosixWritableFile : public WritableFile { // tmpfs (since Linux 3.5) // We ignore error since failure of this operation does not affect // correctness. + IOSTATS_TIMER_GUARD(allocate_nanos); fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_, block_size * last_allocated_block - filesize_); #endif @@ -791,7 +801,12 @@ class PosixWritableFile : public WritableFile { size_t left = cursize_; char* src = buf_.get(); while (left != 0) { - ssize_t done = write(fd_, src, RequestToken(left)); + ssize_t done; + size_t size = RequestToken(left); + { + IOSTATS_TIMER_GUARD(write_nanos); + done = write(fd_, src, size); + } if (done < 0) { if (errno == EINTR) { continue; @@ -865,7 +880,9 @@ class PosixWritableFile : public WritableFile { #ifdef ROCKSDB_FALLOCATE_PRESENT virtual Status Allocate(off_t offset, off_t len) override { TEST_KILL_RANDOM(rocksdb_kill_odds); - int alloc_status = fallocate( + int alloc_status; + IOSTATS_TIMER_GUARD(allocate_nanos); + alloc_status = fallocate( fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); if (alloc_status == 0) { return Status::OK(); @@ -875,6 +892,7 @@ class PosixWritableFile : public WritableFile { } virtual Status RangeSync(off_t offset, off_t nbytes) override { + IOSTATS_TIMER_GUARD(range_sync_nanos); if (sync_file_range(fd_, offset, nbytes, SYNC_FILE_RANGE_WRITE) == 0) { return Status::OK(); } else { @@ -933,7 +951,11 @@ class PosixRandomRWFile : public RandomRWFile { pending_fsync_ = true; while (left != 0) { - ssize_t done = pwrite(fd_, src, left, offset); + ssize_t done; + { + IOSTATS_TIMER_GUARD(write_nanos); + done = pwrite(fd_, src, left, offset); + } if (done < 0) { if (errno == EINTR) { continue; @@ -1009,6 +1031,7 @@ class PosixRandomRWFile : public RandomRWFile { #ifdef ROCKSDB_FALLOCATE_PRESENT virtual Status Allocate(off_t offset, off_t len) override { TEST_KILL_RANDOM(rocksdb_kill_odds); + IOSTATS_TIMER_GUARD(allocate_nanos); int alloc_status = fallocate( fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); if (alloc_status == 0) { @@ -1117,6 +1140,7 @@ class PosixEnv : public Env { result->reset(); FILE* f = nullptr; do { + IOSTATS_TIMER_GUARD(open_nanos); f = fopen(fname.c_str(), "r"); } while (f == nullptr && errno == EINTR); if (f == nullptr) { @@ -1135,7 +1159,11 @@ class PosixEnv : public Env { const EnvOptions& options) override { result->reset(); Status s; - int fd = open(fname.c_str(), O_RDONLY); + int fd; + { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), O_RDONLY); + } SetFD_CLOEXEC(fd, &options); if (fd < 0) { s = IOError(fname, errno); @@ -1168,6 +1196,7 @@ class PosixEnv : public Env { Status s; int fd = -1; do { + IOSTATS_TIMER_GUARD(open_nanos); fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); } while (fd < 0 && errno == EINTR); if (fd < 0) { @@ -1208,7 +1237,11 @@ class PosixEnv : public Env { return Status::NotSupported("No support for mmap read/write yet"); } Status s; - const int fd = open(fname.c_str(), O_CREAT | O_RDWR, 0644); + int fd; + { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), O_CREAT | O_RDWR, 0644); + } if (fd < 0) { s = IOError(fname, errno); } else { @@ -1221,7 +1254,11 @@ class PosixEnv : public Env { virtual Status NewDirectory(const std::string& name, unique_ptr* result) override { result->reset(); - const int fd = open(name.c_str(), 0); + int fd; + { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(name.c_str(), 0); + } if (fd < 0) { return IOError(name, errno); } else { @@ -1333,7 +1370,11 @@ class PosixEnv : public Env { virtual Status LockFile(const std::string& fname, FileLock** lock) override { *lock = nullptr; Status result; - int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); + int fd; + { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); + } if (fd < 0) { result = IOError(fname, errno); } else if (LockOrUnlock(fname, fd, true) == -1) { @@ -1408,7 +1449,11 @@ class PosixEnv : public Env { virtual Status NewLogger(const std::string& fname, shared_ptr* result) override { - FILE* f = fopen(fname.c_str(), "w"); + FILE* f; + { + IOSTATS_TIMER_GUARD(open_nanos); + f = fopen(fname.c_str(), "w"); + } if (f == nullptr) { result->reset(); return IOError(fname, errno); @@ -1782,7 +1827,7 @@ class PosixEnv : public Env { #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) #if __GLIBC_PREREQ(2, 12) char name_buf[16]; - snprintf(name_buf, sizeof name_buf, "rocksdb:bg%zu", bgthreads_.size()); + snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt, bgthreads_.size()); name_buf[sizeof name_buf - 1] = '\0'; pthread_setname_np(t, name_buf); #endif diff --git a/util/env_test.cc b/util/env_test.cc index b9cca22d3..931c95215 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -992,6 +992,87 @@ TEST_F(EnvPosixTest, Preallocation) { ASSERT_EQ(last_allocated_block, 7UL); } +// Test that all WritableFileWrapper forwards all calls to WritableFile. +TEST_F(EnvPosixTest, WritableFileWrapper) { + class Base : public WritableFile { + public: + mutable int *step_; + + void inc(int x) const { + EXPECT_EQ(x, (*step_)++); + } + + explicit Base(int* step) : step_(step) { + inc(0); + } + + Status Append(const Slice& data) override { inc(1); return Status::OK(); } + Status Close() override { inc(2); return Status::OK(); } + Status Flush() override { inc(3); return Status::OK(); } + Status Sync() override { inc(4); return Status::OK(); } + Status Fsync() override { inc(5); return Status::OK(); } + void SetIOPriority(Env::IOPriority pri) override { inc(6); } + uint64_t GetFileSize() override { inc(7); return 0; } + void GetPreallocationStatus(size_t* block_size, + size_t* last_allocated_block) override { + inc(8); + } + size_t GetUniqueId(char* id, size_t max_size) const override { + inc(9); + return 0; + } + Status InvalidateCache(size_t offset, size_t length) override { + inc(10); + return Status::OK(); + } + + protected: + Status Allocate(off_t offset, off_t len) override { + inc(11); + return Status::OK(); + } + Status RangeSync(off_t offset, off_t nbytes) override { + inc(12); + return Status::OK(); + } + + public: + ~Base() { + inc(13); + } + }; + + class Wrapper : public WritableFileWrapper { + public: + explicit Wrapper(WritableFile* target) : WritableFileWrapper(target) {} + + void CallProtectedMethods() { + Allocate(0, 0); + RangeSync(0, 0); + } + }; + + int step = 0; + + { + Base b(&step); + Wrapper w(&b); + w.Append(Slice()); + w.Close(); + w.Flush(); + w.Sync(); + w.Fsync(); + w.SetIOPriority(Env::IOPriority::IO_HIGH); + w.GetFileSize(); + w.GetPreallocationStatus(nullptr, nullptr); + w.GetUniqueId(nullptr, 0); + w.InvalidateCache(0, 0); + w.CallProtectedMethods(); + } + + EXPECT_EQ(14, step); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/util/options_test.cc b/util/options_test.cc index 9538653d3..3d8a84b99 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -98,6 +98,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"write_buffer_size", "1"}, {"max_write_buffer_number", "2"}, {"min_write_buffer_number_to_merge", "3"}, + {"max_write_buffer_number_to_maintain", "99"}, {"compression", "kSnappyCompression"}, {"compression_per_level", "kNoCompression:" @@ -184,6 +185,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.write_buffer_size, 1U); ASSERT_EQ(new_cf_opt.max_write_buffer_number, 2); ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3); + ASSERT_EQ(new_cf_opt.max_write_buffer_number_to_maintain, 99); ASSERT_EQ(new_cf_opt.compression, kSnappyCompression); ASSERT_EQ(new_cf_opt.compression_per_level.size(), 6U); ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression); diff --git a/util/posix_logger.h b/util/posix_logger.h index 7bba30bd5..1d495973b 100644 --- a/util/posix_logger.h +++ b/util/posix_logger.h @@ -61,6 +61,8 @@ class PosixLogger : public Logger { using Logger::Logv; virtual void Logv(const char* format, va_list ap) override { + IOSTATS_TIMER_GUARD(logger_nanos); + const uint64_t thread_id = (*gettid_)(); // We try twice: the first time with a fixed-size stack allocated buffer, diff --git a/util/xfunc.cc b/util/xfunc.cc index c5d6b5afd..d80565247 100644 --- a/util/xfunc.cc +++ b/util/xfunc.cc @@ -7,7 +7,12 @@ #include #include "db/db_impl.h" #include "db/managed_iterator.h" +#include "db/write_callback.h" +#include "rocksdb/db.h" #include "rocksdb/options.h" +#include "rocksdb/utilities/optimistic_transaction.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/write_batch.h" #include "util/xfunc.h" @@ -64,6 +69,116 @@ void xf_manage_new(DBImpl* db, ReadOptions* read_options, void xf_manage_create(ManagedIterator* iter) { iter->SetDropOld(false); } +void xf_transaction_set_memtable_history( + int32_t* max_write_buffer_number_to_maintain) { + *max_write_buffer_number_to_maintain = 10; +} + +void xf_transaction_clear_memtable_history( + int32_t* max_write_buffer_number_to_maintain) { + *max_write_buffer_number_to_maintain = 0; +} + +class XFTransactionWriteHandler : public WriteBatch::Handler { + public: + OptimisticTransaction* txn_; + DBImpl* db_impl_; + + XFTransactionWriteHandler(OptimisticTransaction* txn, DBImpl* db_impl) + : txn_(txn), db_impl_(db_impl) {} + + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + InstrumentedMutexLock l(&db_impl_->mutex_); + + ColumnFamilyHandle* cfh = db_impl_->GetColumnFamilyHandle(column_family_id); + if (cfh == nullptr) { + return Status::InvalidArgument( + "XFUNC test could not find column family " + "handle for id ", + ToString(column_family_id)); + } + + txn_->Put(cfh, key, value); + + return Status::OK(); + } + + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + InstrumentedMutexLock l(&db_impl_->mutex_); + + ColumnFamilyHandle* cfh = db_impl_->GetColumnFamilyHandle(column_family_id); + if (cfh == nullptr) { + return Status::InvalidArgument( + "XFUNC test could not find column family " + "handle for id ", + ToString(column_family_id)); + } + + txn_->Merge(cfh, key, value); + + return Status::OK(); + } + + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + InstrumentedMutexLock l(&db_impl_->mutex_); + + ColumnFamilyHandle* cfh = db_impl_->GetColumnFamilyHandle(column_family_id); + if (cfh == nullptr) { + return Status::InvalidArgument( + "XFUNC test could not find column family " + "handle for id ", + ToString(column_family_id)); + } + + txn_->Delete(cfh, key); + + return Status::OK(); + } + + virtual void LogData(const Slice& blob) override { txn_->PutLogData(blob); } +}; + +// Whenever DBImpl::Write is called, create a transaction and do the write via +// the transaction. +void xf_transaction_write(const WriteOptions& write_options, + const DBOptions& db_options, WriteBatch* my_batch, + WriteCallback* callback, DBImpl* db_impl, Status* s, + bool* write_attempted) { + if (callback != nullptr) { + // We may already be in a transaction, don't force a transaction + *write_attempted = false; + return; + } + + OptimisticTransactionDB* txn_db = new OptimisticTransactionDB(db_impl); + OptimisticTransaction* txn = + OptimisticTransaction::BeginTransaction(txn_db, write_options); + + XFTransactionWriteHandler handler(txn, db_impl); + *s = my_batch->Iterate(&handler); + + if (!s->ok()) { + Log(InfoLogLevel::ERROR_LEVEL, db_options.info_log, + "XFUNC test could not iterate batch. status: $s\n", + s->ToString().c_str()); + } + + *s = txn->Commit(); + + if (!s->ok()) { + Log(InfoLogLevel::ERROR_LEVEL, db_options.info_log, + "XFUNC test could not commit transaction. status: $s\n", + s->ToString().c_str()); + } + + *write_attempted = true; + delete txn; + delete txn_db; +} + } // namespace rocksdb #endif // XFUNC diff --git a/util/xfunc.h b/util/xfunc.h index 78004cbe0..2b3b0e3ee 100644 --- a/util/xfunc.h +++ b/util/xfunc.h @@ -32,6 +32,7 @@ namespace rocksdb { #else struct Options; +struct WriteOptions; class ManagedIterator; class DBImpl; void GetXFTestOptions(Options* options, int skip_policy); @@ -40,6 +41,15 @@ void xf_manage_new(DBImpl* db, ReadOptions* readoptions, bool is_snapshot_supported); void xf_manage_create(ManagedIterator* iter); void xf_manage_options(ReadOptions* read_options); +void xf_transaction_set_memtable_history( + int32_t* max_write_buffer_number_to_maintain); +void xf_transaction_clear_memtable_history( + int32_t* max_write_buffer_number_to_maintain); +void xf_transaction_write(const WriteOptions& write_options, + const DBOptions& db_options, + class WriteBatch* my_batch, + class WriteCallback* callback, DBImpl* db_impl, + Status* success, bool* write_attempted); // This class provides the facility to run custom code to test a specific // feature typically with all existing unit tests. diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 3cfcc6ea9..6b61b56e4 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -328,7 +328,27 @@ class BackupEngineImpl : public BackupEngine { BackupRateLimiter* rate_limiter; uint64_t size_limit; std::promise result; + CopyWorkItem() {} + CopyWorkItem(const CopyWorkItem&) = delete; + CopyWorkItem& operator=(const CopyWorkItem&) = delete; + + CopyWorkItem(CopyWorkItem&& o) { + *this = std::move(o); + } + + CopyWorkItem& operator=(CopyWorkItem&& o) { + src_path = std::move(o.src_path); + dst_path = std::move(o.dst_path); + src_env = o.src_env; + dst_env = o.dst_env; + sync = o.sync; + rate_limiter = o.rate_limiter; + size_limit = o.size_limit; + result = std::move(o.result); + return *this; + } + CopyWorkItem(std::string _src_path, std::string _dst_path, Env* _src_env, @@ -354,7 +374,23 @@ class BackupEngineImpl : public BackupEngine { std::string dst_path; std::string dst_relative; BackupAfterCopyWorkItem() {} - BackupAfterCopyWorkItem(std::future _result, + + BackupAfterCopyWorkItem(BackupAfterCopyWorkItem&& o) { + *this = std::move(o); + } + + BackupAfterCopyWorkItem& operator=(BackupAfterCopyWorkItem&& o) { + result = std::move(o.result); + shared = o.shared; + needed_to_copy = o.needed_to_copy; + backup_env = o.backup_env; + dst_path_tmp = std::move(o.dst_path_tmp); + dst_path = std::move(o.dst_path); + dst_relative = std::move(o.dst_relative); + return *this; + } + + BackupAfterCopyWorkItem(std::future&& _result, bool _shared, bool _needed_to_copy, Env* _backup_env, @@ -374,10 +410,19 @@ class BackupEngineImpl : public BackupEngine { std::future result; uint32_t checksum_value; RestoreAfterCopyWorkItem() {} - RestoreAfterCopyWorkItem(std::future _result, + RestoreAfterCopyWorkItem(std::future&& _result, uint32_t _checksum_value) : result(std::move(_result)), checksum_value(_checksum_value) {} + RestoreAfterCopyWorkItem(RestoreAfterCopyWorkItem&& o) { + *this = std::move(o); + } + + RestoreAfterCopyWorkItem& operator=(RestoreAfterCopyWorkItem&& o) { + result = std::move(o.result); + checksum_value = o.checksum_value; + return *this; + } }; channel files_to_copy_;