Divide file_reader_writer.h and .cc (#5803)

Summary:
file_reader_writer.h and .cc contain several files and helper function, and it's hard to navigate. Separate it to multiple files and put them under file/
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5803

Test Plan: Build whole project using make and cmake.

Differential Revision: D17374550

fbshipit-source-id: 10efca907721e7a78ed25bbf74dc5410dea05987
main
sdong 5 years ago committed by Facebook Github Bot
parent 915d72d849
commit b931f84e56
  1. 7
      CMakeLists.txt
  2. 7
      TARGETS
  3. 3
      db/builder.cc
  4. 3
      db/compaction/compaction_job.cc
  5. 2
      db/compaction/compaction_job_test.cc
  6. 2
      db/db_impl/db_impl.cc
  7. 2
      db/db_impl/db_impl_open.cc
  8. 1
      db/db_test.cc
  9. 2
      db/external_sst_file_ingestion_job.cc
  10. 2
      db/flush_job_test.cc
  11. 2
      db/import_column_family_job.cc
  12. 3
      db/log_reader.cc
  13. 5
      db/log_reader.h
  14. 3
      db/log_test.cc
  15. 2
      db/log_writer.cc
  16. 2
      db/repair.cc
  17. 3
      db/table_cache.cc
  18. 6
      db/table_properties_collector_test.cc
  19. 2
      db/transaction_log_impl.cc
  20. 4
      db/version_set.cc
  21. 2
      db/wal_manager.cc
  22. 2
      db/wal_manager_test.cc
  23. 133
      file/file_prefetch_buffer.cc
  24. 97
      file/file_prefetch_buffer.h
  25. 4
      file/file_util.cc
  26. 2
      file/filename.cc
  27. 188
      file/random_access_file_reader.cc
  28. 120
      file/random_access_file_reader.h
  29. 61
      file/read_write_util.cc
  30. 29
      file/read_write_util.h
  31. 170
      file/readahead_raf.cc
  32. 27
      file/readahead_raf.h
  33. 241
      file/sequence_file_reader.cc
  34. 66
      file/sequence_file_reader.h
  35. 405
      file/writable_file_writer.cc
  36. 155
      file/writable_file_writer.h
  37. 2
      logging/env_logger.h
  38. 3
      options/options_parser.cc
  39. 7
      src.mk
  40. 4
      table/block_based/block_based_table_reader.cc
  41. 2
      table/block_based/block_based_table_reader.h
  42. 1
      table/block_fetcher.cc
  43. 2
      table/cuckoo/cuckoo_table_builder.cc
  44. 3
      table/cuckoo/cuckoo_table_builder_test.cc
  45. 2
      table/cuckoo/cuckoo_table_reader.h
  46. 2
      table/format.cc
  47. 4
      table/format.h
  48. 1
      table/iterator_wrapper.h
  49. 2
      table/meta_blocks.cc
  50. 2
      table/mock_table.cc
  51. 2
      table/plain/plain_table_builder.cc
  52. 2
      table/plain/plain_table_key_coding.cc
  53. 2
      table/plain/plain_table_reader.h
  54. 2
      table/sst_file_reader.cc
  55. 2
      table/sst_file_writer.cc
  56. 2
      table/table_builder.h
  57. 2
      table/table_reader_bench.cc
  58. 4
      test_util/testutil.cc
  59. 2
      tools/sst_dump_test.cc
  60. 2
      tools/sst_dump_tool_imp.h
  61. 1
      tools/trace_analyzer_test.cc
  62. 3
      tools/trace_analyzer_tool.cc
  63. 1085
      util/file_reader_writer.cc
  64. 407
      util/file_reader_writer.h
  65. 5
      util/file_reader_writer_test.cc
  66. 2
      util/log_write_bench.cc
  67. 3
      utilities/backupable/backupable_db.cc
  68. 1
      utilities/backupable/backupable_db_test.cc
  69. 3
      utilities/blob_db/blob_db_impl.cc
  70. 5
      utilities/blob_db/blob_dump_tool.cc
  71. 2
      utilities/blob_db/blob_dump_tool.h
  72. 1
      utilities/blob_db/blob_file.cc
  73. 2
      utilities/blob_db/blob_file.h
  74. 2
      utilities/blob_db/blob_log_reader.cc
  75. 2
      utilities/blob_db/blob_log_reader.h
  76. 2
      utilities/blob_db/blob_log_writer.cc
  77. 3
      utilities/persistent_cache/block_cache_tier_file.h
  78. 2
      utilities/simulator_cache/sim_cache.cc
  79. 3
      utilities/trace/file_trace_reader_writer.cc

@ -547,9 +547,15 @@ set(SOURCES
env/env_hdfs.cc env/env_hdfs.cc
env/mock_env.cc env/mock_env.cc
file/delete_scheduler.cc file/delete_scheduler.cc
file/file_prefetch_buffer.cc
file/file_util.cc file/file_util.cc
file/filename.cc file/filename.cc
file/random_access_file_reader.cc
file/read_write_util.cc
file/readahead_raf.cc
file/sequence_file_reader.cc
file/sst_file_manager_impl.cc file/sst_file_manager_impl.cc
file/writable_file_writer.cc
logging/auto_roll_logger.cc logging/auto_roll_logger.cc
logging/event_logger.cc logging/event_logger.cc
logging/log_buffer.cc logging/log_buffer.cc
@ -639,7 +645,6 @@ set(SOURCES
util/concurrent_task_limiter_impl.cc util/concurrent_task_limiter_impl.cc
util/crc32c.cc util/crc32c.cc
util/dynamic_bloom.cc util/dynamic_bloom.cc
util/file_reader_writer.cc
util/filter_policy.cc util/filter_policy.cc
util/hash.cc util/hash.cc
util/murmurhash.cc util/murmurhash.cc

@ -176,9 +176,15 @@ cpp_library(
"env/io_posix.cc", "env/io_posix.cc",
"env/mock_env.cc", "env/mock_env.cc",
"file/delete_scheduler.cc", "file/delete_scheduler.cc",
"file/file_prefetch_buffer.cc",
"file/file_util.cc", "file/file_util.cc",
"file/filename.cc", "file/filename.cc",
"file/random_access_file_reader.cc",
"file/read_write_util.cc",
"file/readahead_raf.cc",
"file/sequence_file_reader.cc",
"file/sst_file_manager_impl.cc", "file/sst_file_manager_impl.cc",
"file/writable_file_writer.cc",
"logging/auto_roll_logger.cc", "logging/auto_roll_logger.cc",
"logging/event_logger.cc", "logging/event_logger.cc",
"logging/log_buffer.cc", "logging/log_buffer.cc",
@ -267,7 +273,6 @@ cpp_library(
"util/concurrent_task_limiter_impl.cc", "util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc", "util/crc32c.cc",
"util/dynamic_bloom.cc", "util/dynamic_bloom.cc",
"util/file_reader_writer.cc",
"util/filter_policy.cc", "util/filter_policy.cc",
"util/hash.cc", "util/hash.cc",
"util/murmurhash.cc", "util/murmurhash.cc",

@ -22,6 +22,8 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_util.h" #include "monitoring/thread_status_util.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -33,7 +35,6 @@
#include "table/format.h" #include "table/format.h"
#include "table/internal_iterator.h" #include "table/internal_iterator.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
namespace rocksdb { namespace rocksdb {

@ -34,7 +34,9 @@
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h" #include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "logging/log_buffer.h" #include "logging/log_buffer.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
@ -52,7 +54,6 @@
#include "table/table_builder.h" #include "table/table_builder.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"

@ -17,6 +17,7 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/error_handler.h" #include "db/error_handler.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "file/writable_file_writer.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
@ -24,7 +25,6 @@
#include "table/mock_table.h" #include "table/mock_table.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"

@ -54,6 +54,7 @@
#include "db/write_callback.h" #include "db/write_callback.h"
#include "file/file_util.h" #include "file/file_util.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/sst_file_manager_impl.h" #include "file/sst_file_manager_impl.h"
#include "logging/auto_roll_logger.h" #include "logging/auto_roll_logger.h"
#include "logging/log_buffer.h" #include "logging/log_buffer.h"
@ -96,7 +97,6 @@
#include "util/coding.h" #include "util/coding.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"

@ -12,7 +12,9 @@
#include "db/builder.h" #include "db/builder.h"
#include "db/error_handler.h" #include "db/error_handler.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h" #include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "monitoring/persistent_stats_history.h" #include "monitoring/persistent_stats_history.h"
#include "options/options_helper.h" #include "options/options_helper.h"
#include "rocksdb/wal_filter.h" #include "rocksdb/wal_filter.h"

@ -62,7 +62,6 @@
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/file_reader_writer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/rate_limiter.h" #include "util/rate_limiter.h"
#include "util/string_util.h" #include "util/string_util.h"

@ -16,12 +16,12 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "file/file_util.h" #include "file/file_util.h"
#include "file/random_access_file_reader.h"
#include "table/merging_iterator.h" #include "table/merging_iterator.h"
#include "table/scoped_arena_iterator.h" #include "table/scoped_arena_iterator.h"
#include "table/sst_file_writer_collectors.h" #include "table/sst_file_writer_collectors.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
namespace rocksdb { namespace rocksdb {

@ -11,12 +11,12 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/flush_job.h" #include "db/flush_job.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "file/writable_file_writer.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/write_buffer_manager.h" #include "rocksdb/write_buffer_manager.h"
#include "table/mock_table.h" #include "table/mock_table.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h" #include "util/string_util.h"
namespace rocksdb { namespace rocksdb {

@ -9,11 +9,11 @@
#include "db/version_edit.h" #include "db/version_edit.h"
#include "file/file_util.h" #include "file/file_util.h"
#include "file/random_access_file_reader.h"
#include "table/merging_iterator.h" #include "table/merging_iterator.h"
#include "table/scoped_arena_iterator.h" #include "table/scoped_arena_iterator.h"
#include "table/sst_file_writer_collectors.h" #include "table/sst_file_writer_collectors.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
namespace rocksdb { namespace rocksdb {

@ -10,10 +10,11 @@
#include "db/log_reader.h" #include "db/log_reader.h"
#include <stdio.h> #include <stdio.h>
#include "file/sequence_file_reader.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/util.h" #include "util/util.h"
namespace rocksdb { namespace rocksdb {

@ -12,13 +12,12 @@
#include <stdint.h> #include <stdint.h>
#include "db/log_format.h" #include "db/log_format.h"
#include "file/sequence_file_reader.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/options.h"
namespace rocksdb { namespace rocksdb {
class SequentialFileReader;
class Logger; class Logger;
namespace log { namespace log {

@ -9,12 +9,13 @@
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/random.h" #include "util/random.h"
namespace rocksdb { namespace rocksdb {

@ -10,10 +10,10 @@
#include "db/log_writer.h" #include "db/log_writer.h"
#include <stdint.h> #include <stdint.h>
#include "file/writable_file_writer.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
namespace log { namespace log {

@ -71,6 +71,7 @@
#include "db/version_edit.h" #include "db/version_edit.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -78,7 +79,6 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h" #include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h" #include "table/scoped_arena_iterator.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h" #include "util/string_util.h"
namespace rocksdb { namespace rocksdb {

@ -14,7 +14,7 @@
#include "db/snapshot_impl.h" #include "db/snapshot_impl.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "table/block_based/block_based_table_reader.h" #include "table/block_based/block_based_table_reader.h"
@ -27,7 +27,6 @@
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
namespace rocksdb { namespace rocksdb {

@ -9,10 +9,11 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/table_properties_collector.h" #include "db/table_properties_collector.h"
#include "file/sequence_file_reader.h"
#include "db/db_impl/db_impl.h" #include "file/writable_file_writer.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "table/block_based/block_based_table_factory.h" #include "table/block_based/block_based_table_factory.h"
@ -22,7 +23,6 @@
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -8,7 +8,7 @@
#include "db/transaction_log_impl.h" #include "db/transaction_log_impl.h"
#include <cinttypes> #include <cinttypes>
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "util/file_reader_writer.h" #include "file/sequence_file_reader.h"
namespace rocksdb { namespace rocksdb {

@ -30,6 +30,9 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_builder.h" #include "db/version_builder.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "monitoring/file_read_sample.h" #include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h" #include "monitoring/persistent_stats_history.h"
@ -47,7 +50,6 @@
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/user_comparator_wrapper.h" #include "util/user_comparator_wrapper.h"

@ -20,6 +20,7 @@
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "file/file_util.h" #include "file/file_util.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/sequence_file_reader.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -28,7 +29,6 @@
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/string_util.h" #include "util/string_util.h"

@ -18,10 +18,10 @@
#include "db/version_set.h" #include "db/version_set.h"
#include "db/wal_manager.h" #include "db/wal_manager.h"
#include "env/mock_env.h" #include "env/mock_env.h"
#include "file/writable_file_writer.h"
#include "table/mock_table.h" #include "table/mock_table.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h" #include "util/string_util.h"
namespace rocksdb { namespace rocksdb {

@ -0,0 +1,133 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/file_prefetch_buffer.h"
#include <algorithm>
#include <mutex>
#include "file/random_access_file_reader.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"
namespace rocksdb {
Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,
uint64_t offset, size_t n,
bool for_compaction) {
size_t alignment = reader->file()->GetRequiredBufferAlignment();
size_t offset_ = static_cast<size_t>(offset);
uint64_t rounddown_offset = Rounddown(offset_, alignment);
uint64_t roundup_end = Roundup(offset_ + n, alignment);
uint64_t roundup_len = roundup_end - rounddown_offset;
assert(roundup_len >= alignment);
assert(roundup_len % alignment == 0);
// Check if requested bytes are in the existing buffer_.
// If all bytes exist -- return.
// If only a few bytes exist -- reuse them & read only what is really needed.
// This is typically the case of incremental reading of data.
// If no bytes exist in buffer -- full pread.
Status s;
uint64_t chunk_offset_in_buffer = 0;
uint64_t chunk_len = 0;
bool copy_data_to_new_buffer = false;
if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ &&
offset <= buffer_offset_ + buffer_.CurrentSize()) {
if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) {
// All requested bytes are already in the buffer. So no need to Read
// again.
return s;
} else {
// Only a few requested bytes are in the buffer. memmove those chunk of
// bytes to the beginning, and memcpy them back into the new buffer if a
// new buffer is created.
chunk_offset_in_buffer =
Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment);
chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer;
assert(chunk_offset_in_buffer % alignment == 0);
assert(chunk_len % alignment == 0);
assert(chunk_offset_in_buffer + chunk_len <=
buffer_offset_ + buffer_.CurrentSize());
if (chunk_len > 0) {
copy_data_to_new_buffer = true;
} else {
// this reset is not necessary, but just to be safe.
chunk_offset_in_buffer = 0;
}
}
}
// Create a new buffer only if current capacity is not sufficient, and memcopy
// bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
if (buffer_.Capacity() < roundup_len) {
buffer_.Alignment(alignment);
buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len),
copy_data_to_new_buffer, chunk_offset_in_buffer,
static_cast<size_t>(chunk_len));
} else if (chunk_len > 0) {
// New buffer not needed. But memmove bytes from tail to the beginning since
// chunk_len is greater than 0.
buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
static_cast<size_t>(chunk_len));
}
Slice result;
s = reader->Read(rounddown_offset + chunk_len,
static_cast<size_t>(roundup_len - chunk_len), &result,
buffer_.BufferStart() + chunk_len, for_compaction);
if (s.ok()) {
buffer_offset_ = rounddown_offset;
buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
}
return s;
}
bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n,
Slice* result, bool for_compaction) {
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
if (!enable_ || offset < buffer_offset_) {
return false;
}
// If the buffer contains only a few of the requested bytes:
// If readahead is enabled: prefetch the remaining bytes + readadhead bytes
// and satisfy the request.
// If readahead is not enabled: return false.
if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
if (readahead_size_ > 0) {
assert(file_reader_ != nullptr);
assert(max_readahead_size_ >= readahead_size_);
Status s;
if (for_compaction) {
s = Prefetch(file_reader_, offset, std::max(n, readahead_size_),
for_compaction);
} else {
s = Prefetch(file_reader_, offset, n + readahead_size_, for_compaction);
}
if (!s.ok()) {
return false;
}
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
} else {
return false;
}
}
uint64_t offset_in_buffer = offset - buffer_offset_;
*result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
return true;
}
} // namespace rocksdb

@ -0,0 +1,97 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <sstream>
#include <string>
#include "file/random_access_file_reader.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/aligned_buffer.h"
namespace rocksdb {
// FilePrefetchBuffer is a smart buffer to store and read data from a file.
class FilePrefetchBuffer {
public:
// Constructor.
//
// All arguments are optional.
// file_reader : the file reader to use. Can be a nullptr.
// readahead_size : the initial readahead size.
// max_readahead_size : the maximum readahead size.
// If max_readahead_size > readahead_size, the readahead size will be
// doubled on every IO until max_readahead_size is hit.
// Typically this is set as a multiple of readahead_size.
// max_readahead_size should be greater than equal to readahead_size.
// enable : controls whether reading from the buffer is enabled.
// If false, TryReadFromCache() always return false, and we only take stats
// for the minimum offset if track_min_offset = true.
// track_min_offset : Track the minimum offset ever read and collect stats on
// it. Used for adaptable readahead of the file footer/metadata.
//
// Automatic readhead is enabled for a file if file_reader, readahead_size,
// and max_readahead_size are passed in.
// If file_reader is a nullptr, setting readadhead_size and max_readahead_size
// does not make any sense. So it does nothing.
// A user can construct a FilePrefetchBuffer without any arguments, but use
// `Prefetch` to load data into the buffer.
FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr,
size_t readadhead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false)
: buffer_offset_(0),
file_reader_(file_reader),
readahead_size_(readadhead_size),
max_readahead_size_(max_readahead_size),
min_offset_read_(port::kMaxSizet),
enable_(enable),
track_min_offset_(track_min_offset) {}
// Load data into the buffer from a file.
// reader : the file reader.
// offset : the file offset to start reading from.
// n : the number of bytes to read.
// for_compaction : if prefetch is done for compaction read.
Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n,
bool for_compaction = false);
// Tries returning the data for a file raed from this buffer, if that data is
// in the buffer.
// It handles tracking the minimum read offset if track_min_offset = true.
// It also does the exponential readahead when readadhead_size is set as part
// of the constructor.
//
// offset : the file offset.
// n : the number of bytes.
// result : output buffer to put the data into.
// for_compaction : if cache read is done for compaction read.
bool TryReadFromCache(uint64_t offset, size_t n, Slice* result,
bool for_compaction = false);
// The minimum `offset` ever passed to TryReadFromCache(). This will nly be
// tracked if track_min_offset = true.
size_t min_offset_read() const { return min_offset_read_; }
private:
AlignedBuffer buffer_;
uint64_t buffer_offset_;
RandomAccessFileReader* file_reader_;
size_t readahead_size_;
size_t max_readahead_size_;
// The minimum `offset` ever passed to TryReadFromCache().
size_t min_offset_read_;
// if false, TryReadFromCache() always return false, and we only take stats
// for track_min_offset_ if track_min_offset_ = true
bool enable_;
// If true, track minimum `offset` ever passed to TryReadFromCache(), which
// can be fetched from min_offset_read().
bool track_min_offset_;
};
} // namespace rocksdb

@ -8,9 +8,11 @@
#include <string> #include <string>
#include <algorithm> #include <algorithm>
#include "file/random_access_file_reader.h"
#include "file/sequence_file_reader.h"
#include "file/sst_file_manager_impl.h" #include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -12,10 +12,10 @@
#include <ctype.h> #include <ctype.h>
#include <stdio.h> #include <stdio.h>
#include <vector> #include <vector>
#include "file/writable_file_writer.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"

@ -0,0 +1,188 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/random_access_file_reader.h"
#include <algorithm>
#include <mutex>
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"
namespace rocksdb {
Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
char* scratch, bool for_compaction) const {
Status s;
uint64_t elapsed = 0;
{
StopWatch sw(env_, stats_, hist_type_,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
IOSTATS_TIMER_GUARD(read_nanos);
if (use_direct_io()) {
#ifndef ROCKSDB_LITE
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset =
TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
size_t read_size =
Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(read_size);
while (buf.CurrentSize() < read_size) {
size_t allowed;
if (for_compaction && rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(
buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
} else {
assert(buf.CurrentSize() == 0);
allowed = read_size;
}
Slice tmp;
FileOperationInfo::TimePoint start_ts;
uint64_t orig_offset = 0;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
orig_offset = aligned_offset + buf.CurrentSize();
}
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp,
buf.Destination());
}
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
s);
}
buf.Size(buf.CurrentSize() + tmp.size());
if (!s.ok() || tmp.size() < allowed) {
break;
}
}
size_t res_len = 0;
if (s.ok() && offset_advance < buf.CurrentSize()) {
res_len = buf.Read(scratch, offset_advance,
std::min(buf.CurrentSize() - offset_advance, n));
}
*result = Slice(scratch, res_len);
#endif // !ROCKSDB_LITE
} else {
size_t pos = 0;
const char* res_scratch = nullptr;
while (pos < n) {
size_t allowed;
if (for_compaction && rate_limiter_ != nullptr) {
if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
sw.DelayStart();
}
allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
Env::IOPriority::IO_LOW, stats_,
RateLimiter::OpType::kRead);
if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
sw.DelayStop();
}
} else {
allowed = n;
}
Slice tmp_result;
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
finish_ts, s);
}
#endif
if (res_scratch == nullptr) {
// we can't simply use `scratch` because reads of mmap'd files return
// data in a different buffer.
res_scratch = tmp_result.data();
} else {
// make sure chunks are inserted contiguously into `res_scratch`.
assert(tmp_result.data() == res_scratch + pos);
}
pos += tmp_result.size();
if (!s.ok() || tmp_result.size() < allowed) {
break;
}
}
*result = Slice(res_scratch, s.ok() ? pos : 0);
}
IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
SetPerfLevel(prev_perf_level);
}
if (stats_ != nullptr && file_read_hist_ != nullptr) {
file_read_hist_->Add(elapsed);
}
return s;
}
Status RandomAccessFileReader::MultiRead(ReadRequest* read_reqs,
size_t num_reqs) const {
Status s;
uint64_t elapsed = 0;
assert(!use_direct_io());
{
StopWatch sw(env_, stats_, hist_type_,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
IOSTATS_TIMER_GUARD(read_nanos);
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif // ROCKSDB_LITE
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->MultiRead(read_reqs, num_reqs);
}
for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
start_ts, finish_ts, read_reqs[i].status);
}
#endif // ROCKSDB_LITE
IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
}
SetPerfLevel(prev_perf_level);
}
if (stats_ != nullptr && file_read_hist_ != nullptr) {
file_read_hist_->Add(elapsed);
}
return s;
}
} // namespace rocksdb

@ -0,0 +1,120 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <sstream>
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "util/aligned_buffer.h"
namespace rocksdb {
class Statistics;
class HistogramImpl;
// RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is
// responsible for:
// - Handling Buffered and Direct reads appropriately.
// - Rate limiting compaction reads.
// - Notifying any interested listeners on the completion of a read.
// - Updating IO stats.
class RandomAccessFileReader {
private:
#ifndef ROCKSDB_LITE
void NotifyOnFileReadFinish(uint64_t offset, size_t length,
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const Status& status) const {
FileOperationInfo info(file_name_, start_ts, finish_ts);
info.offset = offset;
info.length = length;
info.status = status;
for (auto& listener : listeners_) {
listener->OnFileReadFinish(info);
}
}
#endif // ROCKSDB_LITE
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
std::unique_ptr<RandomAccessFile> file_;
std::string file_name_;
Env* env_;
Statistics* stats_;
uint32_t hist_type_;
HistogramImpl* file_read_hist_;
RateLimiter* rate_limiter_;
std::vector<std::shared_ptr<EventListener>> listeners_;
public:
explicit RandomAccessFileReader(
std::unique_ptr<RandomAccessFile>&& raf, std::string _file_name,
Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: file_(std::move(raf)),
file_name_(std::move(_file_name)),
env_(env),
stats_(stats),
hist_type_(hist_type),
file_read_hist_(file_read_hist),
rate_limiter_(rate_limiter),
listeners_() {
#ifndef ROCKSDB_LITE
std::for_each(listeners.begin(), listeners.end(),
[this](const std::shared_ptr<EventListener>& e) {
if (e->ShouldBeNotifiedOnFileIO()) {
listeners_.emplace_back(e);
}
});
#else // !ROCKSDB_LITE
(void)listeners;
#endif
}
RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
RandomAccessFileReader& operator=(RandomAccessFileReader&& o)
ROCKSDB_NOEXCEPT {
file_ = std::move(o.file_);
env_ = std::move(o.env_);
stats_ = std::move(o.stats_);
hist_type_ = std::move(o.hist_type_);
file_read_hist_ = std::move(o.file_read_hist_);
rate_limiter_ = std::move(o.rate_limiter_);
return *this;
}
RandomAccessFileReader(const RandomAccessFileReader&) = delete;
RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch,
bool for_compaction = false) const;
Status MultiRead(ReadRequest* reqs, size_t num_reqs) const;
Status Prefetch(uint64_t offset, size_t n) const {
return file_->Prefetch(offset, n);
}
RandomAccessFile* file() { return file_.get(); }
std::string file_name() const { return file_name_; }
bool use_direct_io() const { return file_->use_direct_io(); }
};
} // namespace rocksdb

@ -0,0 +1,61 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/read_write_util.h"
#include <sstream>
#include "test_util/sync_point.h"
namespace rocksdb {
Status NewWritableFile(Env* env, const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) {
Status s = env->NewWritableFile(fname, result, options);
TEST_KILL_RANDOM("NewWritableFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
return s;
}
bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file,
std::string* output, bool* has_data, Status* result) {
const int kBufferSize = 8192;
char buffer[kBufferSize + 1];
Slice input_slice;
std::string line;
bool has_complete_line = false;
while (!has_complete_line) {
if (std::getline(*iss, line)) {
has_complete_line = !iss->eof();
} else {
has_complete_line = false;
}
if (!has_complete_line) {
// if we're not sure whether we have a complete line,
// further read from the file.
if (*has_data) {
*result = seq_file->Read(kBufferSize, &input_slice, buffer);
}
if (input_slice.size() == 0) {
// meaning we have read all the data
*has_data = false;
break;
} else {
iss->str(line + input_slice.ToString());
// reset the internal state of iss so that we can keep reading it.
iss->clear();
*has_data = (input_slice.size() == kBufferSize);
continue;
}
}
}
*output = line;
return *has_data || has_complete_line;
}
} // namespace rocksdb

@ -0,0 +1,29 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include "rocksdb/env.h"
namespace rocksdb {
// Returns a WritableFile.
//
// env : the Env.
// fname : the file name.
// result : output arg. A WritableFile based on `fname` returned.
// options : the Env Options.
extern Status NewWritableFile(Env* env, const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options);
// Read a single line from a file.
bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file,
std::string* output, bool* has_data, Status* result);
} // namespace rocksdb

@ -0,0 +1,170 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/readahead_raf.h"
#include <algorithm>
#include <mutex>
#include "util/aligned_buffer.h"
#include "util/rate_limiter.h"
namespace rocksdb {
#ifndef NDEBUG
namespace {
bool IsFileSectorAligned(const size_t off, size_t sector_size) {
return off % sector_size == 0;
}
} // namespace
#endif
namespace {
class ReadaheadRandomAccessFile : public RandomAccessFile {
public:
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
size_t readahead_size)
: file_(std::move(file)),
alignment_(file_->GetRequiredBufferAlignment()),
readahead_size_(Roundup(readahead_size, alignment_)),
buffer_(),
buffer_offset_(0) {
buffer_.Alignment(alignment_);
buffer_.AllocateNewBuffer(readahead_size_);
}
ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) =
delete;
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
// Read-ahead only make sense if we have some slack left after reading
if (n + alignment_ >= readahead_size_) {
return file_->Read(offset, n, result, scratch);
}
std::unique_lock<std::mutex> lk(lock_);
size_t cached_len = 0;
// Check if there is a cache hit, meaning that [offset, offset + n) is
// either completely or partially in the buffer. If it's completely cached,
// including end of file case when offset + n is greater than EOF, then
// return.
if (TryReadFromCache(offset, n, &cached_len, scratch) &&
(cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
// We read exactly what we needed, or we hit end of file - return.
*result = Slice(scratch, cached_len);
return Status::OK();
}
size_t advanced_offset = static_cast<size_t>(offset + cached_len);
// In the case of cache hit advanced_offset is already aligned, means that
// chunk_offset equals to advanced_offset
size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
Status s = ReadIntoBuffer(chunk_offset, readahead_size_);
if (s.ok()) {
// The data we need is now in cache, so we can safely read it
size_t remaining_len;
TryReadFromCache(advanced_offset, n - cached_len, &remaining_len,
scratch + cached_len);
*result = Slice(scratch, cached_len + remaining_len);
}
return s;
}
Status Prefetch(uint64_t offset, size_t n) override {
if (n < readahead_size_) {
// Don't allow smaller prefetches than the configured `readahead_size_`.
// `Read()` assumes a smaller prefetch buffer indicates EOF was reached.
return Status::OK();
}
std::unique_lock<std::mutex> lk(lock_);
size_t offset_ = static_cast<size_t>(offset);
size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_);
if (prefetch_offset == buffer_offset_) {
return Status::OK();
}
return ReadIntoBuffer(prefetch_offset,
Roundup(offset_ + n, alignment_) - prefetch_offset);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return file_->GetUniqueId(id, max_size);
}
void Hint(AccessPattern pattern) override { file_->Hint(pattern); }
Status InvalidateCache(size_t offset, size_t length) override {
std::unique_lock<std::mutex> lk(lock_);
buffer_.Clear();
return file_->InvalidateCache(offset, length);
}
bool use_direct_io() const override { return file_->use_direct_io(); }
private:
// Tries to read from buffer_ n bytes starting at offset. If anything was read
// from the cache, it sets cached_len to the number of bytes actually read,
// copies these number of bytes to scratch and returns true.
// If nothing was read sets cached_len to 0 and returns false.
bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len,
char* scratch) const {
if (offset < buffer_offset_ ||
offset >= buffer_offset_ + buffer_.CurrentSize()) {
*cached_len = 0;
return false;
}
uint64_t offset_in_buffer = offset - buffer_offset_;
*cached_len = std::min(
buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
return true;
}
// Reads into buffer_ the next n bytes from file_ starting at offset.
// Can actually read less if EOF was reached.
// Returns the status of the read operastion on the file.
Status ReadIntoBuffer(uint64_t offset, size_t n) const {
if (n > buffer_.Capacity()) {
n = buffer_.Capacity();
}
assert(IsFileSectorAligned(offset, alignment_));
assert(IsFileSectorAligned(n, alignment_));
Slice result;
Status s = file_->Read(offset, n, &result, buffer_.BufferStart());
if (s.ok()) {
buffer_offset_ = offset;
buffer_.Size(result.size());
assert(result.size() == 0 || buffer_.BufferStart() == result.data());
}
return s;
}
const std::unique_ptr<RandomAccessFile> file_;
const size_t alignment_;
const size_t readahead_size_;
mutable std::mutex lock_;
// The buffer storing the prefetched data
mutable AlignedBuffer buffer_;
// The offset in file_, corresponding to data stored in buffer_
mutable uint64_t buffer_offset_;
};
} // namespace
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
std::unique_ptr<RandomAccessFile> result(
new ReadaheadRandomAccessFile(std::move(file), readahead_size));
return result;
}
} // namespace rocksdb

@ -0,0 +1,27 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include "rocksdb/env.h"
namespace rocksdb {
// This file provides the following main abstractions:
// SequentialFileReader : wrapper over Env::SequentialFile
// RandomAccessFileReader : wrapper over Env::RandomAccessFile
// WritableFileWriter : wrapper over Env::WritableFile
// In addition, it also exposed NewReadaheadRandomAccessFile, NewWritableFile,
// and ReadOneLine primitives.
// NewReadaheadRandomAccessFile provides a wrapper over RandomAccessFile to
// always prefetch additional data with every read. This is mainly used in
// Compaction Table Readers.
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size);
} // namespace rocksdb

@ -0,0 +1,241 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/sequence_file_reader.h"
#include <algorithm>
#include <mutex>
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/random.h"
#include "util/rate_limiter.h"
namespace rocksdb {
#ifndef NDEBUG
namespace {
bool IsFileSectorAligned(const size_t off, size_t sector_size) {
return off % sector_size == 0;
}
} // namespace
#endif
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s;
if (use_direct_io()) {
#ifndef ROCKSDB_LITE
size_t offset = offset_.fetch_add(n);
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
size_t offset_advance = offset - aligned_offset;
size_t size = Roundup(offset + n, alignment) - aligned_offset;
size_t r = 0;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
s = file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart());
if (s.ok() && offset_advance < tmp.size()) {
buf.Size(tmp.size());
r = buf.Read(scratch, offset_advance,
std::min(tmp.size() - offset_advance, n));
}
*result = Slice(scratch, r);
#endif // !ROCKSDB_LITE
} else {
s = file_->Read(n, result, scratch);
}
IOSTATS_ADD(bytes_read, result->size());
return s;
}
Status SequentialFileReader::Skip(uint64_t n) {
#ifndef ROCKSDB_LITE
if (use_direct_io()) {
offset_ += static_cast<size_t>(n);
return Status::OK();
}
#endif // !ROCKSDB_LITE
return file_->Skip(n);
}
namespace {
// This class wraps a SequentialFile, exposing same API, with the differenece
// of being able to prefetch up to readahead_size bytes and then serve them
// from memory, avoiding the entire round-trip if, for example, the data for the
// file is actually remote.
class ReadaheadSequentialFile : public SequentialFile {
public:
ReadaheadSequentialFile(std::unique_ptr<SequentialFile>&& file,
size_t readahead_size)
: file_(std::move(file)),
alignment_(file_->GetRequiredBufferAlignment()),
readahead_size_(Roundup(readahead_size, alignment_)),
buffer_(),
buffer_offset_(0),
read_offset_(0) {
buffer_.Alignment(alignment_);
buffer_.AllocateNewBuffer(readahead_size_);
}
ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;
ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;
Status Read(size_t n, Slice* result, char* scratch) override {
std::unique_lock<std::mutex> lk(lock_);
size_t cached_len = 0;
// Check if there is a cache hit, meaning that [offset, offset + n) is
// either completely or partially in the buffer. If it's completely cached,
// including end of file case when offset + n is greater than EOF, then
// return.
if (TryReadFromCache(n, &cached_len, scratch) &&
(cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
// We read exactly what we needed, or we hit end of file - return.
*result = Slice(scratch, cached_len);
return Status::OK();
}
n -= cached_len;
Status s;
// Read-ahead only make sense if we have some slack left after reading
if (n + alignment_ >= readahead_size_) {
s = file_->Read(n, result, scratch + cached_len);
if (s.ok()) {
read_offset_ += result->size();
*result = Slice(scratch, cached_len + result->size());
}
buffer_.Clear();
return s;
}
s = ReadIntoBuffer(readahead_size_);
if (s.ok()) {
// The data we need is now in cache, so we can safely read it
size_t remaining_len;
TryReadFromCache(n, &remaining_len, scratch + cached_len);
*result = Slice(scratch, cached_len + remaining_len);
}
return s;
}
Status Skip(uint64_t n) override {
std::unique_lock<std::mutex> lk(lock_);
Status s = Status::OK();
// First check if we need to skip already cached data
if (buffer_.CurrentSize() > 0) {
// Do we need to skip beyond cached data?
if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
// Yes. Skip whaterver is in memory and adjust offset accordingly
n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
read_offset_ = buffer_offset_ + buffer_.CurrentSize();
} else {
// No. The entire section to be skipped is entirely i cache.
read_offset_ += n;
n = 0;
}
}
if (n > 0) {
// We still need to skip more, so call the file API for skipping
s = file_->Skip(n);
if (s.ok()) {
read_offset_ += n;
}
buffer_.Clear();
}
return s;
}
Status PositionedRead(uint64_t offset, size_t n, Slice* result,
char* scratch) override {
return file_->PositionedRead(offset, n, result, scratch);
}
Status InvalidateCache(size_t offset, size_t length) override {
std::unique_lock<std::mutex> lk(lock_);
buffer_.Clear();
return file_->InvalidateCache(offset, length);
}
bool use_direct_io() const override { return file_->use_direct_io(); }
private:
// Tries to read from buffer_ n bytes. If anything was read from the cache, it
// sets cached_len to the number of bytes actually read, copies these number
// of bytes to scratch and returns true.
// If nothing was read sets cached_len to 0 and returns false.
bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
if (read_offset_ < buffer_offset_ ||
read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
*cached_len = 0;
return false;
}
uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
*cached_len = std::min(
buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
read_offset_ += *cached_len;
return true;
}
// Reads into buffer_ the next n bytes from file_.
// Can actually read less if EOF was reached.
// Returns the status of the read operastion on the file.
Status ReadIntoBuffer(size_t n) {
if (n > buffer_.Capacity()) {
n = buffer_.Capacity();
}
assert(IsFileSectorAligned(n, alignment_));
Slice result;
Status s = file_->Read(n, &result, buffer_.BufferStart());
if (s.ok()) {
buffer_offset_ = read_offset_;
buffer_.Size(result.size());
assert(result.size() == 0 || buffer_.BufferStart() == result.data());
}
return s;
}
const std::unique_ptr<SequentialFile> file_;
const size_t alignment_;
const size_t readahead_size_;
std::mutex lock_;
// The buffer storing the prefetched data
AlignedBuffer buffer_;
// The offset in file_, corresponding to data stored in buffer_
uint64_t buffer_offset_;
// The offset up to which data was read from file_. In fact, it can be larger
// than the actual file size, since the file_->Skip(n) call doesn't return the
// actual number of bytes that were skipped, which can be less than n.
// This is not a problemm since read_offset_ is monotonically increasing and
// its only use is to figure out if next piece of data should be read from
// buffer_ or file_ directly.
uint64_t read_offset_;
};
} // namespace
std::unique_ptr<SequentialFile>
SequentialFileReader::NewReadaheadSequentialFile(
std::unique_ptr<SequentialFile>&& file, size_t readahead_size) {
if (file->GetRequiredBufferAlignment() >= readahead_size) {
// Short-circuit and return the original file if readahead_size is
// too small and hence doesn't make sense to be used for prefetching.
return std::move(file);
}
std::unique_ptr<SequentialFile> result(
new ReadaheadSequentialFile(std::move(file), readahead_size));
return result;
}
} // namespace rocksdb

