Explictly fail when memtable doesn't support concurrent insert

Summary: If users turn on concurrent insert but the memtable doesn't support it, they might see unexcepted crash. Fix it by explicitly fail.

Test Plan:
Run different setting of stress_test and make sure it fails correctly.
Will add a unit test too.

Reviewers: anthony, kradhakrishnan, IslamAbdelRahman, yhchiang, andrewkr, ngbronson

Reviewed By: ngbronson

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D53895
main
sdong 9 years ago
parent 8ed3438778
commit b1887c5dd9
  1. 4
      db/column_family.cc
  2. 22
      db/db_test.cc
  3. 10
      include/rocksdb/memtablerep.h
  4. 2
      memtable/skiplistrep.cc
  5. 2
      tools/db_crashtest.py

@ -135,6 +135,10 @@ Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
"Delete filtering (filter_deletes) is not compatible with concurrent " "Delete filtering (filter_deletes) is not compatible with concurrent "
"memtable writes (allow_concurrent_memtable_writes)"); "memtable writes (allow_concurrent_memtable_writes)");
} }
if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
return Status::InvalidArgument(
"Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
}
return Status::OK(); return Status::OK();
} }

@ -5736,8 +5736,30 @@ TEST_F(DBTest, TableOptionsSanitizeTest) {
options.prefix_extractor.reset(NewFixedPrefixTransform(1)); options.prefix_extractor.reset(NewFixedPrefixTransform(1));
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
TEST_F(DBTest, ConcurrentMemtableNotSupported) {
Options options = CurrentOptions();
options.allow_concurrent_memtable_write = true;
options.soft_pending_compaction_bytes_limit = 0;
options.hard_pending_compaction_bytes_limit = 100;
options.create_if_missing = true;
DestroyDB(dbname_, options);
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true, 4));
ASSERT_NOK(TryReopen(options));
options.memtable_factory.reset(new SkipListFactory);
ASSERT_OK(TryReopen(options));
ColumnFamilyOptions cf_options(options);
cf_options.memtable_factory.reset(
NewHashLinkListRepFactory(4, 0, 3, true, 4));
ColumnFamilyHandle* handle;
ASSERT_NOK(db_->CreateColumnFamily(cf_options, "name", &handle));
}
TEST_F(DBTest, SanitizeNumThreads) { TEST_F(DBTest, SanitizeNumThreads) {
for (int attempt = 0; attempt < 2; attempt++) { for (int attempt = 0; attempt < 2; attempt++) {
const size_t kTotalTasks = 8; const size_t kTotalTasks = 8;

@ -188,10 +188,6 @@ class MemTableRep {
// Default: true // Default: true
virtual bool IsSnapshotSupported() const { return true; } virtual bool IsSnapshotSupported() const { return true; }
// Return true if the current MemTableRep supports concurrent inserts
// Default: false
virtual bool IsInsertConcurrentlySupported() const { return false; }
protected: protected:
// When *key is an internal key concatenated with the value, returns the // When *key is an internal key concatenated with the value, returns the
// user key. // user key.
@ -210,6 +206,10 @@ class MemTableRepFactory {
const SliceTransform*, const SliceTransform*,
Logger* logger) = 0; Logger* logger) = 0;
virtual const char* Name() const = 0; virtual const char* Name() const = 0;
// Return true if the current MemTableRep supports concurrent inserts
// Default: false
virtual bool IsInsertConcurrentlySupported() const { return false; }
}; };
// This uses a skip list to store keys. It is the default. // This uses a skip list to store keys. It is the default.
@ -229,6 +229,8 @@ class SkipListFactory : public MemTableRepFactory {
Logger* logger) override; Logger* logger) override;
virtual const char* Name() const override { return "SkipListFactory"; } virtual const char* Name() const override { return "SkipListFactory"; }
bool IsInsertConcurrentlySupported() const override { return true; }
private: private:
const size_t lookahead_; const size_t lookahead_;
}; };

@ -25,8 +25,6 @@ public:
transform_(transform), lookahead_(lookahead) { transform_(transform), lookahead_(lookahead) {
} }
virtual bool IsInsertConcurrentlySupported() const override { return true; }
virtual KeyHandle Allocate(const size_t len, char** buf) override { virtual KeyHandle Allocate(const size_t len, char** buf) override {
*buf = skip_list_.AllocateKey(len); *buf = skip_list_.AllocateKey(len);
return static_cast<KeyHandle>(*buf); return static_cast<KeyHandle>(*buf);

@ -24,7 +24,7 @@ default_params = {
"disable_data_sync": 0, "disable_data_sync": 0,
"disable_wal": 0, "disable_wal": 0,
"filter_deletes": lambda: random.randint(0, 1), "filter_deletes": lambda: random.randint(0, 1),
"allow_concurrent_memtable_write": lambda: random.randint(0, 1), "allow_concurrent_memtable_write": 0,
"iterpercent": 10, "iterpercent": 10,
"max_background_compactions": 20, "max_background_compactions": 20,
"max_bytes_for_level_base": 10485760, "max_bytes_for_level_base": 10485760,

Loading…
Cancel
Save