diff --git a/CMakeLists.txt b/CMakeLists.txt index 20d9fdfab..80cf032fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -382,12 +382,13 @@ set(SOURCES env/env_chroot.cc env/env_hdfs.cc env/mock_env.cc + memtable/alloc_tracker.cc memtable/hash_cuckoo_rep.cc memtable/hash_linklist_rep.cc memtable/hash_skiplist_rep.cc - memtable/memtable_allocator.cc memtable/skiplistrep.cc memtable/vectorrep.cc + memtable/write_buffer_manager.cc monitoring/histogram.cc monitoring/histogram_windowing.cc monitoring/instrumented_mutex.cc @@ -666,6 +667,7 @@ set(TESTS env/mock_env_test.cc memtable/inlineskiplist_test.cc memtable/skiplist_test.cc + memtable/write_buffer_manager_test.cc monitoring/histogram_test.cc monitoring/iostats_context_test.cc monitoring/statistics_test.cc diff --git a/HISTORY.md b/HISTORY.md index f647a7b2b..c40d10e20 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,8 @@ ### New Features * Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads. +* Users can pass a cache object to write buffer manager, so that they can cap memory usage for memtable and block cache using one single limit. +* Flush will be triggered when 7/8 of the limit introduced by write_buffer_manager or db_write_buffer_size is triggered, so that the hard threshold is hard to hit. ## 5.5.0 (05/17/2017) ### New Features diff --git a/Makefile b/Makefile index 0af4af9c2..1394910c9 100644 --- a/Makefile +++ b/Makefile @@ -399,6 +399,7 @@ TESTS = \ external_sst_file_test \ prefix_test \ skiplist_test \ + write_buffer_manager_test \ stringappend_test \ ttl_test \ date_tiered_test \ @@ -1222,6 +1223,9 @@ inlineskiplist_test: memtable/inlineskiplist_test.o $(LIBOBJECTS) $(TESTHARNESS) skiplist_test: memtable/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +write_buffer_manager_test: memtable/write_buffer_manager_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 510b86852..36437006b 100644 --- a/TARGETS +++ b/TARGETS @@ -1,950 +1,468 @@ REPO_PATH = "internal_repo_rocksdb/repo/" - BUCK_BINS = "buck-out/gen/" + REPO_PATH - TEST_RUNNER = REPO_PATH + "buckifier/rocks_test_runner.sh" - rocksdb_compiler_flags = [ - "-msse", - "-msse4.2", - "-fno-builtin-memcmp", - "-DROCKSDB_PLATFORM_POSIX", - "-DROCKSDB_LIB_IO_POSIX", - "-DROCKSDB_FALLOCATE_PRESENT", - "-DROCKSDB_MALLOC_USABLE_SIZE", - "-DROCKSDB_RANGESYNC_PRESENT", - "-DROCKSDB_SCHED_GETCPU_PRESENT", - "-DROCKSDB_SUPPORT_THREAD_LOCAL", - "-DOS_LINUX", - # Flags to enable libs we include - "-DSNAPPY", - "-DZLIB", - "-DBZIP2", - "-DLZ4", - "-DZSTD", - "-DGFLAGS=gflags", - "-DNUMA", - "-DTBB", - # Needed to compile in fbcode - "-Wno-expansion-to-defined", + "-msse", + "-msse4.2", + "-fno-builtin-memcmp", + "-DROCKSDB_PLATFORM_POSIX", + "-DROCKSDB_LIB_IO_POSIX", + "-DROCKSDB_FALLOCATE_PRESENT", + "-DROCKSDB_MALLOC_USABLE_SIZE", + "-DROCKSDB_RANGESYNC_PRESENT", + "-DROCKSDB_SCHED_GETCPU_PRESENT", + "-DROCKSDB_SUPPORT_THREAD_LOCAL", + "-DOS_LINUX", + # Flags to enable libs we include + "-DSNAPPY", + "-DZLIB", + "-DBZIP2", + "-DLZ4", + "-DZSTD", + "-DGFLAGS=gflags", + "-DNUMA", + "-DTBB", + # Needed to compile in fbcode + "-Wno-expansion-to-defined", ] rocksdb_external_deps = [ - ("bzip2", None, "bz2"), - ("snappy", None, "snappy"), - ("zlib", None, "z"), - ("gflags", None, "gflags"), - ("lz4", None, "lz4"), - ("zstd", None), - ("tbb", None), - ("numa", "2.0.8", "numa"), - ("googletest", None, "gtest"), + ('bzip2', None, 'bz2'), + ('snappy', None, "snappy"), + ('zlib', None, 'z'), + ('gflags', None, 'gflags'), + ('lz4', None, 'lz4'), + ('zstd', None), + ('tbb', None), + ("numa", "2.0.8", "numa"), + ("googletest", None, "gtest"), ] rocksdb_preprocessor_flags = [ - # Directories with files for #include - "-I" + REPO_PATH + "include/", - "-I" + REPO_PATH, + # Directories with files for #include + "-I" + REPO_PATH + "include/", + "-I" + REPO_PATH, ] cpp_library( name = "rocksdb_lib", + headers = AutoHeaders.RECURSIVE_GLOB, srcs = [ - "cache/clock_cache.cc", - "cache/lru_cache.cc", - "cache/sharded_cache.cc", - "db/builder.cc", - "db/c.cc", - "db/column_family.cc", - "db/compacted_db_impl.cc", - "db/compaction.cc", - "db/compaction_iterator.cc", - "db/compaction_job.cc", - "db/compaction_picker.cc", - "db/compaction_picker_universal.cc", - "db/convenience.cc", - "db/db_filesnapshot.cc", - "db/db_impl.cc", - "db/db_impl_compaction_flush.cc", - "db/db_impl_debug.cc", - "db/db_impl_experimental.cc", - "db/db_impl_files.cc", - "db/db_impl_open.cc", - "db/db_impl_readonly.cc", - "db/db_impl_write.cc", - "db/db_info_dumper.cc", - "db/db_iter.cc", - "db/dbformat.cc", - "db/event_helpers.cc", - "db/experimental.cc", - "db/external_sst_file_ingestion_job.cc", - "db/file_indexer.cc", - "db/flush_job.cc", - "db/flush_scheduler.cc", - "db/forward_iterator.cc", - "db/internal_stats.cc", - "db/log_reader.cc", - "db/log_writer.cc", - "db/malloc_stats.cc", - "db/managed_iterator.cc", - "db/memtable.cc", - "db/memtable_list.cc", - "db/merge_helper.cc", - "db/merge_operator.cc", - "db/range_del_aggregator.cc", - "db/repair.cc", - "db/snapshot_impl.cc", - "db/table_cache.cc", - "db/table_properties_collector.cc", - "db/transaction_log_impl.cc", - "db/version_builder.cc", - "db/version_edit.cc", - "db/version_set.cc", - "db/wal_manager.cc", - "db/write_batch.cc", - "db/write_batch_base.cc", - "db/write_controller.cc", - "db/write_thread.cc", - "env/env.cc", - "env/env_chroot.cc", - "env/env_hdfs.cc", - "env/env_posix.cc", - "env/io_posix.cc", - "env/mock_env.cc", - "memtable/hash_cuckoo_rep.cc", - "memtable/hash_linklist_rep.cc", - "memtable/hash_skiplist_rep.cc", - "memtable/memtable_allocator.cc", - "memtable/skiplistrep.cc", - "memtable/vectorrep.cc", - "monitoring/histogram.cc", - "monitoring/histogram_windowing.cc", - "monitoring/instrumented_mutex.cc", - "monitoring/iostats_context.cc", - "monitoring/perf_context.cc", - "monitoring/perf_level.cc", - "monitoring/statistics.cc", - "monitoring/thread_status_impl.cc", - "monitoring/thread_status_updater.cc", - "monitoring/thread_status_updater_debug.cc", - "monitoring/thread_status_util.cc", - "monitoring/thread_status_util_debug.cc", - "options/cf_options.cc", - "options/db_options.cc", - "options/options.cc", - "options/options_helper.cc", - "options/options_parser.cc", - "options/options_sanity_check.cc", - "port/port_posix.cc", - "port/stack_trace.cc", - "table/adaptive_table_factory.cc", - "table/block.cc", - "table/block_based_filter_block.cc", - "table/block_based_table_builder.cc", - "table/block_based_table_factory.cc", - "table/block_based_table_reader.cc", - "table/block_builder.cc", - "table/block_prefix_index.cc", - "table/bloom_block.cc", - "table/cuckoo_table_builder.cc", - "table/cuckoo_table_factory.cc", - "table/cuckoo_table_reader.cc", - "table/flush_block_policy.cc", - "table/format.cc", - "table/full_filter_block.cc", - "table/get_context.cc", - "table/index_builder.cc", - "table/iterator.cc", - "table/merging_iterator.cc", - "table/meta_blocks.cc", - "table/partitioned_filter_block.cc", - "table/persistent_cache_helper.cc", - "table/plain_table_builder.cc", - "table/plain_table_factory.cc", - "table/plain_table_index.cc", - "table/plain_table_key_coding.cc", - "table/plain_table_reader.cc", - "table/sst_file_writer.cc", - "table/table_properties.cc", - "table/two_level_iterator.cc", - "tools/dump/db_dump_tool.cc", - "tools/ldb_cmd.cc", - "tools/ldb_tool.cc", - "tools/sst_dump_tool.cc", - "util/arena.cc", - "util/auto_roll_logger.cc", - "util/bloom.cc", - "util/build_version.cc", - "util/coding.cc", - "util/compaction_job_stats_impl.cc", - "util/comparator.cc", - "util/concurrent_arena.cc", - "util/crc32c.cc", - "util/delete_scheduler.cc", - "util/dynamic_bloom.cc", - "util/event_logger.cc", - "util/file_reader_writer.cc", - "util/file_util.cc", - "util/filename.cc", - "util/filter_policy.cc", - "util/hash.cc", - "util/log_buffer.cc", - "util/murmurhash.cc", - "util/random.cc", - "util/rate_limiter.cc", - "util/slice.cc", - "util/sst_file_manager_impl.cc", - "util/status.cc", - "util/status_message.cc", - "util/string_util.cc", - "util/sync_point.cc", - "util/thread_local.cc", - "util/threadpool_imp.cc", - "util/transaction_test_util.cc", - "util/xxhash.cc", - "utilities/backupable/backupable_db.cc", - "utilities/blob_db/blob_db.cc", - "utilities/blob_db/blob_db_impl.cc", - "utilities/blob_db/blob_db_options_impl.cc", - "utilities/blob_db/blob_dump_tool.cc", - "utilities/blob_db/blob_file.cc", - "utilities/blob_db/blob_log_format.cc", - "utilities/blob_db/blob_log_reader.cc", - "utilities/blob_db/blob_log_writer.cc", - "utilities/checkpoint/checkpoint_impl.cc", - "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", - "utilities/convenience/info_log_finder.cc", - "utilities/date_tiered/date_tiered_db_impl.cc", - "utilities/debug.cc", - "utilities/document/document_db.cc", - "utilities/document/json_document.cc", - "utilities/document/json_document_builder.cc", - "utilities/env_mirror.cc", - "utilities/env_timed.cc", - "utilities/geodb/geodb_impl.cc", - "utilities/leveldb_options/leveldb_options.cc", - "utilities/lua/rocks_lua_compaction_filter.cc", - "utilities/memory/memory_util.cc", - "utilities/merge_operators/max.cc", - "utilities/merge_operators/put.cc", - "utilities/merge_operators/string_append/stringappend.cc", - "utilities/merge_operators/string_append/stringappend2.cc", - "utilities/merge_operators/uint64add.cc", - "utilities/option_change_migration/option_change_migration.cc", - "utilities/options/options_util.cc", - "utilities/persistent_cache/block_cache_tier.cc", - "utilities/persistent_cache/block_cache_tier_file.cc", - "utilities/persistent_cache/block_cache_tier_metadata.cc", - "utilities/persistent_cache/persistent_cache_tier.cc", - "utilities/persistent_cache/volatile_tier_impl.cc", - "utilities/redis/redis_lists.cc", - "utilities/simulator_cache/sim_cache.cc", - "utilities/spatialdb/spatial_db.cc", - "utilities/table_properties_collectors/compact_on_deletion_collector.cc", - "utilities/transactions/optimistic_transaction_db_impl.cc", - "utilities/transactions/optimistic_transaction_impl.cc", - "utilities/transactions/transaction_base.cc", - "utilities/transactions/transaction_db_impl.cc", - "utilities/transactions/transaction_db_mutex_impl.cc", - "utilities/transactions/transaction_impl.cc", - "utilities/transactions/transaction_lock_mgr.cc", - "utilities/transactions/transaction_util.cc", - "utilities/ttl/db_ttl_impl.cc", - "utilities/write_batch_with_index/write_batch_with_index.cc", - "utilities/write_batch_with_index/write_batch_with_index_internal.cc", + "cache/clock_cache.cc", + "cache/lru_cache.cc", + "cache/sharded_cache.cc", + "db/builder.cc", + "db/c.cc", + "db/column_family.cc", + "db/compacted_db_impl.cc", + "db/compaction.cc", + "db/compaction_iterator.cc", + "db/compaction_job.cc", + "db/compaction_picker.cc", + "db/compaction_picker_universal.cc", + "db/convenience.cc", + "db/db_filesnapshot.cc", + "db/db_impl.cc", + "db/db_impl_write.cc", + "db/db_impl_compaction_flush.cc", + "db/db_impl_files.cc", + "db/db_impl_open.cc", + "db/db_impl_debug.cc", + "db/db_impl_experimental.cc", + "db/db_impl_readonly.cc", + "db/db_info_dumper.cc", + "db/db_iter.cc", + "db/dbformat.cc", + "db/event_helpers.cc", + "db/experimental.cc", + "db/external_sst_file_ingestion_job.cc", + "db/file_indexer.cc", + "db/flush_job.cc", + "db/flush_scheduler.cc", + "db/forward_iterator.cc", + "db/internal_stats.cc", + "db/log_reader.cc", + "db/log_writer.cc", + "db/malloc_stats.cc", + "db/managed_iterator.cc", + "db/memtable.cc", + "db/memtable_list.cc", + "db/merge_helper.cc", + "db/merge_operator.cc", + "db/range_del_aggregator.cc", + "db/repair.cc", + "db/snapshot_impl.cc", + "db/table_cache.cc", + "db/table_properties_collector.cc", + "db/transaction_log_impl.cc", + "db/version_builder.cc", + "db/version_edit.cc", + "db/version_set.cc", + "db/wal_manager.cc", + "db/write_batch.cc", + "db/write_batch_base.cc", + "db/write_controller.cc", + "db/write_thread.cc", + "env/env.cc", + "env/env_chroot.cc", + "env/env_hdfs.cc", + "env/env_posix.cc", + "env/io_posix.cc", + "env/mock_env.cc", + "memtable/alloc_tracker.cc", + "memtable/hash_cuckoo_rep.cc", + "memtable/hash_linklist_rep.cc", + "memtable/hash_skiplist_rep.cc", + "memtable/skiplistrep.cc", + "memtable/vectorrep.cc", + "memtable/write_buffer_manager.cc", + "monitoring/histogram.cc", + "monitoring/histogram_windowing.cc", + "monitoring/instrumented_mutex.cc", + "monitoring/iostats_context.cc", + "monitoring/perf_context.cc", + "monitoring/perf_level.cc", + "monitoring/statistics.cc", + "monitoring/thread_status_impl.cc", + "monitoring/thread_status_updater.cc", + "monitoring/thread_status_updater_debug.cc", + "monitoring/thread_status_util.cc", + "monitoring/thread_status_util_debug.cc", + "options/cf_options.cc", + "options/db_options.cc", + "options/options.cc", + "options/options_helper.cc", + "options/options_parser.cc", + "options/options_sanity_check.cc", + "port/port_posix.cc", + "port/stack_trace.cc", + "table/adaptive_table_factory.cc", + "table/block.cc", + "table/block_based_filter_block.cc", + "table/block_based_table_builder.cc", + "table/block_based_table_factory.cc", + "table/block_based_table_reader.cc", + "table/block_builder.cc", + "table/block_prefix_index.cc", + "table/bloom_block.cc", + "table/cuckoo_table_builder.cc", + "table/cuckoo_table_factory.cc", + "table/cuckoo_table_reader.cc", + "table/flush_block_policy.cc", + "table/format.cc", + "table/full_filter_block.cc", + "table/get_context.cc", + "table/index_builder.cc", + "table/iterator.cc", + "table/merging_iterator.cc", + "table/meta_blocks.cc", + "table/partitioned_filter_block.cc", + "table/persistent_cache_helper.cc", + "table/plain_table_builder.cc", + "table/plain_table_factory.cc", + "table/plain_table_index.cc", + "table/plain_table_key_coding.cc", + "table/plain_table_reader.cc", + "table/sst_file_writer.cc", + "table/table_properties.cc", + "table/two_level_iterator.cc", + "tools/dump/db_dump_tool.cc", + "util/arena.cc", + "util/auto_roll_logger.cc", + "util/bloom.cc", + "util/build_version.cc", + "util/coding.cc", + "util/compaction_job_stats_impl.cc", + "util/comparator.cc", + "util/concurrent_arena.cc", + "util/crc32c.cc", + "util/delete_scheduler.cc", + "util/dynamic_bloom.cc", + "util/event_logger.cc", + "util/file_reader_writer.cc", + "util/file_util.cc", + "util/filename.cc", + "util/filter_policy.cc", + "util/hash.cc", + "util/log_buffer.cc", + "util/murmurhash.cc", + "util/random.cc", + "util/rate_limiter.cc", + "util/slice.cc", + "util/sst_file_manager_impl.cc", + "util/status.cc", + "util/status_message.cc", + "util/string_util.cc", + "util/sync_point.cc", + "util/thread_local.cc", + "util/threadpool_imp.cc", + "util/transaction_test_util.cc", + "util/xxhash.cc", + "utilities/backupable/backupable_db.cc", + "utilities/blob_db/blob_db.cc", + "utilities/blob_db/blob_db_impl.cc", + "utilities/blob_db/blob_db_options_impl.cc", + "utilities/blob_db/blob_file.cc", + "utilities/blob_db/blob_log_reader.cc", + "utilities/blob_db/blob_log_writer.cc", + "utilities/blob_db/blob_log_format.cc", + "utilities/checkpoint/checkpoint_impl.cc", + "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", + "utilities/convenience/info_log_finder.cc", + "utilities/date_tiered/date_tiered_db_impl.cc", + "utilities/debug.cc", + "utilities/document/document_db.cc", + "utilities/document/json_document.cc", + "utilities/document/json_document_builder.cc", + "utilities/env_mirror.cc", + "utilities/env_timed.cc", + "utilities/geodb/geodb_impl.cc", + "utilities/leveldb_options/leveldb_options.cc", + "utilities/lua/rocks_lua_compaction_filter.cc", + "utilities/memory/memory_util.cc", + "utilities/merge_operators/max.cc", + "utilities/merge_operators/put.cc", + "utilities/merge_operators/string_append/stringappend.cc", + "utilities/merge_operators/string_append/stringappend2.cc", + "utilities/merge_operators/uint64add.cc", + "utilities/option_change_migration/option_change_migration.cc", + "utilities/options/options_util.cc", + "utilities/persistent_cache/block_cache_tier.cc", + "utilities/persistent_cache/block_cache_tier_file.cc", + "utilities/persistent_cache/block_cache_tier_metadata.cc", + "utilities/persistent_cache/persistent_cache_tier.cc", + "utilities/persistent_cache/volatile_tier_impl.cc", + "utilities/redis/redis_lists.cc", + "utilities/simulator_cache/sim_cache.cc", + "utilities/spatialdb/spatial_db.cc", + "utilities/table_properties_collectors/compact_on_deletion_collector.cc", + "utilities/transactions/optimistic_transaction_db_impl.cc", + "utilities/transactions/optimistic_transaction_impl.cc", + "utilities/transactions/transaction_base.cc", + "utilities/transactions/transaction_db_impl.cc", + "utilities/transactions/transaction_db_mutex_impl.cc", + "utilities/transactions/transaction_impl.cc", + "utilities/transactions/transaction_lock_mgr.cc", + "utilities/transactions/transaction_util.cc", + "utilities/ttl/db_ttl_impl.cc", + "utilities/write_batch_with_index/write_batch_with_index.cc", + "utilities/write_batch_with_index/write_batch_with_index_internal.cc", + "tools/ldb_cmd.cc", + "tools/ldb_tool.cc", + "tools/sst_dump_tool.cc", + "utilities/blob_db/blob_dump_tool.cc", ], - headers = AutoHeaders.RECURSIVE_GLOB, - compiler_flags = rocksdb_compiler_flags, - preprocessor_flags = rocksdb_preprocessor_flags, deps = [], + preprocessor_flags = rocksdb_preprocessor_flags, + compiler_flags = rocksdb_compiler_flags, external_deps = rocksdb_external_deps, ) cpp_library( name = "rocksdb_test_lib", + headers = AutoHeaders.RECURSIVE_GLOB, srcs = [ - "db/db_test_util.cc", - "table/mock_table.cc", - "util/fault_injection_test_env.cc", - "util/testharness.cc", - "util/testutil.cc", - "utilities/col_buf_decoder.cc", - "utilities/col_buf_encoder.cc", - "utilities/column_aware_encoding_util.cc", + "table/mock_table.cc", + "util/fault_injection_test_env.cc", + "util/testharness.cc", + "util/testutil.cc", + "db/db_test_util.cc", + "utilities/col_buf_encoder.cc", + "utilities/col_buf_decoder.cc", + "utilities/column_aware_encoding_util.cc", ], - headers = AutoHeaders.RECURSIVE_GLOB, - compiler_flags = rocksdb_compiler_flags, - preprocessor_flags = rocksdb_preprocessor_flags, deps = [":rocksdb_lib"], + preprocessor_flags = rocksdb_preprocessor_flags, + compiler_flags = rocksdb_compiler_flags, external_deps = rocksdb_external_deps, ) cpp_library( name = "rocksdb_tools_lib", + headers = AutoHeaders.RECURSIVE_GLOB, srcs = [ - "tools/db_bench_tool.cc", - "util/testutil.cc", + "tools/db_bench_tool.cc", + "util/testutil.cc", ], - headers = AutoHeaders.RECURSIVE_GLOB, - compiler_flags = rocksdb_compiler_flags, - preprocessor_flags = rocksdb_preprocessor_flags, deps = [":rocksdb_lib"], + preprocessor_flags = rocksdb_preprocessor_flags, + compiler_flags = rocksdb_compiler_flags, external_deps = rocksdb_external_deps, ) cpp_library( name = "env_basic_test_lib", - srcs = ["env/env_basic_test.cc"], headers = AutoHeaders.RECURSIVE_GLOB, - compiler_flags = rocksdb_compiler_flags, - preprocessor_flags = rocksdb_preprocessor_flags, + srcs = ["env/env_basic_test.cc"], deps = [":rocksdb_test_lib"], + preprocessor_flags = rocksdb_preprocessor_flags, + compiler_flags = rocksdb_compiler_flags, external_deps = rocksdb_external_deps, ) # [test_name, test_src, test_type] -ROCKS_TESTS = [ - [ - "merger_test", - "table/merger_test.cc", - "serial", - ], - [ - "cache_test", - "cache/cache_test.cc", - "serial", - ], - [ - "options_file_test", - "db/options_file_test.cc", - "serial", - ], - [ - "compaction_picker_test", - "db/compaction_picker_test.cc", - "serial", - ], - [ - "db_log_iter_test", - "db/db_log_iter_test.cc", - "serial", - ], - [ - "thread_list_test", - "util/thread_list_test.cc", - "serial", - ], - [ - "table_properties_collector_test", - "db/table_properties_collector_test.cc", - "serial", - ], - [ - "document_db_test", - "utilities/document/document_db_test.cc", - "serial", - ], - [ - "event_logger_test", - "util/event_logger_test.cc", - "serial", - ], - [ - "coding_test", - "util/coding_test.cc", - "serial", - ], - [ - "statistics_test", - "monitoring/statistics_test.cc", - "serial", - ], - [ - "options_settable_test", - "options/options_settable_test.cc", - "serial", - ], - [ - "sst_dump_test", - "tools/sst_dump_test.cc", - "serial", - ], - [ - "column_aware_encoding_test", - "utilities/column_aware_encoding_test.cc", - "serial", - ], - [ - "db_iterator_test", - "db/db_iterator_test.cc", - "serial", - ], - [ - "db_sst_test", - "db/db_sst_test.cc", - "parallel", - ], - [ - "geodb_test", - "utilities/geodb/geodb_test.cc", - "serial", - ], - [ - "listener_test", - "db/listener_test.cc", - "serial", - ], - [ - "write_callback_test", - "db/write_callback_test.cc", - "serial", - ], - [ - "version_set_test", - "db/version_set_test.cc", - "serial", - ], - [ - "full_filter_block_test", - "table/full_filter_block_test.cc", - "serial", - ], - [ - "cleanable_test", - "table/cleanable_test.cc", - "serial", - ], - [ - "checkpoint_test", - "utilities/checkpoint/checkpoint_test.cc", - "serial", - ], - [ - "compact_files_test", - "db/compact_files_test.cc", - "serial", - ], - [ - "db_options_test", - "db/db_options_test.cc", - "serial", - ], - [ - "object_registry_test", - "utilities/object_registry_test.cc", - "serial", - ], - [ - "auto_roll_logger_test", - "util/auto_roll_logger_test.cc", - "serial", - ], - [ - "dbformat_test", - "db/dbformat_test.cc", - "serial", - ], - [ - "db_write_test", - "db/db_write_test.cc", - "serial", - ], - [ - "write_batch_with_index_test", - "utilities/write_batch_with_index/write_batch_with_index_test.cc", - "serial", - ], - [ - "json_document_test", - "utilities/document/json_document_test.cc", - "serial", - ], - [ - "file_reader_writer_test", - "util/file_reader_writer_test.cc", - "serial", - ], - [ - "repair_test", - "db/repair_test.cc", - "serial", - ], - [ - "persistent_cache_test", - "utilities/persistent_cache/persistent_cache_test.cc", - "parallel", - ], - [ - "db_bloom_filter_test", - "db/db_bloom_filter_test.cc", - "serial", - ], - [ - "external_sst_file_basic_test", - "db/external_sst_file_basic_test.cc", - "serial", - ], - [ - "options_test", - "options/options_test.cc", - "serial", - ], - [ - "perf_context_test", - "db/perf_context_test.cc", - "serial", - ], - [ - "db_block_cache_test", - "db/db_block_cache_test.cc", - "serial", - ], - [ - "heap_test", - "util/heap_test.cc", - "serial", - ], - [ - "db_test2", - "db/db_test2.cc", - "serial", - ], - [ - "filelock_test", - "util/filelock_test.cc", - "serial", - ], - [ - "write_controller_test", - "db/write_controller_test.cc", - "serial", - ], - [ - "compaction_iterator_test", - "db/compaction_iterator_test.cc", - "serial", - ], - [ - "spatial_db_test", - "utilities/spatialdb/spatial_db_test.cc", - "serial", - ], - [ - "c_test", - "db/c_test.c", - "serial", - ], - [ - "range_del_aggregator_test", - "db/range_del_aggregator_test.cc", - "serial", - ], - [ - "date_tiered_test", - "utilities/date_tiered/date_tiered_test.cc", - "serial", - ], - [ - "ldb_cmd_test", - "tools/ldb_cmd_test.cc", - "serial", - ], - [ - "db_test", - "db/db_test.cc", - "parallel", - ], - [ - "block_based_filter_block_test", - "table/block_based_filter_block_test.cc", - "serial", - ], - [ - "merge_test", - "db/merge_test.cc", - "serial", - ], - [ - "bloom_test", - "util/bloom_test.cc", - "serial", - ], - [ - "block_test", - "table/block_test.cc", - "serial", - ], - [ - "cuckoo_table_builder_test", - "table/cuckoo_table_builder_test.cc", - "serial", - ], - [ - "backupable_db_test", - "utilities/backupable/backupable_db_test.cc", - "parallel", - ], - [ - "db_flush_test", - "db/db_flush_test.cc", - "serial", - ], - [ - "filename_test", - "db/filename_test.cc", - "serial", - ], - [ - "cuckoo_table_reader_test", - "table/cuckoo_table_reader_test.cc", - "serial", - ], - [ - "slice_transform_test", - "util/slice_transform_test.cc", - "serial", - ], - [ - "cuckoo_table_db_test", - "db/cuckoo_table_db_test.cc", - "serial", - ], - [ - "inlineskiplist_test", - "memtable/inlineskiplist_test.cc", - "parallel", - ], - [ - "optimistic_transaction_test", - "utilities/transactions/optimistic_transaction_test.cc", - "serial", - ], - [ - "hash_table_test", - "utilities/persistent_cache/hash_table_test.cc", - "serial", - ], - [ - "db_dynamic_level_test", - "db/db_dynamic_level_test.cc", - "serial", - ], - [ - "option_change_migration_test", - "utilities/option_change_migration/option_change_migration_test.cc", - "serial", - ], - [ - "db_inplace_update_test", - "db/db_inplace_update_test.cc", - "serial", - ], - [ - "autovector_test", - "util/autovector_test.cc", - "serial", - ], - [ - "db_iter_test", - "db/db_iter_test.cc", - "serial", - ], - [ - "flush_job_test", - "db/flush_job_test.cc", - "serial", - ], - [ - "wal_manager_test", - "db/wal_manager_test.cc", - "serial", - ], - [ - "write_batch_test", - "db/write_batch_test.cc", - "serial", - ], - [ - "crc32c_test", - "util/crc32c_test.cc", - "serial", - ], - [ - "rate_limiter_test", - "util/rate_limiter_test.cc", - "serial", - ], - [ - "external_sst_file_test", - "db/external_sst_file_test.cc", - "parallel", - ], - [ - "compaction_job_test", - "db/compaction_job_test.cc", - "serial", - ], - [ - "mock_env_test", - "env/mock_env_test.cc", - "serial", - ], - [ - "db_table_properties_test", - "db/db_table_properties_test.cc", - "serial", - ], - [ - "db_compaction_test", - "db/db_compaction_test.cc", - "parallel", - ], - [ - "arena_test", - "util/arena_test.cc", - "serial", - ], - [ - "stringappend_test", - "utilities/merge_operators/string_append/stringappend_test.cc", - "serial", - ], - [ - "reduce_levels_test", - "tools/reduce_levels_test.cc", - "serial", - ], - [ - "prefix_test", - "db/prefix_test.cc", - "serial", - ], - [ - "ttl_test", - "utilities/ttl/ttl_test.cc", - "serial", - ], - [ - "merge_helper_test", - "db/merge_helper_test.cc", - "serial", - ], - [ - "file_indexer_test", - "db/file_indexer_test.cc", - "serial", - ], - [ - "db_statistics_test", - "db/db_statistics_test.cc", - "serial", - ], - [ - "memory_test", - "utilities/memory/memory_test.cc", - "serial", - ], - [ - "log_test", - "db/log_test.cc", - "serial", - ], - [ - "env_timed_test", - "utilities/env_timed_test.cc", - "serial", - ], - [ - "deletefile_test", - "db/deletefile_test.cc", - "serial", - ], - [ - "partitioned_filter_block_test", - "table/partitioned_filter_block_test.cc", - "serial", - ], - [ - "comparator_db_test", - "db/comparator_db_test.cc", - "serial", - ], - [ - "compaction_job_stats_test", - "db/compaction_job_stats_test.cc", - "serial", - ], - [ - "thread_local_test", - "util/thread_local_test.cc", - "serial", - ], - [ - "version_builder_test", - "db/version_builder_test.cc", - "serial", - ], - [ - "db_range_del_test", - "db/db_range_del_test.cc", - "serial", - ], - [ - "table_test", - "table/table_test.cc", - "parallel", - ], - [ - "db_tailing_iter_test", - "db/db_tailing_iter_test.cc", - "serial", - ], - [ - "db_compaction_filter_test", - "db/db_compaction_filter_test.cc", - "parallel", - ], - [ - "options_util_test", - "utilities/options/options_util_test.cc", - "serial", - ], - [ - "dynamic_bloom_test", - "util/dynamic_bloom_test.cc", - "serial", - ], - [ - "db_basic_test", - "db/db_basic_test.cc", - "serial", - ], - [ - "db_merge_operator_test", - "db/db_merge_operator_test.cc", - "serial", - ], - [ - "manual_compaction_test", - "db/manual_compaction_test.cc", - "parallel", - ], - [ - "delete_scheduler_test", - "util/delete_scheduler_test.cc", - "serial", - ], - [ - "transaction_test", - "utilities/transactions/transaction_test.cc", - "serial", - ], - [ - "db_io_failure_test", - "db/db_io_failure_test.cc", - "serial", - ], - [ - "corruption_test", - "db/corruption_test.cc", - "serial", - ], - [ - "compact_on_deletion_collector_test", - "utilities/table_properties_collectors/compact_on_deletion_collector_test.cc", - "serial", - ], - [ - "env_test", - "env/env_test.cc", - "serial", - ], - [ - "db_wal_test", - "db/db_wal_test.cc", - "parallel", - ], - [ - "timer_queue_test", - "util/timer_queue_test.cc", - "serial", - ], - [ - "sim_cache_test", - "utilities/simulator_cache/sim_cache_test.cc", - "serial", - ], - [ - "db_memtable_test", - "db/db_memtable_test.cc", - "serial", - ], - [ - "db_universal_compaction_test", - "db/db_universal_compaction_test.cc", - "parallel", - ], - [ - "histogram_test", - "monitoring/histogram_test.cc", - "serial", - ], - [ - "util_merge_operators_test", - "utilities/util_merge_operators_test.cc", - "serial", - ], - [ - "fault_injection_test", - "db/fault_injection_test.cc", - "parallel", - ], - [ - "env_basic_test", - "env/env_basic_test.cc", - "serial", - ], - [ - "iostats_context_test", - "monitoring/iostats_context_test.cc", - "serial", - ], - [ - "memtable_list_test", - "db/memtable_list_test.cc", - "serial", - ], - [ - "column_family_test", - "db/column_family_test.cc", - "serial", - ], - [ - "db_properties_test", - "db/db_properties_test.cc", - "serial", - ], - [ - "version_edit_test", - "db/version_edit_test.cc", - "serial", - ], - [ - "skiplist_test", - "memtable/skiplist_test.cc", - "serial", - ], - [ - "lru_cache_test", - "cache/lru_cache_test.cc", - "serial", - ], - [ - "plain_table_db_test", - "db/plain_table_db_test.cc", - "serial", - ], -] +ROCKS_TESTS = [['merger_test', 'table/merger_test.cc', 'serial'], + ['cache_test', 'cache/cache_test.cc', 'serial'], + ['options_file_test', 'db/options_file_test.cc', 'serial'], + ['compaction_picker_test', 'db/compaction_picker_test.cc', 'serial'], + ['db_log_iter_test', 'db/db_log_iter_test.cc', 'serial'], + ['thread_list_test', 'util/thread_list_test.cc', 'serial'], + ['table_properties_collector_test', + 'db/table_properties_collector_test.cc', + 'serial'], + ['document_db_test', 'utilities/document/document_db_test.cc', 'serial'], + ['event_logger_test', 'util/event_logger_test.cc', 'serial'], + ['coding_test', 'util/coding_test.cc', 'serial'], + ['statistics_test', 'monitoring/statistics_test.cc', 'serial'], + ['options_settable_test', 'options/options_settable_test.cc', 'serial'], + ['sst_dump_test', 'tools/sst_dump_test.cc', 'serial'], + ['column_aware_encoding_test', + 'utilities/column_aware_encoding_test.cc', + 'serial'], + ['db_iterator_test', 'db/db_iterator_test.cc', 'serial'], + ['db_sst_test', 'db/db_sst_test.cc', 'parallel'], + ['geodb_test', 'utilities/geodb/geodb_test.cc', 'serial'], + ['listener_test', 'db/listener_test.cc', 'serial'], + ['write_callback_test', 'db/write_callback_test.cc', 'serial'], + ['version_set_test', 'db/version_set_test.cc', 'serial'], + ['full_filter_block_test', 'table/full_filter_block_test.cc', 'serial'], + ['cleanable_test', 'table/cleanable_test.cc', 'serial'], + ['checkpoint_test', 'utilities/checkpoint/checkpoint_test.cc', 'serial'], + ['compact_files_test', 'db/compact_files_test.cc', 'serial'], + ['db_options_test', 'db/db_options_test.cc', 'serial'], + ['object_registry_test', 'utilities/object_registry_test.cc', 'serial'], + ['auto_roll_logger_test', 'util/auto_roll_logger_test.cc', 'serial'], + ['dbformat_test', 'db/dbformat_test.cc', 'serial'], + ['db_write_test', 'db/db_write_test.cc', 'serial'], + ['write_batch_with_index_test', + 'utilities/write_batch_with_index/write_batch_with_index_test.cc', + 'serial'], + ['json_document_test', 'utilities/document/json_document_test.cc', 'serial'], + ['file_reader_writer_test', 'util/file_reader_writer_test.cc', 'serial'], + ['repair_test', 'db/repair_test.cc', 'serial'], + ['persistent_cache_test', + 'utilities/persistent_cache/persistent_cache_test.cc', + 'parallel'], + ['db_bloom_filter_test', 'db/db_bloom_filter_test.cc', 'serial'], + ['external_sst_file_basic_test', + 'db/external_sst_file_basic_test.cc', + 'serial'], + ['write_buffer_manager_test', + 'memtable/write_buffer_manager_test.cc', + 'serial'], + ['options_test', 'options/options_test.cc', 'serial'], + ['perf_context_test', 'db/perf_context_test.cc', 'serial'], + ['db_block_cache_test', 'db/db_block_cache_test.cc', 'serial'], + ['heap_test', 'util/heap_test.cc', 'serial'], + ['db_test2', 'db/db_test2.cc', 'serial'], + ['filelock_test', 'util/filelock_test.cc', 'serial'], + ['write_controller_test', 'db/write_controller_test.cc', 'serial'], + ['compaction_iterator_test', 'db/compaction_iterator_test.cc', 'serial'], + ['spatial_db_test', 'utilities/spatialdb/spatial_db_test.cc', 'serial'], + ['c_test', 'db/c_test.c', 'serial'], + ['range_del_aggregator_test', 'db/range_del_aggregator_test.cc', 'serial'], + ['date_tiered_test', 'utilities/date_tiered/date_tiered_test.cc', 'serial'], + ['ldb_cmd_test', 'tools/ldb_cmd_test.cc', 'serial'], + ['db_test', 'db/db_test.cc', 'parallel'], + ['block_based_filter_block_test', + 'table/block_based_filter_block_test.cc', + 'serial'], + ['merge_test', 'db/merge_test.cc', 'serial'], + ['bloom_test', 'util/bloom_test.cc', 'serial'], + ['block_test', 'table/block_test.cc', 'serial'], + ['cuckoo_table_builder_test', 'table/cuckoo_table_builder_test.cc', 'serial'], + ['backupable_db_test', + 'utilities/backupable/backupable_db_test.cc', + 'parallel'], + ['db_flush_test', 'db/db_flush_test.cc', 'serial'], + ['filename_test', 'db/filename_test.cc', 'serial'], + ['cuckoo_table_reader_test', 'table/cuckoo_table_reader_test.cc', 'serial'], + ['slice_transform_test', 'util/slice_transform_test.cc', 'serial'], + ['cuckoo_table_db_test', 'db/cuckoo_table_db_test.cc', 'serial'], + ['inlineskiplist_test', 'memtable/inlineskiplist_test.cc', 'parallel'], + ['optimistic_transaction_test', + 'utilities/transactions/optimistic_transaction_test.cc', + 'serial'], + ['hash_table_test', + 'utilities/persistent_cache/hash_table_test.cc', + 'serial'], + ['db_dynamic_level_test', 'db/db_dynamic_level_test.cc', 'serial'], + ['option_change_migration_test', + 'utilities/option_change_migration/option_change_migration_test.cc', + 'serial'], + ['db_inplace_update_test', 'db/db_inplace_update_test.cc', 'serial'], + ['autovector_test', 'util/autovector_test.cc', 'serial'], + ['db_iter_test', 'db/db_iter_test.cc', 'serial'], + ['flush_job_test', 'db/flush_job_test.cc', 'serial'], + ['wal_manager_test', 'db/wal_manager_test.cc', 'serial'], + ['write_batch_test', 'db/write_batch_test.cc', 'serial'], + ['crc32c_test', 'util/crc32c_test.cc', 'serial'], + ['rate_limiter_test', 'util/rate_limiter_test.cc', 'serial'], + ['external_sst_file_test', 'db/external_sst_file_test.cc', 'parallel'], + ['compaction_job_test', 'db/compaction_job_test.cc', 'serial'], + ['mock_env_test', 'env/mock_env_test.cc', 'serial'], + ['db_table_properties_test', 'db/db_table_properties_test.cc', 'serial'], + ['db_compaction_test', 'db/db_compaction_test.cc', 'parallel'], + ['arena_test', 'util/arena_test.cc', 'serial'], + ['stringappend_test', + 'utilities/merge_operators/string_append/stringappend_test.cc', + 'serial'], + ['reduce_levels_test', 'tools/reduce_levels_test.cc', 'serial'], + ['prefix_test', 'db/prefix_test.cc', 'serial'], + ['ttl_test', 'utilities/ttl/ttl_test.cc', 'serial'], + ['merge_helper_test', 'db/merge_helper_test.cc', 'serial'], + ['file_indexer_test', 'db/file_indexer_test.cc', 'serial'], + ['db_statistics_test', 'db/db_statistics_test.cc', 'serial'], + ['memory_test', 'utilities/memory/memory_test.cc', 'serial'], + ['log_test', 'db/log_test.cc', 'serial'], + ['env_timed_test', 'utilities/env_timed_test.cc', 'serial'], + ['deletefile_test', 'db/deletefile_test.cc', 'serial'], + ['partitioned_filter_block_test', + 'table/partitioned_filter_block_test.cc', + 'serial'], + ['comparator_db_test', 'db/comparator_db_test.cc', 'serial'], + ['compaction_job_stats_test', 'db/compaction_job_stats_test.cc', 'serial'], + ['thread_local_test', 'util/thread_local_test.cc', 'serial'], + ['version_builder_test', 'db/version_builder_test.cc', 'serial'], + ['db_range_del_test', 'db/db_range_del_test.cc', 'serial'], + ['table_test', 'table/table_test.cc', 'parallel'], + ['db_tailing_iter_test', 'db/db_tailing_iter_test.cc', 'serial'], + ['db_compaction_filter_test', 'db/db_compaction_filter_test.cc', 'parallel'], + ['options_util_test', 'utilities/options/options_util_test.cc', 'serial'], + ['dynamic_bloom_test', 'util/dynamic_bloom_test.cc', 'serial'], + ['db_basic_test', 'db/db_basic_test.cc', 'serial'], + ['db_merge_operator_test', 'db/db_merge_operator_test.cc', 'serial'], + ['manual_compaction_test', 'db/manual_compaction_test.cc', 'parallel'], + ['delete_scheduler_test', 'util/delete_scheduler_test.cc', 'serial'], + ['transaction_test', 'utilities/transactions/transaction_test.cc', 'serial'], + ['db_io_failure_test', 'db/db_io_failure_test.cc', 'serial'], + ['corruption_test', 'db/corruption_test.cc', 'serial'], + ['compact_on_deletion_collector_test', + 'utilities/table_properties_collectors/compact_on_deletion_collector_test.cc', + 'serial'], + ['env_test', 'env/env_test.cc', 'serial'], + ['db_wal_test', 'db/db_wal_test.cc', 'parallel'], + ['timer_queue_test', 'util/timer_queue_test.cc', 'serial'], + ['sim_cache_test', 'utilities/simulator_cache/sim_cache_test.cc', 'serial'], + ['db_memtable_test', 'db/db_memtable_test.cc', 'serial'], + ['db_universal_compaction_test', + 'db/db_universal_compaction_test.cc', + 'parallel'], + ['histogram_test', 'monitoring/histogram_test.cc', 'serial'], + ['util_merge_operators_test', + 'utilities/util_merge_operators_test.cc', + 'serial'], + ['fault_injection_test', 'db/fault_injection_test.cc', 'parallel'], + ['env_basic_test', 'env/env_basic_test.cc', 'serial'], + ['iostats_context_test', 'monitoring/iostats_context_test.cc', 'serial'], + ['memtable_list_test', 'db/memtable_list_test.cc', 'serial'], + ['column_family_test', 'db/column_family_test.cc', 'serial'], + ['db_properties_test', 'db/db_properties_test.cc', 'serial'], + ['version_edit_test', 'db/version_edit_test.cc', 'serial'], + ['skiplist_test', 'memtable/skiplist_test.cc', 'serial'], + ['lru_cache_test', 'cache/lru_cache_test.cc', 'serial'], + ['plain_table_db_test', 'db/plain_table_db_test.cc', 'serial']] + # Generate a test rule for each entry in ROCKS_TESTS for test_cfg in ROCKS_TESTS: @@ -971,12 +489,12 @@ for test_cfg in ROCKS_TESTS: custom_unittest( name = "make_rocksdbjavastatic", - command = ["internal_repo_rocksdb/make_rocksdbjavastatic.sh"], type = "simple", + command = ["internal_repo_rocksdb/make_rocksdbjavastatic.sh"], ) custom_unittest( name = "make_rocksdb_lite_release", - command = ["internal_repo_rocksdb/make_rocksdb_lite_release.sh"], type = "simple", + command = ["internal_repo_rocksdb/make_rocksdb_lite_release.sh"], ) diff --git a/db/db_dynamic_level_test.cc b/db/db_dynamic_level_test.cc index e00fe6d6d..ec7a40aa1 100644 --- a/db/db_dynamic_level_test.cc +++ b/db/db_dynamic_level_test.cc @@ -59,7 +59,6 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBase) { Options options; options.env = env.get(); options.create_if_missing = true; - options.db_write_buffer_size = 2048; options.write_buffer_size = 2048; options.max_write_buffer_number = 2; options.level0_file_num_compaction_trigger = 2; @@ -129,7 +128,6 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBase2) { Options options = CurrentOptions(); options.create_if_missing = true; - options.db_write_buffer_size = 204800; options.write_buffer_size = 20480; options.max_write_buffer_number = 2; options.level0_file_num_compaction_trigger = 2; @@ -286,7 +284,6 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesCompactRange) { Options options = CurrentOptions(); options.create_if_missing = true; - options.db_write_buffer_size = 2048; options.write_buffer_size = 2048; options.max_write_buffer_number = 2; options.level0_file_num_compaction_trigger = 2; @@ -364,7 +361,6 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesCompactRange) { TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBaseInc) { Options options = CurrentOptions(); options.create_if_missing = true; - options.db_write_buffer_size = 2048; options.write_buffer_size = 2048; options.max_write_buffer_number = 2; options.level0_file_num_compaction_trigger = 2; @@ -377,6 +373,7 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBaseInc) { options.soft_rate_limit = 1.1; options.max_background_compactions = 2; options.num_levels = 5; + options.max_compaction_bytes = 100000000; DestroyAndReopen(options); @@ -416,7 +413,6 @@ TEST_F(DBTestDynamicLevel, DISABLED_MigrateToDynamicLevelMaxBytesBase) { Options options; options.create_if_missing = true; - options.db_write_buffer_size = 2048; options.write_buffer_size = 2048; options.max_write_buffer_number = 8; options.level0_file_num_compaction_trigger = 4; diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc index 220ceac7e..1bf9a41c9 100644 --- a/db/db_memtable_test.cc +++ b/db/db_memtable_test.cc @@ -23,7 +23,7 @@ class DBMemTableTest : public DBTestBase { class MockMemTableRep : public MemTableRep { public: - explicit MockMemTableRep(MemTableAllocator* allocator, MemTableRep* rep) + explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep) : MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {} virtual KeyHandle Allocate(const size_t len, char** buf) override { @@ -74,7 +74,7 @@ class MockMemTableRep : public MemTableRep { class MockMemTableRepFactory : public MemTableRepFactory { public: virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp, - MemTableAllocator* allocator, + Allocator* allocator, const SliceTransform* transform, Logger* logger) override { SkipListFactory factory; @@ -85,7 +85,7 @@ class MockMemTableRepFactory : public MemTableRepFactory { } virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp, - MemTableAllocator* allocator, + Allocator* allocator, const SliceTransform* transform, Logger* logger, uint32_t column_family_id) override { diff --git a/db/db_test.cc b/db/db_test.cc index 756c01c2a..93e5fe609 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -62,7 +62,6 @@ #include "util/file_reader_writer.h" #include "util/filename.h" #include "util/hash.h" -#include "util/logging.h" #include "util/mutexlock.h" #include "util/rate_limiter.h" #include "util/string_util.h" @@ -3692,19 +3691,14 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { Random rnd(301); Options options; options.create_if_missing = true; - options.db_write_buffer_size = 6000; - options.write_buffer_size = 6000; + options.db_write_buffer_size = 6000000; + options.write_buffer_size = 600000; options.max_write_buffer_number = 2; options.level0_file_num_compaction_trigger = 2; options.level0_slowdown_writes_trigger = 2; options.level0_stop_writes_trigger = 2; options.soft_pending_compaction_bytes_limit = 1024 * 1024; - - // Use file size to distinguish levels - // L1: 10, L2: 20, L3 40, L4 80 - // L0 is less than 30 - options.target_file_size_base = 10; - options.target_file_size_multiplier = 2; + options.target_file_size_base = 20; options.level_compaction_dynamic_level_bytes = true; options.max_bytes_for_level_base = 200; @@ -3741,10 +3735,11 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); for (int i = 0; i < 100; i++) { - ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200))); - - if (i % 25 == 0) { - dbfull()->TEST_WaitForFlushMemTable(); + std::string value = RandomString(&rnd, 200); + ASSERT_OK(Put(Key(keys[i]), value)); + if (i % 25 == 24) { + Flush(); + dbfull()->TEST_WaitForCompact(); } } @@ -3785,7 +3780,8 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); for (int i = 101; i < 500; i++) { - ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200))); + std::string value = RandomString(&rnd, 200); + ASSERT_OK(Put(Key(keys[i]), value)); if (i % 100 == 99) { Flush(); dbfull()->TEST_WaitForCompact(); diff --git a/db/db_test2.cc b/db/db_test2.cc index dd2da10c3..714342a8b 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -167,20 +167,48 @@ TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) { #ifndef ROCKSDB_LITE class DBTestSharedWriteBufferAcrossCFs : public DBTestBase, - public testing::WithParamInterface { + public testing::WithParamInterface> { public: DBTestSharedWriteBufferAcrossCFs() : DBTestBase("/db_test_shared_write_buffer") {} - void SetUp() override { use_old_interface_ = GetParam(); } + void SetUp() override { + use_old_interface_ = std::get<0>(GetParam()); + cost_cache_ = std::get<1>(GetParam()); + } bool use_old_interface_; + bool cost_cache_; }; TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { Options options = CurrentOptions(); + options.arena_block_size = 4096; + + // Avoid undeterministic value by malloc_usable_size(); + // Force arena block size to 1 + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "Arena::Arena:0", [&](void* arg) { + size_t* block_size = static_cast(arg); + *block_size = 1; + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "Arena::AllocateNewBlock:0", [&](void* arg) { + std::pair* pair = + static_cast*>(arg); + *std::get<0>(*pair) = *std::get<1>(*pair); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // The total soft write buffer size is about 105000 + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 1024 * 1024); + if (use_old_interface_) { - options.db_write_buffer_size = 100000; // this is the real limit + options.db_write_buffer_size = 120000; // this is the real limit + } else if (!cost_cache_) { + options.write_buffer_manager.reset(new WriteBufferManager(114285)); } else { - options.write_buffer_manager.reset(new WriteBufferManager(100000)); + options.write_buffer_manager.reset(new WriteBufferManager(114285, cache)); } options.write_buffer_size = 500000; // this is never hit CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); @@ -188,6 +216,13 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { WriteOptions wo; wo.disableWAL = true; + std::function wait_flush = [&]() { + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + }; + // Create some data and flush "default" and "nikitich" so that they // are newer CFs created. ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); @@ -201,13 +236,20 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { static_cast(1)); ASSERT_OK(Put(3, Key(1), DummyString(30000), wo)); + if (cost_cache_) { + ASSERT_GE(cache->GetUsage(), 1024 * 1024); + ASSERT_LE(cache->GetUsage(), 2 * 1024 * 1024); + } + wait_flush(); ASSERT_OK(Put(0, Key(1), DummyString(60000), wo)); + if (cost_cache_) { + ASSERT_GE(cache->GetUsage(), 1024 * 1024); + ASSERT_LE(cache->GetUsage(), 2 * 1024 * 1024); + } + wait_flush(); ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); // No flush should trigger - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + wait_flush(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(1)); @@ -221,11 +263,9 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { // Trigger a flush. Flushing "nikitich". ASSERT_OK(Put(3, Key(2), DummyString(30000), wo)); + wait_flush(); ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + wait_flush(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(1)); @@ -239,12 +279,11 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { // Without hitting the threshold, no flush should trigger. ASSERT_OK(Put(2, Key(1), DummyString(30000), wo)); + wait_flush(); ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + wait_flush(); ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + wait_flush(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(1)); @@ -259,14 +298,15 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { // Hit the write buffer limit again. "default" // will have been flushed. ASSERT_OK(Put(2, Key(2), DummyString(10000), wo)); + wait_flush(); ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + wait_flush(); ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + wait_flush(); ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + wait_flush(); ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + wait_flush(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(2)); @@ -281,13 +321,14 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { // Trigger another flush. This time "dobrynia". "pikachu" should not // be flushed, althrough it was never flushed. ASSERT_OK(Put(1, Key(1), DummyString(1), wo)); + wait_flush(); ASSERT_OK(Put(2, Key(1), DummyString(80000), wo)); + wait_flush(); ASSERT_OK(Put(1, Key(1), DummyString(1), wo)); + wait_flush(); ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + wait_flush(); + { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(2)); @@ -298,16 +339,45 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } + if (cost_cache_) { + ASSERT_GE(cache->GetUsage(), 1024 * 1024); + Close(); + options.write_buffer_manager.reset(); + ASSERT_LT(cache->GetUsage(), 1024 * 1024); + } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs, - DBTestSharedWriteBufferAcrossCFs, ::testing::Bool()); + DBTestSharedWriteBufferAcrossCFs, + ::testing::Values(std::make_tuple(true, false), + std::make_tuple(false, false), + std::make_tuple(false, true))); TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { std::string dbname2 = test::TmpDir(env_) + "/db_shared_wb_db2"; Options options = CurrentOptions(); + options.arena_block_size = 4096; + // Avoid undeterministic value by malloc_usable_size(); + // Force arena block size to 1 + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "Arena::Arena:0", [&](void* arg) { + size_t* block_size = static_cast(arg); + *block_size = 1; + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "Arena::AllocateNewBlock:0", [&](void* arg) { + std::pair* pair = + static_cast*>(arg); + *std::get<0>(*pair) = *std::get<1>(*pair); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + options.write_buffer_size = 500000; // this is never hit - options.write_buffer_manager.reset(new WriteBufferManager(100000)); + // Use a write buffer total size so that the soft limit is about + // 105000. + options.write_buffer_manager.reset(new WriteBufferManager(120000)); CreateAndReopenWithCF({"cf1", "cf2"}, options); ASSERT_OK(DestroyDB(dbname2, options)); @@ -317,17 +387,25 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { WriteOptions wo; wo.disableWAL = true; + std::function wait_flush = [&]() { + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + static_cast(db2)->TEST_WaitForFlushMemTable(); + }; + // Trigger a flush on cf2 ASSERT_OK(Put(2, Key(1), DummyString(70000), wo)); + wait_flush(); ASSERT_OK(Put(0, Key(1), DummyString(20000), wo)); + wait_flush(); // Insert to DB2 ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000))); + wait_flush(); ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + wait_flush(); static_cast(db2)->TEST_WaitForFlushMemTable(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") + @@ -340,10 +418,9 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { // Triggering to flush another CF in DB1 ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000))); + wait_flush(); ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + wait_flush(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(1)); @@ -357,10 +434,9 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { // Triggering flush in DB2. ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000))); + wait_flush(); ASSERT_OK(db2->Put(wo, Key(1), DummyString(1))); - dbfull()->TEST_WaitForFlushMemTable(handles_[0]); - dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + wait_flush(); static_cast(db2)->TEST_WaitForFlushMemTable(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), @@ -375,6 +451,8 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { delete db2; ASSERT_OK(DestroyDB(dbname2, options)); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } namespace { diff --git a/db/db_test_util.h b/db/db_test_util.h index 36fdecf42..372baed57 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -123,8 +123,8 @@ enum SkipPolicy { kSkipNone = 0, kSkipNoSnapshot = 1, kSkipNoPrefix = 2 }; // A hacky skip list mem table that triggers flush after number of entries. class SpecialMemTableRep : public MemTableRep { public: - explicit SpecialMemTableRep(MemTableAllocator* allocator, - MemTableRep* memtable, int num_entries_flush) + explicit SpecialMemTableRep(Allocator* allocator, MemTableRep* memtable, + int num_entries_flush) : MemTableRep(allocator), memtable_(memtable), num_entries_flush_(num_entries_flush), @@ -186,7 +186,7 @@ class SpecialSkipListFactory : public MemTableRepFactory { using MemTableRepFactory::CreateMemTableRep; virtual MemTableRep* CreateMemTableRep( - const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform* transform, Logger* logger) override { return new SpecialMemTableRep( allocator, factory_.CreateMemTableRep(compare, allocator, transform, 0), diff --git a/db/memtable.cc b/db/memtable.cc index bfb562fed..271b18e43 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -13,7 +13,6 @@ #include #include -#include #include "db/dbformat.h" #include "db/merge_context.h" @@ -37,7 +36,6 @@ #include "util/memory_usage.h" #include "util/murmurhash.h" #include "util/mutexlock.h" -#include "util/stop_watch.h" namespace rocksdb { @@ -68,14 +66,18 @@ MemTable::MemTable(const InternalKeyComparator& cmp, moptions_(ioptions, mutable_cf_options), refs_(0), kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), - arena_(moptions_.arena_block_size, - mutable_cf_options.memtable_huge_page_size), - allocator_(&arena_, write_buffer_manager), + mem_tracker_(write_buffer_manager), + arena_( + moptions_.arena_block_size, + (write_buffer_manager != nullptr && write_buffer_manager->enabled()) + ? &mem_tracker_ + : nullptr, + mutable_cf_options.memtable_huge_page_size), table_(ioptions.memtable_factory->CreateMemTableRep( - comparator_, &allocator_, ioptions.prefix_extractor, - ioptions.info_log, column_family_id)), + comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log, + column_family_id)), range_del_table_(SkipListFactory().CreateMemTableRep( - comparator_, &allocator_, nullptr /* transform */, ioptions.info_log, + comparator_, &arena_, nullptr /* transform */, ioptions.info_log, column_family_id)), is_range_del_table_empty_(true), data_size_(0), @@ -103,13 +105,16 @@ MemTable::MemTable(const InternalKeyComparator& cmp, if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom( - &allocator_, moptions_.memtable_prefix_bloom_bits, - ioptions.bloom_locality, 6 /* hard coded 6 probes */, nullptr, - moptions_.memtable_huge_page_size, ioptions.info_log)); + &arena_, moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality, + 6 /* hard coded 6 probes */, nullptr, moptions_.memtable_huge_page_size, + ioptions.info_log)); } } -MemTable::~MemTable() { assert(refs_ == 0); } +MemTable::~MemTable() { + mem_tracker_.FreeMem(); + assert(refs_ == 0); +} size_t MemTable::ApproximateMemoryUsage() { autovector usages = {arena_.ApproximateMemoryUsage(), diff --git a/db/memtable.h b/db/memtable.h index 260a1232a..c5a0a8cf6 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -20,12 +20,12 @@ #include "db/dbformat.h" #include "db/range_del_aggregator.h" #include "db/version_edit.h" -#include "memtable/memtable_allocator.h" #include "monitoring/instrumented_mutex.h" #include "options/cf_options.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" +#include "util/allocator.h" #include "util/concurrent_arena.h" #include "util/dynamic_bloom.h" #include "util/hash.h" @@ -319,7 +319,7 @@ class MemTable { // write anything to this MemTable(). (Ie. do not call Add() or Update()). void MarkImmutable() { table_->MarkReadOnly(); - allocator_.DoneAllocating(); + mem_tracker_.DoneAllocating(); } // return true if the current MemTableRep supports merge operator. @@ -361,8 +361,8 @@ class MemTable { const MemTableOptions moptions_; int refs_; const size_t kArenaBlockSize; + AllocTracker mem_tracker_; ConcurrentArena arena_; - MemTableAllocator allocator_; unique_ptr table_; unique_ptr range_del_table_; bool is_range_del_table_empty_; diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 3753d9bc3..e00a03cd9 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -43,7 +43,7 @@ namespace rocksdb { class Arena; -class MemTableAllocator; +class Allocator; class LookupKey; class Slice; class SliceTransform; @@ -68,7 +68,7 @@ class MemTableRep { virtual ~KeyComparator() { } }; - explicit MemTableRep(MemTableAllocator* allocator) : allocator_(allocator) {} + explicit MemTableRep(Allocator* allocator) : allocator_(allocator) {} // Allocate a buf of len size for storing key. The idea is that a // specific memtable representation knows its underlying data structure @@ -208,7 +208,7 @@ class MemTableRep { // user key. virtual Slice UserKey(const char* key) const; - MemTableAllocator* allocator_; + Allocator* allocator_; }; // This is the base class for all factories that are used by RocksDB to create @@ -218,11 +218,10 @@ class MemTableRepFactory { virtual ~MemTableRepFactory() {} virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&, - MemTableAllocator*, - const SliceTransform*, + Allocator*, const SliceTransform*, Logger* logger) = 0; virtual MemTableRep* CreateMemTableRep( - const MemTableRep::KeyComparator& key_cmp, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& key_cmp, Allocator* allocator, const SliceTransform* slice_transform, Logger* logger, uint32_t /* column_family_id */) { return CreateMemTableRep(key_cmp, allocator, slice_transform, logger); @@ -248,8 +247,7 @@ class SkipListFactory : public MemTableRepFactory { using MemTableRepFactory::CreateMemTableRep; virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&, - MemTableAllocator*, - const SliceTransform*, + Allocator*, const SliceTransform*, Logger* logger) override; virtual const char* Name() const override { return "SkipListFactory"; } @@ -276,8 +274,7 @@ class VectorRepFactory : public MemTableRepFactory { using MemTableRepFactory::CreateMemTableRep; virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&, - MemTableAllocator*, - const SliceTransform*, + Allocator*, const SliceTransform*, Logger* logger) override; virtual const char* Name() const override { diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 19af90325..63f9add9f 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -16,17 +16,19 @@ #include #include +#include "rocksdb/cache.h" namespace rocksdb { class WriteBufferManager { public: - // _buffer_size = 0 indicates no limit. Memory won't be tracked, + // _buffer_size = 0 indicates no limit. Memory won't be capped. // memory_usage() won't be valid and ShouldFlush() will always return true. - explicit WriteBufferManager(size_t _buffer_size) - : buffer_size_(_buffer_size), memory_used_(0) {} - - ~WriteBufferManager() {} + // if `cache` is provided, we'll put dummy entries in the cache and cost + // the memory allocated to the cache. It can be used even if _buffer_size = 0. + explicit WriteBufferManager(size_t _buffer_size, + std::shared_ptr cache = {}); + ~WriteBufferManager(); bool enabled() const { return buffer_size_ != 0; } @@ -34,21 +36,41 @@ class WriteBufferManager { size_t memory_usage() const { return memory_used_.load(std::memory_order_relaxed); } + size_t mutable_memtable_memory_usage() const { + return memory_active_.load(std::memory_order_relaxed); + } size_t buffer_size() const { return buffer_size_; } // Should only be called from write thread bool ShouldFlush() const { - return enabled() && memory_usage() >= buffer_size(); + // Flush if memory usage hits a hard limit, or total size that hasn't been + // scheduled to free hits a soft limit, which is 7/8 of the hard limit. + return enabled() && + (memory_usage() >= buffer_size() || + mutable_memtable_memory_usage() >= buffer_size() / 8 * 7); } - // Should only be called from write thread void ReserveMem(size_t mem) { - if (enabled()) { + if (cache_rep_ != nullptr) { + ReserveMemWithCache(mem); + } else if (enabled()) { memory_used_.fetch_add(mem, std::memory_order_relaxed); } + if (enabled()) { + memory_active_.fetch_add(mem, std::memory_order_relaxed); + } } - void FreeMem(size_t mem) { + // We are in the process of freeing `mem` bytes, so it is not considered + // when checking the soft limit. + void ScheduleFreeMem(size_t mem) { if (enabled()) { + memory_active_.fetch_sub(mem, std::memory_order_relaxed); + } + } + void FreeMem(size_t mem) { + if (cache_rep_ != nullptr) { + FreeMemWithCache(mem); + } else if (enabled()) { memory_used_.fetch_sub(mem, std::memory_order_relaxed); } } @@ -56,6 +78,13 @@ class WriteBufferManager { private: const size_t buffer_size_; std::atomic memory_used_; + // Memory that hasn't been scheduled to free. + std::atomic memory_active_; + struct CacheRep; + std::unique_ptr cache_rep_; + + void ReserveMemWithCache(size_t mem); + void FreeMemWithCache(size_t mem); // No copying allowed WriteBufferManager(const WriteBufferManager&) = delete; diff --git a/memtable/memtable_allocator.cc b/memtable/alloc_tracker.cc similarity index 54% rename from memtable/memtable_allocator.cc rename to memtable/alloc_tracker.cc index b973cf3a7..fa3072c60 100644 --- a/memtable/memtable_allocator.cc +++ b/memtable/alloc_tracker.cc @@ -9,53 +9,53 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "memtable/memtable_allocator.h" - #include #include "rocksdb/write_buffer_manager.h" +#include "util/allocator.h" #include "util/arena.h" namespace rocksdb { -MemTableAllocator::MemTableAllocator(Allocator* allocator, - WriteBufferManager* write_buffer_manager) - : allocator_(allocator), - write_buffer_manager_(write_buffer_manager), - bytes_allocated_(0) {} +AllocTracker::AllocTracker(WriteBufferManager* write_buffer_manager) + : write_buffer_manager_(write_buffer_manager), + bytes_allocated_(0), + done_allocating_(false), + freed_(false) {} -MemTableAllocator::~MemTableAllocator() { DoneAllocating(); } +AllocTracker::~AllocTracker() { FreeMem(); } -char* MemTableAllocator::Allocate(size_t bytes) { +void AllocTracker::Allocate(size_t bytes) { assert(write_buffer_manager_ != nullptr); if (write_buffer_manager_->enabled()) { bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); write_buffer_manager_->ReserveMem(bytes); } - return allocator_->Allocate(bytes); } -char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size, - Logger* logger) { - assert(write_buffer_manager_ != nullptr); - if (write_buffer_manager_->enabled()) { - bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); - write_buffer_manager_->ReserveMem(bytes); +void AllocTracker::DoneAllocating() { + if (write_buffer_manager_ != nullptr && !done_allocating_) { + if (write_buffer_manager_->enabled()) { + write_buffer_manager_->ScheduleFreeMem( + bytes_allocated_.load(std::memory_order_relaxed)); + } else { + assert(bytes_allocated_.load(std::memory_order_relaxed) == 0); + } + done_allocating_ = true; } - return allocator_->AllocateAligned(bytes, huge_page_size, logger); } -void MemTableAllocator::DoneAllocating() { - if (write_buffer_manager_ != nullptr) { +void AllocTracker::FreeMem() { + if (!done_allocating_) { + DoneAllocating(); + } + if (write_buffer_manager_ != nullptr && !freed_) { if (write_buffer_manager_->enabled()) { write_buffer_manager_->FreeMem( bytes_allocated_.load(std::memory_order_relaxed)); } else { assert(bytes_allocated_.load(std::memory_order_relaxed) == 0); } - write_buffer_manager_ = nullptr; + freed_ = true; } } - -size_t MemTableAllocator::BlockSize() const { return allocator_->BlockSize(); } - } // namespace rocksdb diff --git a/memtable/hash_cuckoo_rep.cc b/memtable/hash_cuckoo_rep.cc index c4a4564b1..bbcd2dab4 100644 --- a/memtable/hash_cuckoo_rep.cc +++ b/memtable/hash_cuckoo_rep.cc @@ -61,8 +61,7 @@ struct CuckooStep { class HashCuckooRep : public MemTableRep { public: explicit HashCuckooRep(const MemTableRep::KeyComparator& compare, - MemTableAllocator* allocator, - const size_t bucket_count, + Allocator* allocator, const size_t bucket_count, const unsigned int hash_func_count, const size_t approximate_entry_size) : MemTableRep(allocator), @@ -198,7 +197,7 @@ class HashCuckooRep : public MemTableRep { private: const MemTableRep::KeyComparator& compare_; // the pointer to Allocator to allocate memory, immutable after construction. - MemTableAllocator* const allocator_; + Allocator* const allocator_; // the number of hash bucket in the hash table. const size_t bucket_count_; // approximate size of each entry @@ -625,7 +624,7 @@ void HashCuckooRep::Iterator::SeekToLast() { } // anom namespace MemTableRep* HashCuckooRepFactory::CreateMemTableRep( - const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform* transform, Logger* logger) { // The estimated average fullness. The write performance of any close hash // degrades as the fullness of the mem-table increases. Setting kFullness diff --git a/memtable/hash_cuckoo_rep.h b/memtable/hash_cuckoo_rep.h index fdb977c24..b4418f84f 100644 --- a/memtable/hash_cuckoo_rep.h +++ b/memtable/hash_cuckoo_rep.h @@ -30,7 +30,7 @@ class HashCuckooRepFactory : public MemTableRepFactory { using MemTableRepFactory::CreateMemTableRep; virtual MemTableRep* CreateMemTableRep( - const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform* transform, Logger* logger) override; virtual const char* Name() const override { return "HashCuckooRepFactory"; } diff --git a/memtable/hash_linklist_rep.cc b/memtable/hash_linklist_rep.cc index 13950d23d..f0e9b8691 100644 --- a/memtable/hash_linklist_rep.cc +++ b/memtable/hash_linklist_rep.cc @@ -58,7 +58,7 @@ struct SkipListBucketHeader { MemtableSkipList skip_list; explicit SkipListBucketHeader(const MemTableRep::KeyComparator& cmp, - MemTableAllocator* allocator, uint32_t count) + Allocator* allocator, uint32_t count) : Counting_header(this, // Pointing to itself to indicate header type. count), skip_list(cmp, allocator) {} @@ -164,7 +164,7 @@ struct Node { class HashLinkListRep : public MemTableRep { public: HashLinkListRep(const MemTableRep::KeyComparator& compare, - MemTableAllocator* allocator, const SliceTransform* transform, + Allocator* allocator, const SliceTransform* transform, size_t bucket_size, uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, Logger* logger, int bucket_entries_logging_threshold, @@ -496,14 +496,11 @@ class HashLinkListRep : public MemTableRep { }; }; -HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare, - MemTableAllocator* allocator, - const SliceTransform* transform, - size_t bucket_size, - uint32_t threshold_use_skiplist, - size_t huge_page_tlb_size, Logger* logger, - int bucket_entries_logging_threshold, - bool if_log_bucket_dist_when_flash) +HashLinkListRep::HashLinkListRep( + const MemTableRep::KeyComparator& compare, Allocator* allocator, + const SliceTransform* transform, size_t bucket_size, + uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, Logger* logger, + int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) : MemTableRep(allocator), bucket_size_(bucket_size), // Threshold to use skip list doesn't make sense if less than 3, so we @@ -831,7 +828,7 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head, } // anon namespace MemTableRep* HashLinkListRepFactory::CreateMemTableRep( - const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform* transform, Logger* logger) { return new HashLinkListRep(compare, allocator, transform, bucket_count_, threshold_use_skiplist_, huge_page_tlb_size_, diff --git a/memtable/hash_linklist_rep.h b/memtable/hash_linklist_rep.h index f792b5b88..bb7863e50 100644 --- a/memtable/hash_linklist_rep.h +++ b/memtable/hash_linklist_rep.h @@ -30,7 +30,7 @@ class HashLinkListRepFactory : public MemTableRepFactory { using MemTableRepFactory::CreateMemTableRep; virtual MemTableRep* CreateMemTableRep( - const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform* transform, Logger* logger) override; virtual const char* Name() const override { diff --git a/memtable/hash_skiplist_rep.cc b/memtable/hash_skiplist_rep.cc index 56c33cfca..360f48f89 100644 --- a/memtable/hash_skiplist_rep.cc +++ b/memtable/hash_skiplist_rep.cc @@ -26,7 +26,7 @@ namespace { class HashSkipListRep : public MemTableRep { public: HashSkipListRep(const MemTableRep::KeyComparator& compare, - MemTableAllocator* allocator, const SliceTransform* transform, + Allocator* allocator, const SliceTransform* transform, size_t bucket_size, int32_t skiplist_height, int32_t skiplist_branching_factor); @@ -65,7 +65,7 @@ class HashSkipListRep : public MemTableRep { const MemTableRep::KeyComparator& compare_; // immutable after construction - MemTableAllocator* const allocator_; + Allocator* const allocator_; inline size_t GetHash(const Slice& slice) const { return MurmurHash(slice.data(), static_cast(slice.size()), 0) % @@ -233,7 +233,7 @@ class HashSkipListRep : public MemTableRep { }; HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare, - MemTableAllocator* allocator, + Allocator* allocator, const SliceTransform* transform, size_t bucket_size, int32_t skiplist_height, int32_t skiplist_branching_factor) @@ -336,7 +336,7 @@ MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) { } // anon namespace MemTableRep* HashSkipListRepFactory::CreateMemTableRep( - const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform* transform, Logger* logger) { return new HashSkipListRep(compare, allocator, transform, bucket_count_, skiplist_height_, skiplist_branching_factor_); diff --git a/memtable/hash_skiplist_rep.h b/memtable/hash_skiplist_rep.h index e26e0f57b..bf04f5641 100644 --- a/memtable/hash_skiplist_rep.h +++ b/memtable/hash_skiplist_rep.h @@ -27,7 +27,7 @@ class HashSkipListRepFactory : public MemTableRepFactory { using MemTableRepFactory::CreateMemTableRep; virtual MemTableRep* CreateMemTableRep( - const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform* transform, Logger* logger) override; virtual const char* Name() const override { diff --git a/memtable/memtable_allocator.h b/memtable/memtable_allocator.h deleted file mode 100644 index 1364eb408..000000000 --- a/memtable/memtable_allocator.h +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. -// This source code is also licensed under the GPLv2 license found in the -// COPYING file in the root directory of this source tree. -// -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. -// -// This is used by the MemTable to allocate write buffer memory. It connects -// to WriteBufferManager so we can track and enforce overall write buffer -// limits. - -#pragma once - -#include -#include "rocksdb/write_buffer_manager.h" -#include "util/allocator.h" - -namespace rocksdb { - -class Logger; - -class MemTableAllocator : public Allocator { - public: - explicit MemTableAllocator(Allocator* allocator, - WriteBufferManager* write_buffer_manager); - ~MemTableAllocator(); - - // Allocator interface - char* Allocate(size_t bytes) override; - char* AllocateAligned(size_t bytes, size_t huge_page_size = 0, - Logger* logger = nullptr) override; - size_t BlockSize() const override; - - // Call when we're finished allocating memory so we can free it from - // the write buffer's limit. - void DoneAllocating(); - - private: - Allocator* allocator_; - WriteBufferManager* write_buffer_manager_; - std::atomic bytes_allocated_; - - // No copying allowed - MemTableAllocator(const MemTableAllocator&); - void operator=(const MemTableAllocator&); -}; - -} // namespace rocksdb diff --git a/memtable/memtablerep_bench.cc b/memtable/memtablerep_bench.cc index 9e725864e..d91efc00e 100644 --- a/memtable/memtablerep_bench.cc +++ b/memtable/memtablerep_bench.cc @@ -627,11 +627,10 @@ int main(int argc, char** argv) { rocksdb::MemTable::KeyComparator key_comp(internal_key_comp); rocksdb::Arena arena; rocksdb::WriteBufferManager wb(FLAGS_write_buffer_size); - rocksdb::MemTableAllocator memtable_allocator(&arena, &wb); uint64_t sequence; auto createMemtableRep = [&] { sequence = 0; - return factory->CreateMemTableRep(key_comp, &memtable_allocator, + return factory->CreateMemTableRep(key_comp, &arena, options.prefix_extractor.get(), options.info_log.get()); }; diff --git a/memtable/skiplistrep.cc b/memtable/skiplistrep.cc index d42d0a1cd..15ce89a6c 100644 --- a/memtable/skiplistrep.cc +++ b/memtable/skiplistrep.cc @@ -20,16 +20,18 @@ class SkipListRep : public MemTableRep { friend class LookaheadIterator; public: - explicit SkipListRep(const MemTableRep::KeyComparator& compare, - MemTableAllocator* allocator, - const SliceTransform* transform, const size_t lookahead) - : MemTableRep(allocator), skip_list_(compare, allocator), cmp_(compare), - transform_(transform), lookahead_(lookahead) { - } - - virtual KeyHandle Allocate(const size_t len, char** buf) override { - *buf = skip_list_.AllocateKey(len); - return static_cast(*buf); + explicit SkipListRep(const MemTableRep::KeyComparator& compare, + Allocator* allocator, const SliceTransform* transform, + const size_t lookahead) + : MemTableRep(allocator), + skip_list_(compare, allocator), + cmp_(compare), + transform_(transform), + lookahead_(lookahead) {} + + virtual KeyHandle Allocate(const size_t len, char** buf) override { + *buf = skip_list_.AllocateKey(len); + return static_cast(*buf); } // Insert key into the list. @@ -269,7 +271,7 @@ public: } MemTableRep* SkipListFactory::CreateMemTableRep( - const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform* transform, Logger* logger) { return new SkipListRep(compare, allocator, transform, lookahead_); } diff --git a/memtable/vectorrep.cc b/memtable/vectorrep.cc index bb0e1cfe8..dcba842fd 100644 --- a/memtable/vectorrep.cc +++ b/memtable/vectorrep.cc @@ -27,8 +27,7 @@ using namespace stl_wrappers; class VectorRep : public MemTableRep { public: - VectorRep(const KeyComparator& compare, MemTableAllocator* allocator, - size_t count); + VectorRep(const KeyComparator& compare, Allocator* allocator, size_t count); // Insert key into the collection. (The caller will pack key and value into a // single buffer and pass that in as the parameter to Insert) @@ -138,13 +137,15 @@ size_t VectorRep::ApproximateMemoryUsage() { ); } -VectorRep::VectorRep(const KeyComparator& compare, MemTableAllocator* allocator, +VectorRep::VectorRep(const KeyComparator& compare, Allocator* allocator, size_t count) - : MemTableRep(allocator), - bucket_(new Bucket()), - immutable_(false), - sorted_(false), - compare_(compare) { bucket_.get()->reserve(count); } + : MemTableRep(allocator), + bucket_(new Bucket()), + immutable_(false), + sorted_(false), + compare_(compare) { + bucket_.get()->reserve(count); +} VectorRep::Iterator::Iterator(class VectorRep* vrep, std::shared_ptr> bucket, @@ -296,7 +297,7 @@ MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) { } // anon namespace MemTableRep* VectorRepFactory::CreateMemTableRep( - const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator, + const MemTableRep::KeyComparator& compare, Allocator* allocator, const SliceTransform*, Logger* logger) { return new VectorRep(compare, allocator, count_); } diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc new file mode 100644 index 000000000..bb61c85d2 --- /dev/null +++ b/memtable/write_buffer_manager.cc @@ -0,0 +1,125 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "rocksdb/write_buffer_manager.h" +#include +#include "util/coding.h" + +namespace rocksdb { +#ifndef ROCKSDB_LITE +namespace { +const size_t kSizeDummyEntry = 1024 * 1024; +// The key will be longer than keys for blocks in SST files so they won't +// conflict. +const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1; +} // namespace + +struct WriteBufferManager::CacheRep { + std::shared_ptr cache_; + std::mutex cache_mutex_; + std::atomic cache_allocated_size_; + // The non-prefix part will be updated according to the ID to use. + char cache_key_[kCacheKeyPrefix + kMaxVarint64Length]; + uint64_t next_cache_key_id_ = 0; + std::vector dummy_handles_; + + explicit CacheRep(std::shared_ptr cache) + : cache_(cache), cache_allocated_size_(0) { + memset(cache_key_, 0, kCacheKeyPrefix); + size_t pointer_size = sizeof(const void*); + assert(pointer_size <= kCacheKeyPrefix); + memcpy(cache_key_, static_cast(this), pointer_size); + } + + Slice GetNextCacheKey() { + memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length); + char* end = + EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++); + return Slice(cache_key_, static_cast(end - cache_key_)); + } +}; +#else +struct WriteBufferManager::CacheRep {}; +#endif // ROCKSDB_LITE + +WriteBufferManager::WriteBufferManager(size_t _buffer_size, + std::shared_ptr cache) + : buffer_size_(_buffer_size), + memory_used_(0), + memory_active_(0), + cache_rep_(nullptr) { +#ifndef ROCKSDB_LITE + if (cache) { + // Construct the cache key using the pointer to this. + cache_rep_.reset(new CacheRep(cache)); + } +#endif // ROCKSDB_LITE +} + +WriteBufferManager::~WriteBufferManager() { +#ifndef ROCKSDB_LITE + if (cache_rep_) { + for (auto* handle : cache_rep_->dummy_handles_) { + cache_rep_->cache_->Release(handle, true); + } + } +#endif // ROCKSDB_LITE +} + +// Should only be called from write thread +void WriteBufferManager::ReserveMemWithCache(size_t mem) { +#ifndef ROCKSDB_LITE + assert(cache_rep_ != nullptr); + // Use a mutex to protect various data structures. Can be optimzied to a + // lock-free solution if it ends up with a performance bottleneck. + std::lock_guard lock(cache_rep_->cache_mutex_); + + size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem; + memory_used_.store(new_mem_used, std::memory_order_relaxed); + while (new_mem_used > cache_rep_->cache_allocated_size_) { + // Expand size by at least 1MB. + // Add a dummy record to the cache + Cache::Handle* handle; + cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr, + kSizeDummyEntry, nullptr, &handle); + cache_rep_->dummy_handles_.push_back(handle); + cache_rep_->cache_allocated_size_ += kSizeDummyEntry; + } +#endif // ROCKSDB_LITE +} + +void WriteBufferManager::FreeMemWithCache(size_t mem) { +#ifndef ROCKSDB_LITE + assert(cache_rep_ != nullptr); + // Use a mutex to protect various data structures. Can be optimzied to a + // lock-free solution if it ends up with a performance bottleneck. + std::lock_guard lock(cache_rep_->cache_mutex_); + size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem; + memory_used_.store(new_mem_used, std::memory_order_relaxed); + // Gradually shrink memory costed in the block cache if the actual + // usage is less than 3/4 of what we reserve from the block cache. + // We do this becausse: + // 1. we don't pay the cost of the block cache immediately a memtable is + // freed, as block cache insert is expensive; + // 2. eventually, if we walk away from a temporary memtable size increase, + // we make sure shrink the memory costed in block cache over time. + // In this way, we only shrink costed memory showly even there is enough + // margin. + if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 && + cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) { + assert(!cache_rep_->dummy_handles_.empty()); + cache_rep_->cache_->Release(cache_rep_->dummy_handles_.back(), true); + cache_rep_->dummy_handles_.pop_back(); + cache_rep_->cache_allocated_size_ -= kSizeDummyEntry; + } +#endif // ROCKSDB_LITE +} +} // namespace rocksdb diff --git a/memtable/write_buffer_manager_test.cc b/memtable/write_buffer_manager_test.cc new file mode 100644 index 000000000..64ec840b6 --- /dev/null +++ b/memtable/write_buffer_manager_test.cc @@ -0,0 +1,141 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "rocksdb/write_buffer_manager.h" +#include "util/testharness.h" + +namespace rocksdb { + +class WriteBufferManagerTest : public testing::Test {}; + +#ifndef ROCKSDB_LITE +TEST_F(WriteBufferManagerTest, ShouldFlush) { + // A write buffer manager of size 50MB + std::unique_ptr wbf( + new WriteBufferManager(10 * 1024 * 1024)); + + wbf->ReserveMem(8 * 1024 * 1024); + ASSERT_FALSE(wbf->ShouldFlush()); + // 90% of the hard limit will hit the condition + wbf->ReserveMem(1 * 1024 * 1024); + ASSERT_TRUE(wbf->ShouldFlush()); + // Scheduling for feeing will release the condition + wbf->ScheduleFreeMem(1 * 1024 * 1024); + ASSERT_FALSE(wbf->ShouldFlush()); + + wbf->ReserveMem(2 * 1024 * 1024); + ASSERT_TRUE(wbf->ShouldFlush()); + wbf->ScheduleFreeMem(5 * 1024 * 1024); + // hard limit still hit + ASSERT_TRUE(wbf->ShouldFlush()); + wbf->FreeMem(10 * 1024 * 1024); + ASSERT_FALSE(wbf->ShouldFlush()); +} + +TEST_F(WriteBufferManagerTest, CacheCost) { + // 1GB cache + std::shared_ptr cache = NewLRUCache(1024 * 1024 * 1024, 4); + // A write buffer manager of size 50MB + std::unique_ptr wbf( + new WriteBufferManager(50 * 1024 * 1024, cache)); + + // Allocate 1.5MB will allocate 2MB + wbf->ReserveMem(1536 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 2 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 2 * 1024 * 1024 + 10000); + + // Allocate another 2MB + wbf->ReserveMem(2 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 4 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 4 * 1024 * 1024 + 10000); + + // Allocate another 20MB + wbf->ReserveMem(20 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 24 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 24 * 1024 * 1024 + 10000); + + // Free 2MB will not cause any change in cache cost + wbf->FreeMem(2 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 24 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 24 * 1024 * 1024 + 10000); + + ASSERT_FALSE(wbf->ShouldFlush()); + + // Allocate another 30MB + wbf->ReserveMem(30 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 52 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 52 * 1024 * 1024 + 10000); + ASSERT_TRUE(wbf->ShouldFlush()); + + ASSERT_TRUE(wbf->ShouldFlush()); + + wbf->ScheduleFreeMem(20 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 52 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 52 * 1024 * 1024 + 10000); + + // Still need flush as the hard limit hits + ASSERT_TRUE(wbf->ShouldFlush()); + + // Free 20MB will releae 1MB from cache + wbf->FreeMem(20 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 + 10000); + + ASSERT_FALSE(wbf->ShouldFlush()); + + // Every free will release 1MB if still not hit 3/4 + wbf->FreeMem(16 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 50 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 50 * 1024 * 1024 + 10000); + + wbf->FreeMem(16 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 49 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 49 * 1024 * 1024 + 10000); + + // Free 2MB will not cause any change in cache cost + wbf->ReserveMem(2 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 49 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 49 * 1024 * 1024 + 10000); + + wbf->FreeMem(16 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 48 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 48 * 1024 * 1024 + 10000); + + // Destory write buffer manger should free everything + wbf.reset(); + ASSERT_LT(cache->GetPinnedUsage(), 1024 * 1024); +} + +TEST_F(WriteBufferManagerTest, NoCapCacheCost) { + // 1GB cache + std::shared_ptr cache = NewLRUCache(1024 * 1024 * 1024, 4); + // A write buffer manager of size 256MB + std::unique_ptr wbf(new WriteBufferManager(0, cache)); + // Allocate 1.5MB will allocate 2MB + wbf->ReserveMem(10 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 10 * 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 10 * 1024 * 1024 + 10000); + ASSERT_FALSE(wbf->ShouldFlush()); + + wbf->FreeMem(9 * 1024 * 1024); + for (int i = 0; i < 10; i++) { + wbf->FreeMem(16 * 1024); + } + ASSERT_GE(cache->GetPinnedUsage(), 1024 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 1024 * 1024 + 10000); +} +#endif // ROCKSDB_LITE +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/options/options_helper.h b/options/options_helper.h index 18b099168..747efc300 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -344,8 +344,8 @@ static std::unordered_map db_options_type_info = { OptionType::kBoolean, OptionVerificationType::kNormal, true, offsetof(struct MutableDBOptions, avoid_flush_during_shutdown)}}, {"allow_ingest_behind", - {offsetof(struct DBOptions, allow_ingest_behind), - OptionType::kBoolean, OptionVerificationType::kNormal, false, + {offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean, + OptionVerificationType::kNormal, false, offsetof(struct ImmutableDBOptions, allow_ingest_behind)}}}; // offset_of is used to get the offset of a class data member diff --git a/src.mk b/src.mk index bd362c641..6f9c8233f 100644 --- a/src.mk +++ b/src.mk @@ -60,13 +60,14 @@ LIB_SOURCES = \ env/env_hdfs.cc \ env/env_posix.cc \ env/io_posix.cc \ - env/mock_env.cc \ + env/mock_env.cc \ + memtable/alloc_tracker.cc \ memtable/hash_cuckoo_rep.cc \ memtable/hash_linklist_rep.cc \ memtable/hash_skiplist_rep.cc \ - memtable/memtable_allocator.cc \ memtable/skiplistrep.cc \ memtable/vectorrep.cc \ + memtable/write_buffer_manager.cc \ monitoring/histogram.cc \ monitoring/histogram_windowing.cc \ monitoring/instrumented_mutex.cc \ @@ -286,6 +287,7 @@ MAIN_SOURCES = \ memtable/inlineskiplist_test.cc \ memtable/memtablerep_bench.cc \ memtable/skiplist_test.cc \ + memtable/write_buffer_manager_test.cc \ monitoring/histogram_test.cc \ monitoring/iostats_context_test.cc \ monitoring/statistics_test.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 08ba19bcd..870d89d3e 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -279,6 +279,9 @@ DEFINE_bool(enable_numa, false, DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size, "Number of bytes to buffer in all memtables before compacting"); +DEFINE_bool(cost_write_buffer_to_cache, false, + "The usage of memtable is costed to the block cache"); + DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size, "Number of bytes to buffer in memtable before compacting"); @@ -2823,7 +2826,10 @@ void VerifyDBFromDB(std::string& truth_db_name) { options.create_missing_column_families = FLAGS_num_column_families > 1; options.max_open_files = FLAGS_open_files; - options.db_write_buffer_size = FLAGS_db_write_buffer_size; + if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) { + options.write_buffer_manager.reset( + new WriteBufferManager(FLAGS_db_write_buffer_size, cache_)); + } options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.min_write_buffer_number_to_merge = diff --git a/util/allocator.h b/util/allocator.h index 77d9dc4cd..3772a278f 100644 --- a/util/allocator.h +++ b/util/allocator.h @@ -13,8 +13,9 @@ // when the allocator object is destroyed. See the Arena class for more info. #pragma once -#include #include +#include +#include "rocksdb/write_buffer_manager.h" namespace rocksdb { @@ -31,4 +32,28 @@ class Allocator { virtual size_t BlockSize() const = 0; }; +class AllocTracker { + public: + explicit AllocTracker(WriteBufferManager* write_buffer_manager); + ~AllocTracker(); + void Allocate(size_t bytes); + // Call when we're finished allocating memory so we can free it from + // the write buffer's limit. + void DoneAllocating(); + + void FreeMem(); + + bool is_freed() const { return write_buffer_manager_ == nullptr || freed_; } + + private: + WriteBufferManager* write_buffer_manager_; + std::atomic bytes_allocated_; + bool done_allocating_; + bool freed_; + + // No copying allowed + AllocTracker(const AllocTracker&); + void operator=(const AllocTracker&); +}; + } // namespace rocksdb diff --git a/util/arena.cc b/util/arena.cc index d3129b55f..b5ddf564a 100644 --- a/util/arena.cc +++ b/util/arena.cc @@ -24,6 +24,7 @@ #include "port/port.h" #include "rocksdb/env.h" #include "util/logging.h" +#include "util/sync_point.h" namespace rocksdb { @@ -49,10 +50,11 @@ size_t OptimizeBlockSize(size_t block_size) { return block_size; } -Arena::Arena(size_t block_size, size_t huge_page_size) - : kBlockSize(OptimizeBlockSize(block_size)) { +Arena::Arena(size_t block_size, AllocTracker* tracker, size_t huge_page_size) + : kBlockSize(OptimizeBlockSize(block_size)), tracker_(tracker) { assert(kBlockSize >= kMinBlockSize && kBlockSize <= kMaxBlockSize && kBlockSize % kAlignUnit == 0); + TEST_SYNC_POINT_CALLBACK("Arena::Arena:0", const_cast(&kBlockSize)); alloc_bytes_remaining_ = sizeof(inline_block_); blocks_memory_ += alloc_bytes_remaining_; aligned_alloc_ptr_ = inline_block_; @@ -63,9 +65,16 @@ Arena::Arena(size_t block_size, size_t huge_page_size) hugetlb_size_ = ((kBlockSize - 1U) / hugetlb_size_ + 1U) * hugetlb_size_; } #endif + if (tracker_ != nullptr) { + tracker_->Allocate(kInlineSize); + } } Arena::~Arena() { + if (tracker_ != nullptr) { + assert(tracker_->is_freed()); + tracker_->FreeMem(); + } for (const auto& block : blocks_) { delete[] block; } @@ -134,6 +143,9 @@ char* Arena::AllocateFromHugePage(size_t bytes) { // the following shouldn't throw because of the above reserve() huge_blocks_.emplace_back(MmapInfo(addr, bytes)); blocks_memory_ += bytes; + if (tracker_ != nullptr) { + tracker_->Allocate(bytes); + } return reinterpret_cast(addr); #else return nullptr; @@ -190,12 +202,22 @@ char* Arena::AllocateNewBlock(size_t block_bytes) { blocks_.reserve(blocks_.size() + 1); char* block = new char[block_bytes]; - + size_t allocated_size; #ifdef ROCKSDB_MALLOC_USABLE_SIZE - blocks_memory_ += malloc_usable_size(block); + allocated_size = malloc_usable_size(block); +#ifndef NDEBUG + // It's hard to predict what malloc_usable_size() returns. + // A callback can allow users to change the costed size. + std::pair pair(&allocated_size, &block_bytes); + TEST_SYNC_POINT_CALLBACK("Arena::AllocateNewBlock:0", &pair); +#endif // NDEBUG #else - blocks_memory_ += block_bytes; + allocated_size = block_bytes; #endif // ROCKSDB_MALLOC_USABLE_SIZE + blocks_memory_ += allocated_size; + if (tracker_ != nullptr) { + tracker_->Allocate(allocated_size); + } // the following shouldn't throw because of the above reserve() blocks_.push_back(block); return block; diff --git a/util/arena.h b/util/arena.h index 8871aef7a..94f81eeab 100644 --- a/util/arena.h +++ b/util/arena.h @@ -40,7 +40,8 @@ class Arena : public Allocator { // huge_page_size: if 0, don't use huge page TLB. If > 0 (should set to the // supported hugepage size of the system), block allocation will try huge // page TLB first. If allocation fails, will fall back to normal case. - explicit Arena(size_t block_size = kMinBlockSize, size_t huge_page_size = 0); + explicit Arena(size_t block_size = kMinBlockSize, + AllocTracker* tracker = nullptr, size_t huge_page_size = 0); ~Arena(); char* Allocate(size_t bytes) override; @@ -114,6 +115,7 @@ class Arena : public Allocator { // Bytes of memory in blocks allocated so far size_t blocks_memory_ = 0; + AllocTracker* tracker_; }; inline char* Arena::Allocate(size_t bytes) { diff --git a/util/arena_test.cc b/util/arena_test.cc index 760f6d67c..10501ac74 100644 --- a/util/arena_test.cc +++ b/util/arena_test.cc @@ -37,7 +37,7 @@ void MemoryAllocatedBytesTest(size_t huge_page_size) { size_t bsz = 32 * 1024; // block size size_t expected_memory_allocated; - Arena arena(bsz, huge_page_size); + Arena arena(bsz, nullptr, huge_page_size); // requested size > quarter of a block: // allocate requested size separately @@ -89,7 +89,7 @@ static void ApproximateMemoryUsageTest(size_t huge_page_size) { const size_t kBlockSize = 4096; const size_t kEntrySize = kBlockSize / 8; const size_t kZero = 0; - Arena arena(kBlockSize, huge_page_size); + Arena arena(kBlockSize, nullptr, huge_page_size); ASSERT_EQ(kZero, arena.ApproximateMemoryUsage()); // allocate inline bytes @@ -131,7 +131,7 @@ static void ApproximateMemoryUsageTest(size_t huge_page_size) { static void SimpleTest(size_t huge_page_size) { std::vector> allocated; - Arena arena(Arena::kMinBlockSize, huge_page_size); + Arena arena(Arena::kMinBlockSize, nullptr, huge_page_size); const int N = 100000; size_t bytes = 0; Random rnd(301); diff --git a/util/concurrent_arena.cc b/util/concurrent_arena.cc index 7e4310080..a99da1547 100644 --- a/util/concurrent_arena.cc +++ b/util/concurrent_arena.cc @@ -11,7 +11,6 @@ #include "util/concurrent_arena.h" #include -#include "port/likely.h" #include "port/port.h" #include "util/random.h" @@ -21,10 +20,11 @@ namespace rocksdb { __thread size_t ConcurrentArena::tls_cpuid = 0; #endif -ConcurrentArena::ConcurrentArena(size_t block_size, size_t huge_page_size) +ConcurrentArena::ConcurrentArena(size_t block_size, AllocTracker* tracker, + size_t huge_page_size) : shard_block_size_(block_size / 8), shards_(), - arena_(block_size, huge_page_size) { + arena_(block_size, tracker, huge_page_size) { Fixup(); } diff --git a/util/concurrent_arena.h b/util/concurrent_arena.h index d2d448dc8..395598729 100644 --- a/util/concurrent_arena.h +++ b/util/concurrent_arena.h @@ -46,6 +46,7 @@ class ConcurrentArena : public Allocator { // shards compute their shard_block_size as a fraction of block_size // that varies according to the hardware concurrency level. explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize, + AllocTracker* tracker = nullptr, size_t huge_page_size = 0); char* Allocate(size_t bytes) override {