@ -0,0 +1,66 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
namespace rocksdb {
// SequentialFileReader is a wrapper on top of Env::SequentialFile. It handles
// Buffered (i.e when page cache is enabled) and Direct (with O_DIRECT / page
// cache disabled) reads appropriately, and also updates the IO stats.
class SequentialFileReader {
private:
std::unique_ptr<SequentialFile> file_;
std::string file_name_;
std::atomic<size_t> offset_{0}; // read offset
public:
explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file,
const std::string& _file_name)
: file_(std::move(_file)), file_name_(_file_name) {}
explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file,
const std::string& _file_name,
size_t _readahead_size)
: file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size)),
file_name_(_file_name) {}
SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
file_ = std::move(o.file_);
return *this;
}
SequentialFileReader(const SequentialFileReader&) = delete;
SequentialFileReader& operator=(const SequentialFileReader&) = delete;
Status Read(size_t n, Slice* result, char* scratch);
Status Skip(uint64_t n);
SequentialFile* file() { return file_.get(); }
std::string file_name() { return file_name_; }
bool use_direct_io() const { return file_->use_direct_io(); }
private:
// NewReadaheadSequentialFile provides a wrapper over SequentialFile to
// always prefetch additional data with every read.
static std::unique_ptr<SequentialFile> NewReadaheadSequentialFile(
std::unique_ptr<SequentialFile>&& file, size_t readahead_size);
};
} // namespace rocksdb

