From 70c7302fb5d343fa319e05327ecf88d09fe26a2b Mon Sep 17 00:00:00 2001 From: haoyuhuang Date: Fri, 26 Jul 2019 14:36:16 -0700 Subject: [PATCH] Block cache simulator: Add pysim to simulate caches using reinforcement learning. (#5610) Summary: This PR implements cache eviction using reinforcement learning. It includes two implementations: 1. An implementation of Thompson Sampling for the Bernoulli Bandit [1]. 2. An implementation of LinUCB with disjoint linear models [2]. The idea is that a cache uses multiple eviction policies, e.g., MRU, LRU, and LFU. The cache learns which eviction policy is the best and uses it upon a cache miss. Thompson Sampling is contextless and does not include any features. LinUCB includes features such as level, block type, caller, column family id to decide which eviction policy to use. [1] Daniel J. Russo, Benjamin Van Roy, Abbas Kazerouni, Ian Osband, and Zheng Wen. 2018. A Tutorial on Thompson Sampling. Found. Trends Mach. Learn. 11, 1 (July 2018), 1-96. DOI: https://doi.org/10.1561/2200000070 [2] Lihong Li, Wei Chu, John Langford, and Robert E. Schapire. 2010. A contextual-bandit approach to personalized news article recommendation. In Proceedings of the 19th international conference on World wide web (WWW '10). ACM, New York, NY, USA, 661-670. DOI=http://dx.doi.org/10.1145/1772690.1772758 Pull Request resolved: https://github.com/facebook/rocksdb/pull/5610 Differential Revision: D16435067 Pulled By: HaoyuHuang fbshipit-source-id: 6549239ae14115c01cb1e70548af9e46d8dc21bb --- .gitignore | 1 + CMakeLists.txt | 4 +- Makefile | 4 +- TARGETS | 6 +- src.mk | 6 +- tools/block_cache_analyzer/__init__.py | 2 + .../block_cache_analyzer/block_cache_pysim.py | 864 ++++++++++++++++++ .../block_cache_analyzer/block_cache_pysim.sh | 118 +++ .../block_cache_pysim_test.py | 340 +++++++ .../block_cache_trace_analyzer.cc | 14 +- .../block_cache_trace_analyzer.h | 0 .../block_cache_trace_analyzer_plot.py | 0 .../block_cache_trace_analyzer_test.cc | 4 +- .../block_cache_trace_analyzer_tool.cc | 2 +- 14 files changed, 1345 insertions(+), 20 deletions(-) create mode 100644 tools/block_cache_analyzer/__init__.py create mode 100644 tools/block_cache_analyzer/block_cache_pysim.py create mode 100644 tools/block_cache_analyzer/block_cache_pysim.sh create mode 100644 tools/block_cache_analyzer/block_cache_pysim_test.py rename tools/{ => block_cache_analyzer}/block_cache_trace_analyzer.cc (99%) rename tools/{ => block_cache_analyzer}/block_cache_trace_analyzer.h (100%) rename tools/{ => block_cache_analyzer}/block_cache_trace_analyzer_plot.py (100%) rename tools/{ => block_cache_analyzer}/block_cache_trace_analyzer_test.cc (99%) rename tools/{ => block_cache_analyzer}/block_cache_trace_analyzer_tool.cc (91%) diff --git a/.gitignore b/.gitignore index c8672a8b3..199458901 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,7 @@ manifest_dump sst_dump blob_dump block_cache_trace_analyzer +tools/block_cache_analyzer/*.pyc column_aware_encoding_exp util/build_version.cc build_tools/VALGRIND_LOGS/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 086975f3e..7266f3b55 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -626,7 +626,7 @@ set(SOURCES test_util/sync_point_impl.cc test_util/testutil.cc test_util/transaction_test_util.cc - tools/block_cache_trace_analyzer.cc + tools/block_cache_analyzer/block_cache_trace_analyzer.cc tools/db_bench_tool.cc tools/dump/db_dump_tool.cc tools/ldb_cmd.cc @@ -976,7 +976,7 @@ if(WITH_TESTS) table/merger_test.cc table/sst_file_reader_test.cc table/table_test.cc - tools/block_cache_trace_analyzer_test.cc + tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc tools/ldb_cmd_test.cc tools/reduce_levels_test.cc tools/sst_dump_test.cc diff --git a/Makefile b/Makefile index 7aef277d5..fbe6d2d06 100644 --- a/Makefile +++ b/Makefile @@ -1114,7 +1114,7 @@ db_bench: tools/db_bench.o $(BENCHTOOLOBJECTS) trace_analyzer: tools/trace_analyzer.o $(ANALYZETOOLOBJECTS) $(LIBOBJECTS) $(AM_LINK) -block_cache_trace_analyzer: tools/block_cache_trace_analyzer_tool.o $(ANALYZETOOLOBJECTS) $(LIBOBJECTS) +block_cache_trace_analyzer: tools/block_cache_analyzer/block_cache_trace_analyzer_tool.o $(ANALYZETOOLOBJECTS) $(LIBOBJECTS) $(AM_LINK) cache_bench: cache/cache_bench.o $(LIBOBJECTS) $(TESTUTIL) @@ -1614,7 +1614,7 @@ db_secondary_test: db/db_impl/db_secondary_test.o db/db_test_util.o $(LIBOBJECTS block_cache_tracer_test: trace_replay/block_cache_tracer_test.o trace_replay/block_cache_tracer.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) -block_cache_trace_analyzer_test: tools/block_cache_trace_analyzer_test.o tools/block_cache_trace_analyzer.o $(LIBOBJECTS) $(TESTHARNESS) +block_cache_trace_analyzer_test: tools/block_cache_analyzer/block_cache_trace_analyzer_test.o tools/block_cache_analyzer/block_cache_trace_analyzer.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) #------------------------------------------------- diff --git a/TARGETS b/TARGETS index a54e56b98..884d69b14 100644 --- a/TARGETS +++ b/TARGETS @@ -351,7 +351,7 @@ cpp_library( "test_util/fault_injection_test_env.cc", "test_util/testharness.cc", "test_util/testutil.cc", - "tools/block_cache_trace_analyzer.cc", + "tools/block_cache_analyzer/block_cache_trace_analyzer.cc", "tools/trace_analyzer_tool.cc", "utilities/cassandra/test_utils.cc", ], @@ -369,7 +369,7 @@ cpp_library( name = "rocksdb_tools_lib", srcs = [ "test_util/testutil.cc", - "tools/block_cache_trace_analyzer.cc", + "tools/block_cache_analyzer/block_cache_trace_analyzer.cc", "tools/db_bench_tool.cc", "tools/trace_analyzer_tool.cc", ], @@ -430,7 +430,7 @@ ROCKS_TESTS = [ ], [ "block_cache_trace_analyzer_test", - "tools/block_cache_trace_analyzer_test.cc", + "tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc", "serial", ], [ diff --git a/src.mk b/src.mk index 3462a6a58..0c6142e41 100644 --- a/src.mk +++ b/src.mk @@ -246,7 +246,7 @@ TOOL_LIB_SOURCES = \ utilities/blob_db/blob_dump_tool.cc \ ANALYZER_LIB_SOURCES = \ - tools/block_cache_trace_analyzer.cc \ + tools/block_cache_analyzer/block_cache_trace_analyzer.cc \ tools/trace_analyzer_tool.cc \ MOCK_LIB_SOURCES = \ @@ -374,8 +374,8 @@ MAIN_SOURCES = \ table/table_reader_bench.cc \ table/table_test.cc \ third-party/gtest-1.7.0/fused-src/gtest/gtest-all.cc \ - tools/block_cache_trace_analyzer_test.cc \ - tools/block_cache_trace_analyzer_tool.cc \ + tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc \ + tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc \ tools/db_bench.cc \ tools/db_bench_tool_test.cc \ tools/db_sanity_test.cc \ diff --git a/tools/block_cache_analyzer/__init__.py b/tools/block_cache_analyzer/__init__.py new file mode 100644 index 000000000..8dbe96a78 --- /dev/null +++ b/tools/block_cache_analyzer/__init__.py @@ -0,0 +1,2 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. diff --git a/tools/block_cache_analyzer/block_cache_pysim.py b/tools/block_cache_analyzer/block_cache_pysim.py new file mode 100644 index 000000000..63e367be5 --- /dev/null +++ b/tools/block_cache_analyzer/block_cache_pysim.py @@ -0,0 +1,864 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + +import gc +import random +import sys +import time +from os import path + +import numpy as np + + +kSampleSize = 16 # The sample size used when performing eviction. +kMicrosInSecond = 1000000 +kSecondsInMinute = 60 +kSecondsInHour = 3600 + + +class TraceRecord: + """ + A trace record represents a block access. + It holds the same struct as BlockCacheTraceRecord in + trace_replay/block_cache_tracer.h + """ + + def __init__( + self, + access_time, + block_id, + block_type, + block_size, + cf_id, + cf_name, + level, + fd, + caller, + no_insert, + get_id, + key_id, + kv_size, + is_hit, + ): + self.access_time = access_time + self.block_id = block_id + self.block_type = block_type + self.block_size = block_size + self.cf_id = cf_id + self.cf_name = cf_name + self.level = level + self.fd = fd + self.caller = caller + if no_insert == 1: + self.no_insert = True + else: + self.no_insert = False + self.get_id = get_id + self.key_id = key_id + self.kv_size = kv_size + if is_hit == 1: + self.is_hit = True + else: + self.is_hit = False + + +class CacheEntry: + """A cache entry stored in the cache.""" + + def __init__(self, value_size, cf_id, level, block_type, access_number): + self.value_size = value_size + self.last_access_number = access_number + self.num_hits = 0 + self.cf_id = 0 + self.level = level + self.block_type = block_type + + def __repr__(self): + """Debug string.""" + return "s={},last={},hits={},cf={},l={},bt={}".format( + self.value_size, + self.last_access_number, + self.num_hits, + self.cf_id, + self.level, + self.block_type, + ) + + +class HashEntry: + """A hash entry stored in a hash table.""" + + def __init__(self, key, hash, value): + self.key = key + self.hash = hash + self.value = value + + def __repr__(self): + return "k={},h={},v=[{}]".format(self.key, self.hash, self.value) + + +class HashTable: + """ + A custom implementation of hash table to support fast random sampling. + It is closed hashing and uses chaining to resolve hash conflicts. + It grows/shrinks the hash table upon insertion/deletion to support + fast lookups and random samplings. + """ + + def __init__(self): + self.table = [None] * 32 + self.elements = 0 + + def random_sample(self, sample_size): + """Randomly sample 'sample_size' hash entries from the table.""" + samples = [] + index = random.randint(0, len(self.table)) + pos = (index + 1) % len(self.table) + searches = 0 + # Starting from index, adding hash entries to the sample list until + # sample_size is met or we ran out of entries. + while pos != index and len(samples) < sample_size: + if self.table[pos] is not None: + for i in range(len(self.table[pos])): + if self.table[pos][i] is None: + continue + samples.append(self.table[pos][i]) + if len(samples) > sample_size: + break + pos += 1 + pos = pos % len(self.table) + searches += 1 + return samples + + def insert(self, key, hash, value): + """ + Insert a hash entry in the table. Replace the old entry if it already + exists. + """ + self.grow() + inserted = False + index = hash % len(self.table) + if self.table[index] is None: + self.table[index] = [] + for i in range(len(self.table[index])): + if self.table[index][i] is not None: + if ( + self.table[index][i].hash == hash + and self.table[index][i].key == key + ): + # The entry already exists in the table. + self.table[index][i] = HashEntry(key, hash, value) + return + continue + self.table[index][i] = HashEntry(key, hash, value) + inserted = True + break + if not inserted: + self.table[index].append(HashEntry(key, hash, value)) + self.elements += 1 + + def resize(self, new_size): + if new_size == len(self.table): + return + if new_size == 0: + return + if self.elements < 100: + return + new_table = [None] * new_size + # Copy 'self.table' to new_table. + for i in range(len(self.table)): + entries = self.table[i] + if entries is None: + continue + for j in range(len(entries)): + if entries[j] is None: + continue + index = entries[j].hash % new_size + if new_table[index] is None: + new_table[index] = [] + new_table[index].append(entries[j]) + self.table = new_table + del new_table + # Manually call python gc here to free the memory as 'self.table' + # might be very large. + gc.collect() + + def grow(self): + if self.elements < len(self.table): + return + new_size = int(len(self.table) * 1.2) + self.resize(new_size) + + def delete(self, key, hash): + index = hash % len(self.table) + entries = self.table[index] + deleted = False + if entries is None: + return + for i in range(len(entries)): + if ( + entries[i] is not None + and entries[i].hash == hash + and entries[i].key == key + ): + entries[i] = None + self.elements -= 1 + deleted = True + break + if deleted: + self.shrink() + + def shrink(self): + if self.elements * 2 >= len(self.table): + return + new_size = int(len(self.table) * 0.7) + self.resize(new_size) + + def lookup(self, key, hash): + index = hash % len(self.table) + entries = self.table[index] + if entries is None: + return None + for entry in entries: + if entry is not None and entry.hash == hash and entry.key == key: + return entry.value + return None + + +class MissRatioStats: + def __init__(self, time_unit): + self.num_misses = 0 + self.num_accesses = 0 + self.time_unit = time_unit + self.time_misses = {} + self.time_accesses = {} + + def update_metrics(self, access_time, is_hit): + access_time /= kMicrosInSecond * self.time_unit + self.num_accesses += 1 + if access_time not in self.time_accesses: + self.time_accesses[access_time] = 0 + self.time_accesses[access_time] += 1 + if not is_hit: + self.num_misses += 1 + if access_time not in self.time_misses: + self.time_misses[access_time] = 0 + self.time_misses[access_time] += 1 + + def reset_counter(self): + self.num_misses = 0 + self.num_accesses = 0 + + def miss_ratio(self): + return float(self.num_misses) * 100.0 / float(self.num_accesses) + + def write_miss_timeline(self, cache_type, cache_size, result_dir, start, end): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-miss-timeline-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-miss-timeline-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size + ) + with open(file_path, "w+") as file: + row = "{}".format(cache_type) + for trace_time in range(start, end): + row += ",{}".format(self.time_misses.get(trace_time, 0)) + file.write(row + "\n") + + def write_miss_ratio_timeline(self, cache_type, cache_size, result_dir, start, end): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-miss-ratio-timeline-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-miss-ratio-timeline-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size + ) + with open(file_path, "w+") as file: + row = "{}".format(cache_type) + for trace_time in range(start, end): + naccesses = self.time_accesses.get(trace_time, 0) + miss_ratio = 0 + if naccesses > 0: + miss_ratio = float( + self.time_misses.get(trace_time, 0) * 100.0 + ) / float(naccesses) + row += ",{0:.2f}".format(miss_ratio) + file.write(row + "\n") + + +class PolicyStats: + def __init__(self, time_unit, policies): + self.time_selected_polices = {} + self.time_accesses = {} + self.policy_names = {} + self.time_unit = time_unit + for i in range(len(policies)): + self.policy_names[i] = policies[i].policy_name() + + def update_metrics(self, access_time, selected_policy): + access_time /= kMicrosInSecond * self.time_unit + if access_time not in self.time_accesses: + self.time_accesses[access_time] = 0 + self.time_accesses[access_time] += 1 + if access_time not in self.time_selected_polices: + self.time_selected_polices[access_time] = {} + policy_name = self.policy_names[selected_policy] + if policy_name not in self.time_selected_polices[access_time]: + self.time_selected_polices[access_time][policy_name] = 0 + self.time_selected_polices[access_time][policy_name] += 1 + + def write_policy_timeline(self, cache_type, cache_size, result_dir, start, end): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-policy-timeline-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-policy-timeline-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size + ) + with open(file_path, "w+") as file: + for policy in self.policy_names: + policy_name = self.policy_names[policy] + row = "{}-{}".format(cache_type, policy_name) + for trace_time in range(start, end): + row += ",{}".format( + self.time_selected_polices.get(trace_time, {}).get( + policy_name, 0 + ) + ) + file.write(row + "\n") + + def write_policy_ratio_timeline( + self, cache_type, cache_size, file_path, start, end + ): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-policy-ratio-timeline-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-policy-ratio-timeline-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size + ) + with open(file_path, "w+") as file: + for policy in self.policy_names: + policy_name = self.policy_names[policy] + row = "{}-{}".format(cache_type, policy_name) + for trace_time in range(start, end): + naccesses = self.time_accesses.get(trace_time, 0) + ratio = 0 + if naccesses > 0: + ratio = float( + self.time_selected_polices.get(trace_time, {}).get( + policy_name, 0 + ) + * 100.0 + ) / float(naccesses) + row += ",{0:.2f}".format(ratio) + file.write(row + "\n") + + +class Policy(object): + """ + A policy maintains a set of evicted keys. It returns a reward of one to + itself if it has not evicted a missing key. Otherwise, it gives itself 0 + reward. + """ + + def __init__(self): + self.evicted_keys = {} + + def evict(self, key, max_size): + self.evicted_keys[key] = 0 + + def delete(self, key): + self.evicted_keys.pop(key, None) + + def prioritize_samples(self, samples): + raise NotImplementedError + + def policy_name(self): + raise NotImplementedError + + def generate_reward(self, key): + if key in self.evicted_keys: + return 0 + return 1 + + +class LRUPolicy(Policy): + def prioritize_samples(self, samples): + return sorted( + samples, + cmp=lambda e1, e2: e1.value.last_access_number + - e2.value.last_access_number, + ) + + def policy_name(self): + return "lru" + + +class MRUPolicy(Policy): + def prioritize_samples(self, samples): + return sorted( + samples, + cmp=lambda e1, e2: e2.value.last_access_number + - e1.value.last_access_number, + ) + + def policy_name(self): + return "mru" + + +class LFUPolicy(Policy): + def prioritize_samples(self, samples): + return sorted(samples, cmp=lambda e1, e2: e1.value.num_hits - e2.value.num_hits) + + def policy_name(self): + return "lfu" + + +class MLCache(object): + def __init__(self, cache_size, enable_cache_row_key, policies): + self.cache_size = cache_size + self.used_size = 0 + self.miss_ratio_stats = MissRatioStats(kSecondsInMinute) + self.policy_stats = PolicyStats(kSecondsInMinute, policies) + self.per_hour_miss_ratio_stats = MissRatioStats(kSecondsInHour) + self.per_hour_policy_stats = PolicyStats(kSecondsInHour, policies) + self.table = HashTable() + self.enable_cache_row_key = enable_cache_row_key + self.get_id_row_key_map = {} + self.policies = policies + + def _lookup(self, key, hash): + value = self.table.lookup(key, hash) + if value is not None: + value.last_access_number = self.miss_ratio_stats.num_accesses + value.num_hits += 1 + return True + return False + + def _select_policy(self, trace_record, key): + raise NotImplementedError + + def cache_name(self): + raise NotImplementedError + + def _evict(self, policy_index, value_size): + # Randomly sample n entries. + samples = self.table.random_sample(kSampleSize) + samples = self.policies[policy_index].prioritize_samples(samples) + for hash_entry in samples: + self.used_size -= hash_entry.value.value_size + self.table.delete(hash_entry.key, hash_entry.hash) + self.policies[policy_index].evict( + key=hash_entry.key, max_size=self.table.elements + ) + if self.used_size + value_size <= self.cache_size: + break + + def _insert(self, trace_record, key, hash, value_size): + if value_size > self.cache_size: + return + policy_index = self._select_policy(trace_record, key) + self.policies[policy_index].delete(key) + self.policy_stats.update_metrics(trace_record.access_time, policy_index) + self.per_hour_policy_stats.update_metrics( + trace_record.access_time, policy_index + ) + while self.used_size + value_size > self.cache_size: + self._evict(policy_index, value_size) + self.table.insert( + key, + hash, + CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + self.miss_ratio_stats.num_accesses, + ), + ) + self.used_size += value_size + + def _access_kv(self, trace_record, key, hash, value_size, no_insert): + if self._lookup(key, hash): + return True + if not no_insert and value_size > 0: + self._insert(trace_record, key, hash, value_size) + return False + + def _update_stats(self, access_time, is_hit): + self.miss_ratio_stats.update_metrics(access_time, is_hit) + self.per_hour_miss_ratio_stats.update_metrics(access_time, is_hit) + + def access(self, trace_record): + assert self.used_size <= self.cache_size + if ( + self.enable_cache_row_key + and trace_record.caller == 1 + and trace_record.key_id != 0 + and trace_record.get_id != 0 + ): + # This is a get request. + if trace_record.get_id not in self.get_id_row_key_map: + self.get_id_row_key_map[trace_record.get_id] = {} + self.get_id_row_key_map[trace_record.get_id]["h"] = False + if self.get_id_row_key_map[trace_record.get_id]["h"]: + # We treat future accesses as hits since this get request + # completes. + self._update_stats(trace_record.access_time, is_hit=True) + return + if trace_record.key_id not in self.get_id_row_key_map[trace_record.get_id]: + # First time seen this key. + is_hit = self._access_kv( + trace_record, + key="g{}".format(trace_record.key_id), + hash=trace_record.key_id, + value_size=trace_record.kv_size, + no_insert=False, + ) + inserted = False + if trace_record.kv_size > 0: + inserted = True + self.get_id_row_key_map[trace_record.get_id][ + trace_record.key_id + ] = inserted + self.get_id_row_key_map[trace_record.get_id]["h"] = is_hit + if self.get_id_row_key_map[trace_record.get_id]["h"]: + # We treat future accesses as hits since this get request + # completes. + self._update_stats(trace_record.access_time, is_hit=True) + return + # Access its blocks. + is_hit = self._access_kv( + trace_record, + key="b{}".format(trace_record.block_id), + hash=trace_record.block_id, + value_size=trace_record.block_size, + no_insert=trace_record.no_insert, + ) + self._update_stats(trace_record.access_time, is_hit) + if ( + trace_record.kv_size > 0 + and not self.get_id_row_key_map[trace_record.get_id][ + trace_record.key_id + ] + ): + # Insert the row key-value pair. + self._access_kv( + trace_record, + key="g{}".format(trace_record.key_id), + hash=trace_record.key_id, + value_size=trace_record.kv_size, + no_insert=False, + ) + # Mark as inserted. + self.get_id_row_key_map[trace_record.get_id][trace_record.key_id] = True + return + # Access the block. + is_hit = self._access_kv( + trace_record, + key="b{}".format(trace_record.block_id), + hash=trace_record.block_id, + value_size=trace_record.block_size, + no_insert=trace_record.no_insert, + ) + self._update_stats(trace_record.access_time, is_hit) + + +class ThompsonSamplingCache(MLCache): + """ + An implementation of Thompson Sampling for the Bernoulli Bandit [1]. + [1] Daniel J. Russo, Benjamin Van Roy, Abbas Kazerouni, Ian Osband, + and Zheng Wen. 2018. A Tutorial on Thompson Sampling. Found. + Trends Mach. Learn. 11, 1 (July 2018), 1-96. + DOI: https://doi.org/10.1561/2200000070 + """ + + def __init__(self, cache_size, enable_cache_row_key, policies, init_a=1, init_b=1): + super(ThompsonSamplingCache, self).__init__( + cache_size, enable_cache_row_key, policies + ) + self._as = {} + self._bs = {} + for _i in range(len(policies)): + self._as = [init_a] * len(self.policies) + self._bs = [init_b] * len(self.policies) + + def _select_policy(self, trace_record, key): + samples = [ + np.random.beta(self._as[x], self._bs[x]) for x in range(len(self.policies)) + ] + selected_policy = max(range(len(self.policies)), key=lambda x: samples[x]) + reward = self.policies[selected_policy].generate_reward(key) + assert reward <= 1 and reward >= 0 + self._as[selected_policy] += reward + self._bs[selected_policy] += 1 - reward + return selected_policy + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid ThompsonSampling (ts_hybrid)" + return "ThompsonSampling (ts)" + + +class LinUCBCache(MLCache): + """ + An implementation of LinUCB with disjoint linear models [2]. + [2] Lihong Li, Wei Chu, John Langford, and Robert E. Schapire. 2010. + A contextual-bandit approach to personalized news article recommendation. + In Proceedings of the 19th international conference on World wide web + (WWW '10). ACM, New York, NY, USA, 661-670. + DOI=http://dx.doi.org/10.1145/1772690.1772758 + """ + + def __init__(self, cache_size, enable_cache_row_key, policies): + super(LinUCBCache, self).__init__(cache_size, enable_cache_row_key, policies) + self.nfeatures = 4 # Block type, caller, level, cf. + self.th = np.zeros((len(self.policies), self.nfeatures)) + self.eps = 0.2 + self.b = np.zeros_like(self.th) + self.A = np.zeros((len(self.policies), self.nfeatures, self.nfeatures)) + self.A_inv = np.zeros((len(self.policies), self.nfeatures, self.nfeatures)) + for i in range(len(self.policies)): + self.A[i] = np.identity(self.nfeatures) + self.th_hat = np.zeros_like(self.th) + self.p = np.zeros(len(self.policies)) + self.alph = 0.2 + + def _select_policy(self, trace_record, key): + x_i = np.zeros(self.nfeatures) # The current context vector + x_i[0] = trace_record.block_type + x_i[1] = trace_record.caller + x_i[2] = trace_record.level + x_i[3] = trace_record.cf_id + p = np.zeros(len(self.policies)) + for a in range(len(self.policies)): + self.th_hat[a] = self.A_inv[a].dot(self.b[a]) + ta = x_i.dot(self.A_inv[a]).dot(x_i) + a_upper_ci = self.alph * np.sqrt(ta) + a_mean = self.th_hat[a].dot(x_i) + p[a] = a_mean + a_upper_ci + p = p + (np.random.random(len(p)) * 0.000001) + selected_policy = p.argmax() + reward = self.policies[selected_policy].generate_reward(key) + assert reward <= 1 and reward >= 0 + self.A[selected_policy] += np.outer(x_i, x_i) + self.b[selected_policy] += reward * x_i + self.A_inv[selected_policy] = np.linalg.inv(self.A[selected_policy]) + del x_i + return selected_policy + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid LinUCB (linucb_hybrid)" + return "LinUCB (linucb)" + + +def parse_cache_size(cs): + cs = cs.replace("\n", "") + if cs[-1] == "M": + return int(cs[: len(cs) - 1]) * 1024 * 1024 + if cs[-1] == "G": + return int(cs[: len(cs) - 1]) * 1024 * 1024 * 1024 + if cs[-1] == "T": + return int(cs[: len(cs) - 1]) * 1024 * 1024 * 1024 * 1024 + return int(cs) + + +def create_cache(cache_type, cache_size, downsample_size): + policies = [] + policies.append(LRUPolicy()) + policies.append(MRUPolicy()) + policies.append(LFUPolicy()) + cache_size = cache_size / downsample_size + enable_cache_row_key = False + if "hybrid" in cache_type: + enable_cache_row_key = True + cache_type = cache_type[:-7] + if cache_type == "ts": + return ThompsonSamplingCache(cache_size, enable_cache_row_key, policies) + elif cache_type == "linucb": + return LinUCBCache(cache_size, enable_cache_row_key, policies) + else: + print("Unknown cache type {}".format(cache_type)) + assert False + return None + + +def run(trace_file_path, cache_type, cache, warmup_seconds): + warmup_complete = False + num = 0 + trace_start_time = 0 + trace_duration = 0 + start_time = time.time() + time_interval = 1 + trace_miss_ratio_stats = MissRatioStats(kSecondsInMinute) + with open(trace_file_path, "r") as trace_file: + for line in trace_file: + num += 1 + if num % 1000000 == 0: + # Force a python gc periodically to reduce memory usage. + gc.collect() + ts = line.split(",") + timestamp = int(ts[0]) + if trace_start_time == 0: + trace_start_time = timestamp + trace_duration = timestamp - trace_start_time + if not warmup_complete and trace_duration > warmup_seconds * 1000000: + cache.miss_ratio_stats.reset_counter() + warmup_complete = True + record = TraceRecord( + access_time=int(ts[0]), + block_id=int(ts[1]), + block_type=int(ts[2]), + block_size=int(ts[3]), + cf_id=int(ts[4]), + cf_name=ts[5], + level=int(ts[6]), + fd=int(ts[7]), + caller=int(ts[8]), + no_insert=int(ts[9]), + get_id=int(ts[10]), + key_id=int(ts[11]), + kv_size=int(ts[12]), + is_hit=int(ts[13]), + ) + trace_miss_ratio_stats.update_metrics( + record.access_time, is_hit=record.is_hit + ) + cache.access(record) + del record + if num % 100 != 0: + continue + # Report progress every 10 seconds. + now = time.time() + if now - start_time > time_interval * 10: + print( + "Take {} seconds to process {} trace records with trace " + "duration of {} seconds. Throughput: {} records/second. " + "Trace miss ratio {}".format( + now - start_time, + num, + trace_duration / 1000000, + num / (now - start_time), + trace_miss_ratio_stats.miss_ratio(), + ) + ) + time_interval += 1 + print( + "{},0,0,{},{},{}".format( + cache_type, + cache.cache_size, + cache.miss_ratio_stats.miss_ratio(), + cache.miss_ratio_stats.num_accesses, + ) + ) + now = time.time() + print( + "Take {} seconds to process {} trace records with trace duration of {} " + "seconds. Throughput: {} records/second. Trace miss ratio {}".format( + now - start_time, + num, + trace_duration / 1000000, + num / (now - start_time), + trace_miss_ratio_stats.miss_ratio(), + ) + ) + return trace_start_time, trace_duration + + +def report_stats( + cache, cache_type, cache_size, result_dir, trace_start_time, trace_end_time +): + cache_label = "{}-{}".format(cache_type, cache_size) + with open("{}/data-ml-mrc-{}".format(result_dir, cache_label), "w+") as mrc_file: + mrc_file.write( + "{},0,0,{},{},{}\n".format( + cache_type, + cache_size, + cache.miss_ratio_stats.miss_ratio(), + cache.miss_ratio_stats.num_accesses, + ) + ) + cache.policy_stats.write_policy_timeline( + cache_type, cache_size, result_dir, trace_start_time, trace_end_time + ) + cache.policy_stats.write_policy_ratio_timeline( + cache_type, cache_size, result_dir, trace_start_time, trace_end_time + ) + cache.miss_ratio_stats.write_miss_timeline( + cache_type, cache_size, result_dir, trace_start_time, trace_end_time + ) + cache.miss_ratio_stats.write_miss_ratio_timeline( + cache_type, cache_size, result_dir, trace_start_time, trace_end_time + ) + cache.per_hour_policy_stats.write_policy_timeline( + cache_type, cache_size, result_dir, trace_start_time, trace_end_time + ) + cache.per_hour_policy_stats.write_policy_ratio_timeline( + cache_type, cache_size, result_dir, trace_start_time, trace_end_time + ) + cache.per_hour_miss_ratio_stats.write_miss_timeline( + cache_type, cache_size, result_dir, trace_start_time, trace_end_time + ) + cache.per_hour_miss_ratio_stats.write_miss_ratio_timeline( + cache_type, cache_size, result_dir, trace_start_time, trace_end_time + ) + + +if __name__ == "__main__": + if len(sys.argv) <= 6: + print( + "Must provide 6 arguments. " + "1) cache_type (ts, ts_hybrid, linucb, linucb_hybrid). " + "2) cache size (xM, xG, xT). " + "3) The sampling frequency used to collect the trace. (The " + "simulation scales down the cache size by the sampling frequency). " + "4) Warmup seconds (The number of seconds used for warmup). " + "5) Trace file path. " + "6) Result directory (A directory that saves generated results)" + ) + exit(1) + cache_type = sys.argv[1] + cache_size = parse_cache_size(sys.argv[2]) + downsample_size = int(sys.argv[3]) + warmup_seconds = int(sys.argv[4]) + trace_file_path = sys.argv[5] + result_dir = sys.argv[6] + cache = create_cache(cache_type, cache_size, downsample_size) + trace_start_time, trace_duration = run( + trace_file_path, cache_type, cache, warmup_seconds + ) + trace_end_time = trace_start_time + trace_duration + report_stats( + cache, cache_type, cache_size, result_dir, trace_start_time, trace_end_time + ) diff --git a/tools/block_cache_analyzer/block_cache_pysim.sh b/tools/block_cache_analyzer/block_cache_pysim.sh new file mode 100644 index 000000000..58193a063 --- /dev/null +++ b/tools/block_cache_analyzer/block_cache_pysim.sh @@ -0,0 +1,118 @@ +#!/usr/bin/env bash +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +# +# A shell script to run a batch of pysims and combine individual pysim output files. +# +# Usage: bash block_cache_pysim.sh trace_file_path result_dir downsample_size warmup_seconds max_jobs +# trace_file_path: The file path that stores the traces. +# result_dir: The directory to store pysim results. The output files from a pysim is stores in result_dir/ml +# downsample_size: The downsample size used to collect the trace. +# warmup_seconds: The number of seconds used for warmup. +# max_jobs: The max number of concurrent pysims to run. + +if [ $# -ne 5 ]; then + echo "Usage: ./block_cache_pysim.sh trace_file_path result_dir downsample_size warmup_seconds max_jobs" + exit 0 +fi + +trace_file="$1" +result_dir="$2" +downsample_size="$3" +warmup_seconds="$4" +max_jobs="$5" +current_jobs=0 + +ml_tmp_result_dir="$result_dir/ml" +rm -rf "$ml_tmp_result_dir" +mkdir -p "$result_dir" +mkdir -p "$ml_tmp_result_dir" + +for cache_type in "ts" "linucb" "ts_hybrid" "linucb_hybrid" +do +for cache_size in "16M" "256M" "1G" "2G" "4G" "8G" "12G" "16G" +do + while [ "$current_jobs" -ge "$max_jobs" ] + do + sleep 10 + echo "Waiting jobs to complete. Number of running jobs: $current_jobs" + current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep) + echo "Waiting jobs to complete. Number of running jobs: $current_jobs" + done + output="log-ml-$cache_type-$cache_size" + echo "Running simulation for $cache_type and cache size $cache_size. Number of running jobs: $current_jobs. " + nohup python block_cache_pysim.py "$cache_type" "$cache_size" "$downsample_size" "$warmup_seconds" "$trace_file" "$ml_tmp_result_dir" >& $ml_tmp_result_dir/$output & + current_jobs=$((current_jobs+1)) +done +done + +# Wait for all jobs to complete. +while [ $current_jobs -gt 0 ] +do + sleep 10 + echo "Waiting jobs to complete. Number of running jobs: $current_jobs" + current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep) + echo "Waiting jobs to complete. Number of running jobs: $current_jobs" +done + +echo "Combine individual pysim output files" + +rm -rf "$result_dir/ml_*" +mrc_file="$result_dir/ml_mrc" +for header in "header-" "data-" +do +for fn in $ml_tmp_result_dir/* +do + sum_file="" + time_unit="" + capacity="" + if [[ $fn == *"timeline"* ]]; then + tmpfn="$fn" + IFS='-' read -ra elements <<< "$tmpfn" + time_unit_index=0 + capacity_index=0 + for i in "${elements[@]}" + do + if [[ $i == "timeline" ]]; then + break + fi + time_unit_index=$((time_unit_index+1)) + done + time_unit_index=$((time_unit_index+1)) + capacity_index=$((time_unit_index+2)) + time_unit="${elements[$time_unit_index]}_" + capacity="${elements[$capacity_index]}_" + fi + + if [[ $fn == "${header}ml-policy-timeline"* ]]; then + sum_file="$result_dir/ml_${capacity}${time_unit}policy_timeline" + fi + if [[ $fn == "${header}ml-policy-ratio-timeline"* ]]; then + sum_file="$result_dir/ml_${capacity}${time_unit}policy_ratio_timeline" + fi + if [[ $fn == "${header}ml-miss-timeline"* ]]; then + sum_file="$result_dir/ml_${capacity}${time_unit}miss_timeline" + fi + if [[ $fn == "${header}ml-miss-ratio-timeline"* ]]; then + sum_file="$result_dir/ml_${capacity}${time_unit}miss_ratio_timeline" + fi + if [[ $fn == "${header}ml-mrc"* ]]; then + sum_file="$mrc_file" + fi + if [[ $sum_file == "" ]]; then + continue + fi + if [[ $header == "header-" ]]; then + if [ -e "$sum_file" ]; then + continue + fi + fi + cat "$ml_tmp_result_dir/$fn" >> "$sum_file" +done +done + +echo "Done" +# Sort MRC file by cache_type and cache_size. +tmp_file="$result_dir/tmp_mrc" +cat "$mrc_file" | sort -t ',' -k1,1 -k4,4n > "$tmp_file" +cat "$tmp_file" > "$mrc_file" +rm -rf "$tmp_file" diff --git a/tools/block_cache_analyzer/block_cache_pysim_test.py b/tools/block_cache_analyzer/block_cache_pysim_test.py new file mode 100644 index 000000000..e298d7bbd --- /dev/null +++ b/tools/block_cache_analyzer/block_cache_pysim_test.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + +import random + +from block_cache_pysim import ( + HashTable, + LFUPolicy, + LinUCBCache, + LRUPolicy, + MRUPolicy, + ThompsonSamplingCache, + TraceRecord, + kSampleSize, +) + + +def test_hash_table(): + print("Test hash table") + table = HashTable() + data_size = 10000 + for i in range(data_size): + table.insert("k{}".format(i), i, "v{}".format(i)) + for i in range(data_size): + assert table.lookup("k{}".format(i), i) is not None + for i in range(data_size): + table.delete("k{}".format(i), i) + for i in range(data_size): + assert table.lookup("k{}".format(i), i) is None + + truth_map = {} + n = 1000000 + records = 100 + for i in range(n): + key_id = random.randint(0, records) + key = "k{}".format(key_id) + value = "v{}".format(key_id) + action = random.randint(0, 2) + # print "{}:{}:{}".format(action, key, value) + assert len(truth_map) == table.elements, "{} {} {}".format( + len(truth_map), table.elements, i + ) + if action == 0: + table.insert(key, key_id, value) + truth_map[key] = value + elif action == 1: + if key in truth_map: + assert table.lookup(key, key_id) is not None + assert truth_map[key] == table.lookup(key, key_id) + else: + assert table.lookup(key, key_id) is None + else: + table.delete(key, key_id) + if key in truth_map: + del truth_map[key] + print("Test hash table: Success") + + +def assert_metrics(cache, expected_value): + assert cache.used_size == expected_value[0], "Expected {}, Actual {}".format( + expected_value[0], cache.used_size + ) + assert ( + cache.miss_ratio_stats.num_accesses == expected_value[1] + ), "Expected {}, Actual {}".format( + expected_value[1], cache.miss_ratio_stats.num_accesses + ) + assert ( + cache.miss_ratio_stats.num_misses == expected_value[2] + ), "Expected {}, Actual {}".format( + expected_value[2], cache.miss_ratio_stats.num_misses + ) + assert cache.table.elements == len(expected_value[3]) + len( + expected_value[4] + ), "Expected {}, Actual {}".format( + len(expected_value[3]) + len(expected_value[4]), cache.table.elements + ) + for expeceted_k in expected_value[3]: + val = cache.table.lookup("b{}".format(expeceted_k), expeceted_k) + assert val is not None + assert val.value_size == 1 + for expeceted_k in expected_value[4]: + val = cache.table.lookup("g{}".format(expeceted_k), expeceted_k) + assert val is not None + assert val.value_size == 1 + + +# Access k1, k1, k2, k3, k3, k3, k4 +def test_cache(policies, expected_value): + cache = ThompsonSamplingCache(3, False, policies) + k1 = TraceRecord( + access_time=0, + block_id=1, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=5, + is_hit=1, + ) + k2 = TraceRecord( + access_time=1, + block_id=2, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=5, + is_hit=1, + ) + k3 = TraceRecord( + access_time=2, + block_id=3, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=5, + is_hit=1, + ) + k4 = TraceRecord( + access_time=3, + block_id=4, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=5, + is_hit=1, + ) + sequence = [k1, k1, k2, k3, k3, k3] + index = 0 + expected_values = [] + # Access k1, miss. + expected_values.append([1, 1, 1, [1], []]) + # Access k1, hit. + expected_values.append([1, 2, 1, [1], []]) + # Access k2, miss. + expected_values.append([2, 3, 2, [1, 2], []]) + # Access k3, miss. + expected_values.append([3, 4, 3, [1, 2, 3], []]) + # Access k3, hit. + expected_values.append([3, 5, 3, [1, 2, 3], []]) + # Access k3, hit. + expected_values.append([3, 6, 3, [1, 2, 3], []]) + for access in sequence: + cache.access(access) + assert_metrics(cache, expected_values[index]) + index += 1 + cache.access(k4) + assert_metrics(cache, expected_value) + + +def test_lru_cache(): + print("Test LRU cache") + policies = [] + policies.append(LRUPolicy()) + # Access k4, miss. evict k1 + test_cache(policies, [3, 7, 4, [2, 3, 4], []]) + print("Test LRU cache: Success") + + +def test_mru_cache(): + print("Test MRU cache") + policies = [] + policies.append(MRUPolicy()) + # Access k4, miss. evict k3 + test_cache(policies, [3, 7, 4, [1, 2, 4], []]) + print("Test MRU cache: Success") + + +def test_lfu_cache(): + print("Test LFU cache") + policies = [] + policies.append(LFUPolicy()) + # Access k4, miss. evict k2 + test_cache(policies, [3, 7, 4, [1, 3, 4], []]) + print("Test LFU cache: Success") + + +def test_mix(cache): + print("Test Mix {} cache".format(cache.cache_name())) + n = 100000 + records = 199 + for i in range(n): + key_id = random.randint(0, records) + vs = random.randint(0, 10) + k = TraceRecord( + access_time=i, + block_id=key_id, + block_type=1, + block_size=vs, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=key_id, + key_id=key_id, + kv_size=5, + is_hit=1, + ) + cache.access(k) + assert cache.miss_ratio_stats.miss_ratio() > 0 + print("Test Mix {} cache: Success".format(cache.cache_name())) + + +def test_hybrid(cache): + print("Test {} cache".format(cache.cache_name())) + k = TraceRecord( + access_time=0, + block_id=1, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, # the first get request. + key_id=1, + kv_size=0, # no size. + is_hit=1, + ) + cache.access(k) # Expect a miss. + # used size, num accesses, num misses, hash table size, blocks, get keys. + assert_metrics(cache, [1, 1, 1, [1], []]) + k.access_time += 1 + k.kv_size = 1 + k.block_id = 2 + cache.access(k) # k should be inserted. + assert_metrics(cache, [3, 2, 2, [1, 2], [1]]) + k.access_time += 1 + k.block_id = 3 + cache.access(k) # k should not be inserted again. + assert_metrics(cache, [4, 3, 3, [1, 2, 3], [1]]) + # A second get request referencing the same key. + k.access_time += 1 + k.get_id = 2 + k.block_id = 4 + k.kv_size = 0 + cache.access(k) # k should observe a hit. No block access. + assert_metrics(cache, [4, 4, 3, [1, 2, 3], [1]]) + + # A third get request searches three files, three different keys. + # And the second key observes a hit. + k.access_time += 1 + k.kv_size = 1 + k.get_id = 3 + k.block_id = 3 + k.key_id = 2 + cache.access(k) # k should observe a miss. block 3 observes a hit. + assert_metrics(cache, [5, 5, 3, [1, 2, 3], [1, 2]]) + + k.access_time += 1 + k.kv_size = 1 + k.get_id = 3 + k.block_id = 4 + k.kv_size = 1 + k.key_id = 1 + cache.access(k) # k1 should observe a hit. + assert_metrics(cache, [5, 6, 3, [1, 2, 3], [1, 2]]) + + k.access_time += 1 + k.kv_size = 1 + k.get_id = 3 + k.block_id = 4 + k.kv_size = 1 + k.key_id = 3 + # k3 should observe a miss. + # However, as the get already complete, we should not access k3 any more. + cache.access(k) + assert_metrics(cache, [5, 7, 3, [1, 2, 3], [1, 2]]) + + # A fourth get request searches one file and two blocks. One row key. + k.access_time += 1 + k.get_id = 4 + k.block_id = 5 + k.key_id = 4 + k.kv_size = 1 + cache.access(k) + assert_metrics(cache, [7, 8, 4, [1, 2, 3, 5], [1, 2, 4]]) + + # A bunch of insertions which evict cached row keys. + for i in range(6, 100): + k.access_time += 1 + k.get_id = 0 + k.block_id = i + cache.access(k) + + k.get_id = 4 + k.block_id = 100 # A different block. + k.key_id = 4 # Same row key and should not be inserted again. + k.kv_size = 1 + cache.access(k) + assert_metrics(cache, [16, 103, 99, [i for i in range(101 - kSampleSize, 101)], []]) + print("Test {} cache: Success".format(cache.cache_name())) + + +if __name__ == "__main__": + policies = [] + policies.append(MRUPolicy()) + policies.append(LRUPolicy()) + policies.append(LFUPolicy()) + test_hash_table() + test_lru_cache() + test_mru_cache() + test_lfu_cache() + test_mix(ThompsonSamplingCache(100, False, policies)) + test_mix(ThompsonSamplingCache(100, True, policies)) + test_mix(LinUCBCache(100, False, policies)) + test_mix(LinUCBCache(100, True, policies)) + test_hybrid(ThompsonSamplingCache(kSampleSize, True, [LRUPolicy()])) + test_hybrid(LinUCBCache(kSampleSize, True, [LRUPolicy()])) diff --git a/tools/block_cache_trace_analyzer.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc similarity index 99% rename from tools/block_cache_trace_analyzer.cc rename to tools/block_cache_analyzer/block_cache_trace_analyzer.cc index 761395a66..032ed2be2 100644 --- a/tools/block_cache_trace_analyzer.cc +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc @@ -5,7 +5,7 @@ #ifndef ROCKSDB_LITE #ifdef GFLAGS -#include "tools/block_cache_trace_analyzer.h" +#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h" #include #include @@ -1395,13 +1395,12 @@ Status BlockCacheTraceAnalyzer::WriteHumanReadableTraceRecord( } int ret = snprintf( trace_record_buffer_, sizeof(trace_record_buffer_), - "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%" PRIu32 ",%" PRIu64 - "" - ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u\n", + "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32 + ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u\n", access.access_timestamp, block_id, access.block_type, access.block_size, - access.cf_id, access.level, access.sst_fd_number, access.caller, - access.no_insert, access.get_id, get_key_id, access.referenced_data_size, - access.is_cache_hit); + access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number, + access.caller, access.no_insert, access.get_id, get_key_id, + access.referenced_data_size, access.is_cache_hit); if (ret < 0) { return Status::IOError("failed to format the output"); } @@ -2134,6 +2133,7 @@ int block_cache_trace_analyzer_tool(int argc, char** argv) { analyzer.WriteAccessTimeline(label, kSecondInHour, false); } else { analyzer.WriteAccessTimeline(label, kSecondInMinute, false); + analyzer.WriteAccessTimeline(label, kSecondInHour, false); } } } diff --git a/tools/block_cache_trace_analyzer.h b/tools/block_cache_analyzer/block_cache_trace_analyzer.h similarity index 100% rename from tools/block_cache_trace_analyzer.h rename to tools/block_cache_analyzer/block_cache_trace_analyzer.h diff --git a/tools/block_cache_trace_analyzer_plot.py b/tools/block_cache_analyzer/block_cache_trace_analyzer_plot.py similarity index 100% rename from tools/block_cache_trace_analyzer_plot.py rename to tools/block_cache_analyzer/block_cache_trace_analyzer_plot.py diff --git a/tools/block_cache_trace_analyzer_test.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc similarity index 99% rename from tools/block_cache_trace_analyzer_test.cc rename to tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc index a028bf197..9917d5b9e 100644 --- a/tools/block_cache_trace_analyzer_test.cc +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc @@ -23,7 +23,7 @@ int main() { #include "rocksdb/trace_reader_writer.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "tools/block_cache_trace_analyzer.h" +#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h" #include "trace_replay/block_cache_tracer.h" namespace rocksdb { @@ -343,7 +343,7 @@ TEST_F(BlockCacheTracerTest, BlockCacheAnalyzer) { std::string l; ASSERT_TRUE(getline(ss, l, ',')); if (l.find("block") == std::string::npos) { - if (unit != "_60" || user_access_only != "all_access_") { + if (user_access_only != "all_access_") { continue; } } diff --git a/tools/block_cache_trace_analyzer_tool.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc similarity index 91% rename from tools/block_cache_trace_analyzer_tool.cc rename to tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc index b7b36c5d2..63382cf8c 100644 --- a/tools/block_cache_trace_analyzer_tool.cc +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc @@ -11,7 +11,7 @@ int main() { return 1; } #else // GFLAGS -#include "tools/block_cache_trace_analyzer.h" +#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h" int main(int argc, char** argv) { return rocksdb::block_cache_trace_analyzer_tool(argc, argv); }