@ -0,0 +1,405 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "file/writable_file_writer.h"
#include <algorithm>
#include <mutex>
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"
namespace rocksdb {
Status WritableFileWriter::Append(const Slice& data) {
const char* src = data.data();
size_t left = data.size();
Status s;
pending_sync_ = true;
TEST_KILL_RANDOM("WritableFileWriter::Append:0",
rocksdb_kill_odds * REDUCE_ODDS2);
{
IOSTATS_TIMER_GUARD(prepare_write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left);
}
// See whether we need to enlarge the buffer to avoid the flush
if (buf_.Capacity() - buf_.CurrentSize() < left) {
for (size_t cap = buf_.Capacity();
cap < max_buffer_size_; // There is still room to increase
cap *= 2) {
// See whether the next available size is large enough.
// Buffer will never be increased to more than max_buffer_size_.
size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
if (desired_capacity - buf_.CurrentSize() >= left ||
(use_direct_io() && desired_capacity == max_buffer_size_)) {
buf_.AllocateNewBuffer(desired_capacity, true);
break;
}
}
}
// Flush only when buffered I/O
if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
if (buf_.CurrentSize() > 0) {
s = Flush();
if (!s.ok()) {
return s;
}
}
assert(buf_.CurrentSize() == 0);
}
// We never write directly to disk with direct I/O on.
// or we simply use it for its original purpose to accumulate many small
// chunks
if (use_direct_io() || (buf_.Capacity() >= left)) {
while (left > 0) {
size_t appended = buf_.Append(src, left);
left -= appended;
src += appended;
if (left > 0) {
s = Flush();
if (!s.ok()) {
break;
}
}
}
} else {
// Writing directly to file bypassing the buffer
assert(buf_.CurrentSize() == 0);
s = WriteBuffered(src, left);
}
TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
if (s.ok()) {
filesize_ += data.size();
}
return s;
}
Status WritableFileWriter::Pad(const size_t pad_bytes) {
assert(pad_bytes < kDefaultPageSize);
size_t left = pad_bytes;
size_t cap = buf_.Capacity() - buf_.CurrentSize();
// Assume pad_bytes is small compared to buf_ capacity. So we always
// use buf_ rather than write directly to file in certain cases like
// Append() does.
while (left) {
size_t append_bytes = std::min(cap, left);
buf_.PadWith(append_bytes, 0);
left -= append_bytes;
if (left > 0) {
Status s = Flush();
if (!s.ok()) {
return s;
}
}
cap = buf_.Capacity() - buf_.CurrentSize();
}
pending_sync_ = true;
filesize_ += pad_bytes;
return Status::OK();
}
Status WritableFileWriter::Close() {
// Do not quit immediately on failure the file MUST be closed
Status s;
// Possible to close it twice now as we MUST close
// in __dtor, simply flushing is not enough
// Windows when pre-allocating does not fill with zeros
// also with unbuffered access we also set the end of data.
if (!writable_file_) {
return s;
}
s = Flush(); // flush cache to OS
Status interim;
// In direct I/O mode we write whole pages so
// we need to let the file know where data ends.
if (use_direct_io()) {
interim = writable_file_->Truncate(filesize_);
if (interim.ok()) {
interim = writable_file_->Fsync();
}
if (!interim.ok() && s.ok()) {
s = interim;
}
}
TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
interim = writable_file_->Close();
if (!interim.ok() && s.ok()) {
s = interim;
}
writable_file_.reset();
TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);
return s;
}
// write out the cached data to the OS cache or storage if direct I/O
// enabled
Status WritableFileWriter::Flush() {
Status s;
TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
rocksdb_kill_odds * REDUCE_ODDS2);
if (buf_.CurrentSize() > 0) {
if (use_direct_io()) {
#ifndef ROCKSDB_LITE
if (pending_sync_) {
s = WriteDirect();
}
#endif // !ROCKSDB_LITE
} else {
s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
}
if (!s.ok()) {
return s;
}
}
s = writable_file_->Flush();
if (!s.ok()) {
return s;
}
// sync OS cache to disk for every bytes_per_sync_
// TODO: give log file and sst file different options (log
// files could be potentially cached in OS for their whole
// life time, thus we might not want to flush at all).
// We try to avoid sync to the last 1MB of data. For two reasons:
// (1) avoid rewrite the same page that is modified later.
// (2) for older version of OS, write can block while writing out
// the page.
// Xfs does neighbor page flushing outside of the specified ranges. We
// need to make sure sync range is far from the write offset.
if (!use_direct_io() && bytes_per_sync_) {
const uint64_t kBytesNotSyncRange =
1024 * 1024; // recent 1MB is not synced.
const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB.
if (filesize_ > kBytesNotSyncRange) {
uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
assert(offset_sync_to >= last_sync_size_);
if (offset_sync_to > 0 &&
offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
last_sync_size_ = offset_sync_to;
}
}
}
return s;
}
Status WritableFileWriter::Sync(bool use_fsync) {
Status s = Flush();
if (!s.ok()) {
return s;
}
TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
if (!use_direct_io() && pending_sync_) {
s = SyncInternal(use_fsync);
if (!s.ok()) {
return s;
}
}
TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
pending_sync_ = false;
return Status::OK();
}
Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
if (!writable_file_->IsSyncThreadSafe()) {
return Status::NotSupported(
"Can't WritableFileWriter::SyncWithoutFlush() because "
"WritableFile::IsSyncThreadSafe() is false");
}
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
Status s = SyncInternal(use_fsync);
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
return s;
}
Status WritableFileWriter::SyncInternal(bool use_fsync) {
Status s;
IOSTATS_TIMER_GUARD(fsync_nanos);
TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
auto prev_perf_level = GetPerfLevel();
IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
if (use_fsync) {
s = writable_file_->Fsync();
} else {
s = writable_file_->Sync();
}
SetPerfLevel(prev_perf_level);
return s;
}
Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
IOSTATS_TIMER_GUARD(range_sync_nanos);
TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
return writable_file_->RangeSync(offset, nbytes);
}
// This method writes to disk the specified data and makes use of the rate
// limiter if available
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
Status s;
assert(!use_direct_io());
const char* src = data;
size_t left = size;
while (left > 0) {
size_t allowed;
if (rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(
left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
RateLimiter::OpType::kWrite);
} else {
allowed = left;
}
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
uint64_t old_size = writable_file_->GetFileSize();
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
old_size = next_write_offset_;
}
#endif
{
auto prev_perf_level = GetPerfLevel();
IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
s = writable_file_->Append(Slice(src, allowed));
SetPerfLevel(prev_perf_level);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
}
#endif
if (!s.ok()) {
return s;
}
}
IOSTATS_ADD(bytes_written, allowed);
TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);
left -= allowed;
src += allowed;
}
buf_.Size(0);
return s;
}
// This flushes the accumulated data in the buffer. We pad data with zeros if
// necessary to the whole page.
// However, during automatic flushes padding would not be necessary.
// We always use RateLimiter if available. We move (Refit) any buffer bytes
// that are left over the
// whole number of pages to be written again on the next flush because we can
// only write on aligned
// offsets.
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
assert(use_direct_io());
Status s;
const size_t alignment = buf_.Alignment();
assert((next_write_offset_ % alignment) == 0);
// Calculate whole page final file advance if all writes succeed
size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());
// Calculate the leftover tail, we write it here padded with zeros BUT we
// will write
// it again in the future either on Close() OR when the current whole page
// fills out
size_t leftover_tail = buf_.CurrentSize() - file_advance;
// Round up and pad
buf_.PadToAlignmentWith(0);
const char* src = buf_.BufferStart();
uint64_t write_offset = next_write_offset_;
size_t left = buf_.CurrentSize();
while (left > 0) {
// Check how much is allowed
size_t size;
if (rate_limiter_ != nullptr) {
size = rate_limiter_->RequestToken(left, buf_.Alignment(),
writable_file_->GetIOPriority(),
stats_, RateLimiter::OpType::kWrite);
} else {
size = left;
}
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
// direct writes must be positional
s = writable_file_->PositionedAppend(Slice(src, size), write_offset);
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
}
if (!s.ok()) {
buf_.Size(file_advance + leftover_tail);
return s;
}
}
IOSTATS_ADD(bytes_written, size);
left -= size;
src += size;
write_offset += size;
assert((next_write_offset_ % alignment) == 0);
}
if (s.ok()) {
// Move the tail to the beginning of the buffer
// This never happens during normal Append but rather during
// explicit call to Flush()/Sync() or Close()
buf_.RefitTail(file_advance, leftover_tail);
// This is where we start writing next time which may or not be
// the actual file size on disk. They match if the buffer size
// is a multiple of whole pages otherwise filesize_ is leftover_tail
// behind
next_write_offset_ += file_advance;
}
return s;
}
#endif // !ROCKSDB_LITE
} // namespace rocksdb

@ -0,0 +1,155 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
namespace rocksdb {
class Statistics;
// WritableFileWriter is a wrapper on top of Env::WritableFile. It provides
// facilities to:
// - Handle Buffered and Direct writes.
// - Rate limit writes.
// - Flush and Sync the data to the underlying filesystem.
// - Notify any interested listeners on the completion of a write.
// - Update IO stats.
class WritableFileWriter {
private:
#ifndef ROCKSDB_LITE
void NotifyOnFileWriteFinish(uint64_t offset, size_t length,
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const Status& status) {
FileOperationInfo info(file_name_, start_ts, finish_ts);
info.offset = offset;
info.length = length;
info.status = status;
for (auto& listener : listeners_) {
listener->OnFileWriteFinish(info);
}
}
#endif // ROCKSDB_LITE
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
std::unique_ptr<WritableFile> writable_file_;
std::string file_name_;
Env* env_;
AlignedBuffer buf_;
size_t max_buffer_size_;
// Actually written data size can be used for truncate
// not counting padding data
uint64_t filesize_;
#ifndef ROCKSDB_LITE
// This is necessary when we use unbuffered access
// and writes must happen on aligned offsets
// so we need to go back and write that page again
uint64_t next_write_offset_;
#endif // ROCKSDB_LITE
bool pending_sync_;
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
Statistics* stats_;
std::vector<std::shared_ptr<EventListener>> listeners_;
public:
WritableFileWriter(
std::unique_ptr<WritableFile>&& file, const std::string& _file_name,
const EnvOptions& options, Env* env = nullptr,
Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: writable_file_(std::move(file)),
file_name_(_file_name),
env_(env),
buf_(),
max_buffer_size_(options.writable_file_max_buffer_size),
filesize_(0),
#ifndef ROCKSDB_LITE
next_write_offset_(0),
#endif // ROCKSDB_LITE
pending_sync_(false),
last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter),
stats_(stats),
listeners_() {
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_));
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
#ifndef ROCKSDB_LITE
std::for_each(listeners.begin(), listeners.end(),
[this](const std::shared_ptr<EventListener>& e) {
if (e->ShouldBeNotifiedOnFileIO()) {
listeners_.emplace_back(e);
}
});
#else // !ROCKSDB_LITE
(void)listeners;
#endif
}
WritableFileWriter(const WritableFileWriter&) = delete;
WritableFileWriter& operator=(const WritableFileWriter&) = delete;
~WritableFileWriter() { Close(); }
std::string file_name() const { return file_name_; }
Status Append(const Slice& data);
Status Pad(const size_t pad_bytes);
Status Flush();
Status Close();
Status Sync(bool use_fsync);
// Sync only the data that was already Flush()ed. Safe to call concurrently
// with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
// returns NotSupported status.
Status SyncWithoutFlush(bool use_fsync);
uint64_t GetFileSize() const { return filesize_; }
Status InvalidateCache(size_t offset, size_t length) {
return writable_file_->InvalidateCache(offset, length);
}
WritableFile* writable_file() const { return writable_file_.get(); }
bool use_direct_io() { return writable_file_->use_direct_io(); }
bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }
private:
// Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode
#ifndef ROCKSDB_LITE
Status WriteDirect();
#endif // !ROCKSDB_LITE
// Normal write
Status WriteBuffered(const char* data, size_t size);
Status RangeSync(uint64_t offset, uint64_t nbytes);
Status SyncInternal(bool use_fsync);
};
} // namespace rocksdb

@ -16,11 +16,11 @@
#include "port/sys_time.h" #include "port/sys_time.h"
#include <time.h> #include <time.h>
#include "file/writable_file_writer.h"
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/file_reader_writer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
namespace rocksdb { namespace rocksdb {

@ -13,12 +13,13 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "options/options_helper.h" #include "options/options_helper.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "port/port.h" #include "port/port.h"

@ -71,9 +71,15 @@ LIB_SOURCES = \
env/io_posix.cc \ env/io_posix.cc \
env/mock_env.cc \ env/mock_env.cc \
file/delete_scheduler.cc \ file/delete_scheduler.cc \
file/file_prefetch_buffer.cc \
file/file_util.cc \ file/file_util.cc \
file/filename.cc \ file/filename.cc \
file/random_access_file_reader.cc \
file/read_write_util.cc \
file/readahead_raf.cc \
file/sequence_file_reader.cc \
file/sst_file_manager_impl.cc \ file/sst_file_manager_impl.cc \
file/writable_file_writer.cc \
logging/auto_roll_logger.cc \ logging/auto_roll_logger.cc \
logging/event_logger.cc \ logging/event_logger.cc \
logging/log_buffer.cc \ logging/log_buffer.cc \
@ -159,7 +165,6 @@ LIB_SOURCES = \
util/concurrent_task_limiter_impl.cc \ util/concurrent_task_limiter_impl.cc \
util/crc32c.cc \ util/crc32c.cc \
util/dynamic_bloom.cc \ util/dynamic_bloom.cc \
util/file_reader_writer.cc \
util/filter_policy.cc \ util/filter_policy.cc \
util/hash.cc \ util/hash.cc \
util/murmurhash.cc \ util/murmurhash.cc \

@ -18,6 +18,9 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/pinned_iterators_manager.h" #include "db/pinned_iterators_manager.h"
#include "file/file_prefetch_buffer.h"
#include "file/random_access_file_reader.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -49,7 +52,6 @@
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/xxhash.h" #include "util/xxhash.h"

@ -18,6 +18,7 @@
#include "db/range_tombstone_fragmenter.h" #include "db/range_tombstone_fragmenter.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/persistent_cache.h" #include "rocksdb/persistent_cache.h"
@ -39,7 +40,6 @@
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
#include "trace_replay/block_cache_tracer.h" #include "trace_replay/block_cache_tracer.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/user_comparator_wrapper.h" #include "util/user_comparator_wrapper.h"
namespace rocksdb { namespace rocksdb {

@ -23,7 +23,6 @@
#include "util/coding.h" #include "util/coding.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/xxhash.h" #include "util/xxhash.h"

@ -13,6 +13,7 @@
#include <vector> #include <vector>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/writable_file_writer.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "table/block_based/block_builder.h" #include "table/block_based/block_builder.h"
@ -20,7 +21,6 @@
#include "table/format.h" #include "table/format.h"
#include "table/meta_blocks.h" #include "table/meta_blocks.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/file_reader_writer.h"
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"

@ -10,11 +10,12 @@
#include <map> #include <map>
#include <utility> #include <utility>
#include "file/random_access_file_reader.h"
#include "file/writable_file_writer.h"
#include "table/cuckoo/cuckoo_table_builder.h" #include "table/cuckoo/cuckoo_table_builder.h"
#include "table/meta_blocks.h" #include "table/meta_blocks.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
extern const uint64_t kCuckooTableMagicNumber; extern const uint64_t kCuckooTableMagicNumber;

@ -15,11 +15,11 @@
#include <vector> #include <vector>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/random_access_file_reader.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "table/table_reader.h" #include "table/table_reader.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -13,6 +13,7 @@
#include <string> #include <string>
#include "block_fetcher.h" #include "block_fetcher.h"
#include "file/random_access_file_reader.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "memory/memory_allocator.h" #include "memory/memory_allocator.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
@ -24,7 +25,6 @@
#include "util/coding.h" #include "util/coding.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/xxhash.h" #include "util/xxhash.h"

@ -17,6 +17,9 @@
#include <malloc.h> #include <malloc.h>
#endif #endif
#endif #endif
#include "file/file_prefetch_buffer.h"
#include "file/random_access_file_reader.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
@ -27,7 +30,6 @@
#include "port/port.h" // noexcept #include "port/port.h" // noexcept
#include "table/persistent_cache_options.h" #include "table/persistent_cache_options.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/xxhash.h" #include "util/xxhash.h"
namespace rocksdb { namespace rocksdb {

@ -12,6 +12,7 @@
#include <set> #include <set>
#include "table/internal_iterator.h" #include "table/internal_iterator.h"
#include "test_util/sync_point.h"
namespace rocksdb { namespace rocksdb {

@ -9,6 +9,7 @@
#include "block_fetcher.h" #include "block_fetcher.h"
#include "db/table_properties_collector.h" #include "db/table_properties_collector.h"
#include "file/random_access_file_reader.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "table/block_based/block.h" #include "table/block_based/block.h"
@ -18,7 +19,6 @@
#include "table/table_properties_internal.h" #include "table/table_properties_internal.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -6,11 +6,11 @@
#include "table/mock_table.h" #include "table/mock_table.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/random_access_file_reader.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "table/get_context.h" #include "table/get_context.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
namespace mock { namespace mock {

@ -13,6 +13,7 @@
#include <map> #include <map>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/writable_file_writer.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/filter_policy.h" #include "rocksdb/filter_policy.h"
@ -26,7 +27,6 @@
#include "table/plain/plain_table_index.h" #include "table/plain/plain_table_index.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
namespace rocksdb { namespace rocksdb {

@ -9,9 +9,9 @@
#include <algorithm> #include <algorithm>
#include <string> #include <string>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/writable_file_writer.h"
#include "table/plain/plain_table_factory.h" #include "table/plain/plain_table_factory.h"
#include "table/plain/plain_table_reader.h" #include "table/plain/plain_table_reader.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -13,6 +13,7 @@
#include <stdint.h> #include <stdint.h>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/random_access_file_reader.h"
#include "memory/arena.h" #include "memory/arena.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -23,7 +24,6 @@
#include "table/plain/plain_table_factory.h" #include "table/plain/plain_table_factory.h"
#include "table/plain/plain_table_index.h" #include "table/plain/plain_table_index.h"
#include "table/table_reader.h" #include "table/table_reader.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -9,11 +9,11 @@
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/random_access_file_reader.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "table/get_context.h" #include "table/get_context.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "table/table_reader.h" #include "table/table_reader.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -8,11 +8,11 @@
#include <vector> #include <vector>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/writable_file_writer.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "table/block_based/block_based_table_builder.h" #include "table/block_based/block_based_table_builder.h"
#include "table/sst_file_writer_collectors.h" #include "table/sst_file_writer_collectors.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -15,11 +15,11 @@
#include <vector> #include <vector>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/table_properties_collector.h" #include "db/table_properties_collector.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "trace_replay/block_cache_tracer.h" #include "trace_replay/block_cache_tracer.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -13,6 +13,7 @@ int main() {
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/random_access_file_reader.h"
#include "monitoring/histogram.h" #include "monitoring/histogram.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
@ -24,7 +25,6 @@ int main() {
#include "table/table_builder.h" #include "table/table_builder.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_reader_writer.h"
#include "util/gflags_compat.h" #include "util/gflags_compat.h"
using GFLAGS_NAMESPACE::ParseCommandLineFlags; using GFLAGS_NAMESPACE::ParseCommandLineFlags;

@ -15,8 +15,10 @@
#include <sstream> #include <sstream>
#include "db/memtable_list.h" #include "db/memtable_list.h"
#include "file/random_access_file_reader.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "port/port.h" #include "port/port.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
namespace test { namespace test {

@ -12,12 +12,12 @@
#include <stdint.h> #include <stdint.h>
#include "rocksdb/sst_dump_tool.h" #include "rocksdb/sst_dump_tool.h"
#include "file/random_access_file_reader.h"
#include "rocksdb/filter_policy.h" #include "rocksdb/filter_policy.h"
#include "table/block_based/block_based_table_factory.h" #include "table/block_based/block_based_table_factory.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -10,8 +10,8 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

@ -23,6 +23,7 @@ int main() {
#include <thread> #include <thread>
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "file/read_write_util.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"

@ -26,6 +26,8 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -43,7 +45,6 @@
#include "trace_replay/trace_replay.h" #include "trace_replay/trace_replay.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/file_reader_writer.h"
#include "util/gflags_compat.h" #include "util/gflags_compat.h"
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"

File diff suppressed because it is too large Load Diff

@ -1,407 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <sstream>
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
namespace rocksdb {
class Statistics;
class HistogramImpl;
// This file provides the following main abstractions:
// SequentialFileReader : wrapper over Env::SequentialFile
// RandomAccessFileReader : wrapper over Env::RandomAccessFile
// WritableFileWriter : wrapper over Env::WritableFile
// In addition, it also exposed NewReadaheadRandomAccessFile, NewWritableFile,
// and ReadOneLine primitives.
// NewReadaheadRandomAccessFile provides a wrapper over RandomAccessFile to
// always prefetch additional data with every read. This is mainly used in
// Compaction Table Readers.
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size);
// SequentialFileReader is a wrapper on top of Env::SequentialFile. It handles
// Buffered (i.e when page cache is enabled) and Direct (with O_DIRECT / page
// cache disabled) reads appropriately, and also updates the IO stats.
class SequentialFileReader {
private:
std::unique_ptr<SequentialFile> file_;
std::string file_name_;
std::atomic<size_t> offset_{0}; // read offset
public:
explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file,
const std::string& _file_name)
: file_(std::move(_file)), file_name_(_file_name) {}
explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file,
const std::string& _file_name,
size_t _readahead_size)
: file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size)),
file_name_(_file_name) {}
SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
file_ = std::move(o.file_);
return *this;
}
SequentialFileReader(const SequentialFileReader&) = delete;
SequentialFileReader& operator=(const SequentialFileReader&) = delete;
Status Read(size_t n, Slice* result, char* scratch);
Status Skip(uint64_t n);
SequentialFile* file() { return file_.get(); }
std::string file_name() { return file_name_; }
bool use_direct_io() const { return file_->use_direct_io(); }
private:
// NewReadaheadSequentialFile provides a wrapper over SequentialFile to
// always prefetch additional data with every read.
static std::unique_ptr<SequentialFile> NewReadaheadSequentialFile(
std::unique_ptr<SequentialFile>&& file, size_t readahead_size);
};
// RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is
// responsible for:
// - Handling Buffered and Direct reads appropriately.
// - Rate limiting compaction reads.
// - Notifying any interested listeners on the completion of a read.
// - Updating IO stats.
class RandomAccessFileReader {
private:
#ifndef ROCKSDB_LITE
void NotifyOnFileReadFinish(uint64_t offset, size_t length,
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const Status& status) const {
FileOperationInfo info(file_name_, start_ts, finish_ts);
info.offset = offset;
info.length = length;
info.status = status;
for (auto& listener : listeners_) {
listener->OnFileReadFinish(info);
}
}
#endif // ROCKSDB_LITE
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
std::unique_ptr<RandomAccessFile> file_;
std::string file_name_;
Env* env_;
Statistics* stats_;
uint32_t hist_type_;
HistogramImpl* file_read_hist_;
RateLimiter* rate_limiter_;
std::vector<std::shared_ptr<EventListener>> listeners_;
public:
explicit RandomAccessFileReader(
std::unique_ptr<RandomAccessFile>&& raf, std::string _file_name,
Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: file_(std::move(raf)),
file_name_(std::move(_file_name)),
env_(env),
stats_(stats),
hist_type_(hist_type),
file_read_hist_(file_read_hist),
rate_limiter_(rate_limiter),
listeners_() {
#ifndef ROCKSDB_LITE
std::for_each(listeners.begin(), listeners.end(),
[this](const std::shared_ptr<EventListener>& e) {
if (e->ShouldBeNotifiedOnFileIO()) {
listeners_.emplace_back(e);
}
});
#else // !ROCKSDB_LITE
(void)listeners;
#endif
}
RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
RandomAccessFileReader& operator=(RandomAccessFileReader&& o)
ROCKSDB_NOEXCEPT {
file_ = std::move(o.file_);
env_ = std::move(o.env_);
stats_ = std::move(o.stats_);
hist_type_ = std::move(o.hist_type_);
file_read_hist_ = std::move(o.file_read_hist_);
rate_limiter_ = std::move(o.rate_limiter_);
return *this;
}
RandomAccessFileReader(const RandomAccessFileReader&) = delete;
RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch,
bool for_compaction = false) const;
Status MultiRead(ReadRequest* reqs, size_t num_reqs) const;
Status Prefetch(uint64_t offset, size_t n) const {
return file_->Prefetch(offset, n);
}
RandomAccessFile* file() { return file_.get(); }
std::string file_name() const { return file_name_; }
bool use_direct_io() const { return file_->use_direct_io(); }
};
// WritableFileWriter is a wrapper on top of Env::WritableFile. It provides
// facilities to:
// - Handle Buffered and Direct writes.
// - Rate limit writes.
// - Flush and Sync the data to the underlying filesystem.
// - Notify any interested listeners on the completion of a write.
// - Update IO stats.
class WritableFileWriter {
private:
#ifndef ROCKSDB_LITE
void NotifyOnFileWriteFinish(uint64_t offset, size_t length,
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const Status& status) {
FileOperationInfo info(file_name_, start_ts, finish_ts);
info.offset = offset;
info.length = length;
info.status = status;
for (auto& listener : listeners_) {
listener->OnFileWriteFinish(info);
}
}
#endif // ROCKSDB_LITE
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
std::unique_ptr<WritableFile> writable_file_;
std::string file_name_;
Env* env_;
AlignedBuffer buf_;
size_t max_buffer_size_;
// Actually written data size can be used for truncate
// not counting padding data
uint64_t filesize_;
#ifndef ROCKSDB_LITE
// This is necessary when we use unbuffered access
// and writes must happen on aligned offsets
// so we need to go back and write that page again
uint64_t next_write_offset_;
#endif // ROCKSDB_LITE
bool pending_sync_;
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
Statistics* stats_;
std::vector<std::shared_ptr<EventListener>> listeners_;
public:
WritableFileWriter(
std::unique_ptr<WritableFile>&& file, const std::string& _file_name,
const EnvOptions& options, Env* env = nullptr,
Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
: writable_file_(std::move(file)),
file_name_(_file_name),
env_(env),
buf_(),
max_buffer_size_(options.writable_file_max_buffer_size),
filesize_(0),
#ifndef ROCKSDB_LITE
next_write_offset_(0),
#endif // ROCKSDB_LITE
pending_sync_(false),
last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter),
stats_(stats),
listeners_() {
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_));
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
#ifndef ROCKSDB_LITE
std::for_each(listeners.begin(), listeners.end(),
[this](const std::shared_ptr<EventListener>& e) {
if (e->ShouldBeNotifiedOnFileIO()) {
listeners_.emplace_back(e);
}
});
#else // !ROCKSDB_LITE
(void)listeners;
#endif
}
WritableFileWriter(const WritableFileWriter&) = delete;
WritableFileWriter& operator=(const WritableFileWriter&) = delete;
~WritableFileWriter() { Close(); }
std::string file_name() const { return file_name_; }
Status Append(const Slice& data);
Status Pad(const size_t pad_bytes);
Status Flush();
Status Close();
Status Sync(bool use_fsync);
// Sync only the data that was already Flush()ed. Safe to call concurrently
// with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
// returns NotSupported status.
Status SyncWithoutFlush(bool use_fsync);
uint64_t GetFileSize() const { return filesize_; }
Status InvalidateCache(size_t offset, size_t length) {
return writable_file_->InvalidateCache(offset, length);
}
WritableFile* writable_file() const { return writable_file_.get(); }
bool use_direct_io() { return writable_file_->use_direct_io(); }
bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }
private:
// Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode
#ifndef ROCKSDB_LITE
Status WriteDirect();
#endif // !ROCKSDB_LITE
// Normal write
Status WriteBuffered(const char* data, size_t size);
Status RangeSync(uint64_t offset, uint64_t nbytes);
Status SyncInternal(bool use_fsync);
};
// FilePrefetchBuffer is a smart buffer to store and read data from a file.
class FilePrefetchBuffer {
public:
// Constructor.
//
// All arguments are optional.
// file_reader : the file reader to use. Can be a nullptr.
// readahead_size : the initial readahead size.
// max_readahead_size : the maximum readahead size.
// If max_readahead_size > readahead_size, the readahead size will be
// doubled on every IO until max_readahead_size is hit.
// Typically this is set as a multiple of readahead_size.
// max_readahead_size should be greater than equal to readahead_size.
// enable : controls whether reading from the buffer is enabled.
// If false, TryReadFromCache() always return false, and we only take stats
// for the minimum offset if track_min_offset = true.
// track_min_offset : Track the minimum offset ever read and collect stats on
// it. Used for adaptable readahead of the file footer/metadata.
//
// Automatic readhead is enabled for a file if file_reader, readahead_size,
// and max_readahead_size are passed in.
// If file_reader is a nullptr, setting readadhead_size and max_readahead_size
// does not make any sense. So it does nothing.
// A user can construct a FilePrefetchBuffer without any arguments, but use
// `Prefetch` to load data into the buffer.
FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr,
size_t readadhead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false)
: buffer_offset_(0),
file_reader_(file_reader),
readahead_size_(readadhead_size),
max_readahead_size_(max_readahead_size),
min_offset_read_(port::kMaxSizet),
enable_(enable),
track_min_offset_(track_min_offset) {}
// Load data into the buffer from a file.
// reader : the file reader.
// offset : the file offset to start reading from.
// n : the number of bytes to read.
// for_compaction : if prefetch is done for compaction read.
Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n,
bool for_compaction = false);
// Tries returning the data for a file raed from this buffer, if that data is
// in the buffer.
// It handles tracking the minimum read offset if track_min_offset = true.
// It also does the exponential readahead when readadhead_size is set as part
// of the constructor.
//
// offset : the file offset.
// n : the number of bytes.
// result : output buffer to put the data into.
// for_compaction : if cache read is done for compaction read.
bool TryReadFromCache(uint64_t offset, size_t n, Slice* result,
bool for_compaction = false);
// The minimum `offset` ever passed to TryReadFromCache(). This will nly be
// tracked if track_min_offset = true.
size_t min_offset_read() const { return min_offset_read_; }
private:
AlignedBuffer buffer_;
uint64_t buffer_offset_;
RandomAccessFileReader* file_reader_;
size_t readahead_size_;
size_t max_readahead_size_;
// The minimum `offset` ever passed to TryReadFromCache().
size_t min_offset_read_;
// if false, TryReadFromCache() always return false, and we only take stats
// for track_min_offset_ if track_min_offset_ = true
bool enable_;
// If true, track minimum `offset` ever passed to TryReadFromCache(), which
// can be fetched from min_offset_read().
bool track_min_offset_;
};
// Returns a WritableFile.
//
// env : the Env.
// fname : the file name.
// result : output arg. A WritableFile based on `fname` returned.
// options : the Env Options.
extern Status NewWritableFile(Env* env, const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options);
// Read a single line from a file.
bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file,
std::string* output, bool* has_data, Status* result);
} // namespace rocksdb

@ -3,9 +3,12 @@
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
// //
#include "util/file_reader_writer.h"
#include <algorithm> #include <algorithm>
#include <vector> #include <vector>
#include "file/random_access_file_reader.h"
#include "file/readahead_raf.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/random.h" #include "util/random.h"

@ -11,11 +11,11 @@ int main() {
} }
#else #else
#include "file/writable_file_writer.h"
#include "monitoring/histogram.h" #include "monitoring/histogram.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_reader_writer.h"
#include "util/gflags_compat.h" #include "util/gflags_compat.h"
using GFLAGS_NAMESPACE::ParseCommandLineFlags; using GFLAGS_NAMESPACE::ParseCommandLineFlags;

@ -11,6 +11,8 @@
#include "rocksdb/utilities/backupable_db.h" #include "rocksdb/utilities/backupable_db.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/rate_limiter.h" #include "rocksdb/rate_limiter.h"
@ -19,7 +21,6 @@
#include "util/channel.h" #include "util/channel.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/checkpoint/checkpoint_impl.h" #include "utilities/checkpoint/checkpoint_impl.h"

@ -25,7 +25,6 @@
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/file_reader_writer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
#include "util/stderr_logger.h" #include "util/stderr_logger.h"

@ -15,7 +15,9 @@
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "file/file_util.h" #include "file/file_util.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/sst_file_manager_impl.h" #include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "monitoring/instrumented_mutex.h" #include "monitoring/instrumented_mutex.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
@ -31,7 +33,6 @@
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"

@ -5,17 +5,18 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_dump_tool.h" #include "utilities/blob_db/blob_dump_tool.h"
#include <cinttypes>
#include <stdio.h> #include <stdio.h>
#include <cinttypes>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <string> #include <string>
#include "file/random_access_file_reader.h"
#include "file/readahead_raf.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "table/format.h" #include "table/format.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h" #include "util/string_util.h"
namespace rocksdb { namespace rocksdb {

@ -8,9 +8,9 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
#include "file/random_access_file_reader.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "util/file_reader_writer.h"
#include "utilities/blob_db/blob_log_format.h" #include "utilities/blob_db/blob_log_format.h"
namespace rocksdb { namespace rocksdb {

@ -17,6 +17,7 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/readahead_raf.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "utilities/blob_db/blob_db_impl.h" #include "utilities/blob_db/blob_db_impl.h"

@ -8,10 +8,10 @@
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include "file/random_access_file_reader.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "util/file_reader_writer.h"
#include "utilities/blob_db/blob_log_format.h" #include "utilities/blob_db/blob_log_format.h"
#include "utilities/blob_db/blob_log_reader.h" #include "utilities/blob_db/blob_log_reader.h"
#include "utilities/blob_db/blob_log_writer.h" #include "utilities/blob_db/blob_log_writer.h"

@ -9,8 +9,8 @@
#include <algorithm> #include <algorithm>
#include "file/random_access_file_reader.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
namespace rocksdb { namespace rocksdb {

@ -10,11 +10,11 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "file/random_access_file_reader.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "util/file_reader_writer.h"
#include "utilities/blob_db/blob_log_format.h" #include "utilities/blob_db/blob_log_format.h"
namespace rocksdb { namespace rocksdb {

@ -9,10 +9,10 @@
#include <cstdint> #include <cstdint>
#include <string> #include <string>
#include "file/writable_file_writer.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "utilities/blob_db/blob_log_format.h" #include "utilities/blob_db/blob_log_format.h"

@ -11,6 +11,8 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "file/random_access_file_reader.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -21,7 +23,6 @@
#include "port/port.h" #include "port/port.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
// The io code path of persistent cache uses pipelined architecture // The io code path of persistent cache uses pipelined architecture

@ -5,10 +5,10 @@
#include "rocksdb/utilities/sim_cache.h" #include "rocksdb/utilities/sim_cache.h"
#include <atomic> #include <atomic>
#include "file/writable_file_writer.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/file_reader_writer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/string_util.h" #include "util/string_util.h"

@ -5,9 +5,10 @@
#include "utilities/trace/file_trace_reader_writer.h" #include "utilities/trace/file_trace_reader_writer.h"
#include "file/random_access_file_reader.h"
#include "file/writable_file_writer.h"
#include "trace_replay/trace_replay.h" #include "trace_replay/trace_replay.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {

Loading…
Cancel
Save