diff --git a/tools/block_cache_analyzer/block_cache_pysim.py b/tools/block_cache_analyzer/block_cache_pysim.py index 63e367be5..67307df53 100644 --- a/tools/block_cache_analyzer/block_cache_pysim.py +++ b/tools/block_cache_analyzer/block_cache_pysim.py @@ -2,15 +2,17 @@ # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. import gc +import heapq import random import sys import time +from collections import OrderedDict from os import path import numpy as np -kSampleSize = 16 # The sample size used when performing eviction. +kSampleSize = 64 # The sample size used when performing eviction. kMicrosInSecond = 1000000 kSecondsInMinute = 60 kSecondsInHour = 3600 @@ -39,11 +41,19 @@ class TraceRecord: key_id, kv_size, is_hit, + referenced_key_exist_in_block, + num_keys_in_block, + table_id, + seq_number, + block_key_size, + key_size, + block_offset_in_file, + next_access_seq_no, ): self.access_time = access_time self.block_id = block_id self.block_type = block_type - self.block_size = block_size + self.block_size = block_size + block_key_size self.cf_id = cf_id self.cf_name = cf_name self.level = level @@ -60,22 +70,46 @@ class TraceRecord: self.is_hit = True else: self.is_hit = False + if referenced_key_exist_in_block == 1: + self.referenced_key_exist_in_block = True + else: + self.referenced_key_exist_in_block = False + self.num_keys_in_block = num_keys_in_block + self.table_id = table_id + self.seq_number = seq_number + self.block_key_size = block_key_size + self.key_size = key_size + self.block_offset_in_file = block_offset_in_file + self.next_access_seq_no = next_access_seq_no class CacheEntry: """A cache entry stored in the cache.""" - def __init__(self, value_size, cf_id, level, block_type, access_number): + def __init__( + self, + value_size, + cf_id, + level, + block_type, + table_id, + access_number, + time_s, + num_hits=0, + ): self.value_size = value_size self.last_access_number = access_number - self.num_hits = 0 + self.num_hits = num_hits self.cf_id = 0 self.level = level self.block_type = block_type + self.last_access_time = time_s + self.insertion_time = time_s + self.table_id = table_id def __repr__(self): """Debug string.""" - return "s={},last={},hits={},cf={},l={},bt={}".format( + return "(s={},last={},hits={},cf={},l={},bt={})\n".format( self.value_size, self.last_access_number, self.num_hits, @@ -84,6 +118,22 @@ class CacheEntry: self.block_type, ) + def cost_class(self, cost_class_label): + if cost_class_label == "table_bt": + return "{}-{}".format(self.table_id, self.block_type) + elif cost_class_label == "table": + return "{}".format(self.table_id) + elif cost_class_label == "bt": + return "{}".format(self.block_type) + elif cost_class_label == "cf": + return "{}".format(self.cf_id) + elif cost_class_label == "cf_bt": + return "{}-{}".format(self.cf_id, self.block_type) + elif cost_class_label == "table_level_bt": + return "{}-{}-{}".format(self.table_id, self.level, self.block_type) + assert False, "Unknown cost class label {}".format(cost_class_label) + return None + class HashEntry: """A hash entry stored in a hash table.""" @@ -106,30 +156,55 @@ class HashTable: """ def __init__(self): - self.table = [None] * 32 + self.initial_size = 32 + self.table = [None] * self.initial_size self.elements = 0 def random_sample(self, sample_size): """Randomly sample 'sample_size' hash entries from the table.""" samples = [] - index = random.randint(0, len(self.table)) - pos = (index + 1) % len(self.table) - searches = 0 + index = random.randint(0, len(self.table) - 1) + pos = index # Starting from index, adding hash entries to the sample list until # sample_size is met or we ran out of entries. - while pos != index and len(samples) < sample_size: + while True: if self.table[pos] is not None: for i in range(len(self.table[pos])): if self.table[pos][i] is None: continue samples.append(self.table[pos][i]) - if len(samples) > sample_size: + if len(samples) == sample_size: break pos += 1 pos = pos % len(self.table) - searches += 1 + if pos == index or len(samples) == sample_size: + break + assert len(samples) <= sample_size return samples + def __repr__(self): + all_entries = [] + for i in range(len(self.table)): + if self.table[i] is None: + continue + for j in range(len(self.table[i])): + if self.table[i][j] is not None: + all_entries.append(self.table[i][j]) + return "{}".format(all_entries) + + def values(self): + all_values = [] + for i in range(len(self.table)): + if self.table[i] is None: + continue + for j in range(len(self.table[i])): + if self.table[i][j] is not None: + all_values.append(self.table[i][j].value) + return all_values + + def __len__(self): + return self.elements + def insert(self, key, hash, value): """ Insert a hash entry in the table. Replace the old entry if it already @@ -140,19 +215,21 @@ class HashTable: index = hash % len(self.table) if self.table[index] is None: self.table[index] = [] + # Search for the entry first. for i in range(len(self.table[index])): - if self.table[index][i] is not None: - if ( - self.table[index][i].hash == hash - and self.table[index][i].key == key - ): - # The entry already exists in the table. - self.table[index][i] = HashEntry(key, hash, value) - return + if self.table[index][i] is None: continue - self.table[index][i] = HashEntry(key, hash, value) - inserted = True - break + if self.table[index][i].hash == hash and self.table[index][i].key == key: + # The entry already exists in the table. + self.table[index][i] = HashEntry(key, hash, value) + return + + # Find an empty slot. + for i in range(len(self.table[index])): + if self.table[index][i] is None: + self.table[index][i] = HashEntry(key, hash, value) + inserted = True + break if not inserted: self.table[index].append(HashEntry(key, hash, value)) self.elements += 1 @@ -160,7 +237,7 @@ class HashTable: def resize(self, new_size): if new_size == len(self.table): return - if new_size == 0: + if new_size < self.initial_size: return if self.elements < 100: return @@ -184,29 +261,31 @@ class HashTable: gc.collect() def grow(self): - if self.elements < len(self.table): + if self.elements < 4 * len(self.table): return - new_size = int(len(self.table) * 1.2) + new_size = int(len(self.table) * 1.5) self.resize(new_size) def delete(self, key, hash): index = hash % len(self.table) - entries = self.table[index] deleted = False - if entries is None: + deleted_entry = None + if self.table[index] is None: return - for i in range(len(entries)): + for i in range(len(self.table[index])): if ( - entries[i] is not None - and entries[i].hash == hash - and entries[i].key == key + self.table[index][i] is not None + and self.table[index][i].hash == hash + and self.table[index][i].key == key ): - entries[i] = None + deleted_entry = self.table[index][i] + self.table[index][i] = None self.elements -= 1 deleted = True break if deleted: self.shrink() + return deleted_entry def shrink(self): if self.elements * 2 >= len(self.table): @@ -216,12 +295,15 @@ class HashTable: def lookup(self, key, hash): index = hash % len(self.table) - entries = self.table[index] - if entries is None: + if self.table[index] is None: return None - for entry in entries: - if entry is not None and entry.hash == hash and entry.key == key: - return entry.value + for i in range(len(self.table[index])): + if ( + self.table[index][i] is not None + and self.table[index][i].hash == hash + and self.table[index][i].key == key + ): + return self.table[index][i].value return None @@ -231,9 +313,10 @@ class MissRatioStats: self.num_accesses = 0 self.time_unit = time_unit self.time_misses = {} + self.time_miss_bytes = {} self.time_accesses = {} - def update_metrics(self, access_time, is_hit): + def update_metrics(self, access_time, is_hit, miss_bytes): access_time /= kMicrosInSecond * self.time_unit self.num_accesses += 1 if access_time not in self.time_accesses: @@ -243,20 +326,41 @@ class MissRatioStats: self.num_misses += 1 if access_time not in self.time_misses: self.time_misses[access_time] = 0 + self.time_miss_bytes[access_time] = 0 self.time_misses[access_time] += 1 + self.time_miss_bytes[access_time] += miss_bytes def reset_counter(self): self.num_misses = 0 self.num_accesses = 0 + self.time_miss_bytes.clear() + self.time_misses.clear() + self.time_accesses.clear() + + def compute_miss_bytes(self): + miss_bytes = [] + for at in self.time_miss_bytes: + miss_bytes.append(self.time_miss_bytes[at]) + miss_bytes = sorted(miss_bytes) + avg_miss_bytes = 0 + p95_miss_bytes = 0 + for i in range(len(miss_bytes)): + avg_miss_bytes += float(miss_bytes[i]) / float(len(miss_bytes)) + + p95_index = min(int(0.95 * float(len(miss_bytes))), len(miss_bytes) - 1) + p95_miss_bytes = miss_bytes[p95_index] + return avg_miss_bytes, p95_miss_bytes def miss_ratio(self): return float(self.num_misses) * 100.0 / float(self.num_accesses) - def write_miss_timeline(self, cache_type, cache_size, result_dir, start, end): + def write_miss_timeline( + self, cache_type, cache_size, target_cf_name, result_dir, start, end + ): start /= kMicrosInSecond * self.time_unit end /= kMicrosInSecond * self.time_unit - header_file_path = "{}/header-ml-miss-timeline-{}-{}-{}".format( - result_dir, self.time_unit, cache_type, cache_size + header_file_path = "{}/header-ml-miss-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name ) if not path.exists(header_file_path): with open(header_file_path, "w+") as header_file: @@ -264,8 +368,8 @@ class MissRatioStats: for trace_time in range(start, end): header += ",{}".format(trace_time) header_file.write(header + "\n") - file_path = "{}/data-ml-miss-timeline-{}-{}-{}".format( - result_dir, self.time_unit, cache_type, cache_size + file_path = "{}/data-ml-miss-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name ) with open(file_path, "w+") as file: row = "{}".format(cache_type) @@ -273,11 +377,13 @@ class MissRatioStats: row += ",{}".format(self.time_misses.get(trace_time, 0)) file.write(row + "\n") - def write_miss_ratio_timeline(self, cache_type, cache_size, result_dir, start, end): + def write_miss_ratio_timeline( + self, cache_type, cache_size, target_cf_name, result_dir, start, end + ): start /= kMicrosInSecond * self.time_unit end /= kMicrosInSecond * self.time_unit - header_file_path = "{}/header-ml-miss-ratio-timeline-{}-{}-{}".format( - result_dir, self.time_unit, cache_type, cache_size + header_file_path = "{}/header-ml-miss-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name ) if not path.exists(header_file_path): with open(header_file_path, "w+") as header_file: @@ -285,8 +391,8 @@ class MissRatioStats: for trace_time in range(start, end): header += ",{}".format(trace_time) header_file.write(header + "\n") - file_path = "{}/data-ml-miss-ratio-timeline-{}-{}-{}".format( - result_dir, self.time_unit, cache_type, cache_size + file_path = "{}/data-ml-miss-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name ) with open(file_path, "w+") as file: row = "{}".format(cache_type) @@ -322,11 +428,13 @@ class PolicyStats: self.time_selected_polices[access_time][policy_name] = 0 self.time_selected_polices[access_time][policy_name] += 1 - def write_policy_timeline(self, cache_type, cache_size, result_dir, start, end): + def write_policy_timeline( + self, cache_type, cache_size, target_cf_name, result_dir, start, end + ): start /= kMicrosInSecond * self.time_unit end /= kMicrosInSecond * self.time_unit - header_file_path = "{}/header-ml-policy-timeline-{}-{}-{}".format( - result_dir, self.time_unit, cache_type, cache_size + header_file_path = "{}/header-ml-policy-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name ) if not path.exists(header_file_path): with open(header_file_path, "w+") as header_file: @@ -334,8 +442,8 @@ class PolicyStats: for trace_time in range(start, end): header += ",{}".format(trace_time) header_file.write(header + "\n") - file_path = "{}/data-ml-policy-timeline-{}-{}-{}".format( - result_dir, self.time_unit, cache_type, cache_size + file_path = "{}/data-ml-policy-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name ) with open(file_path, "w+") as file: for policy in self.policy_names: @@ -350,12 +458,12 @@ class PolicyStats: file.write(row + "\n") def write_policy_ratio_timeline( - self, cache_type, cache_size, file_path, start, end + self, cache_type, cache_size, target_cf_name, file_path, start, end ): start /= kMicrosInSecond * self.time_unit end /= kMicrosInSecond * self.time_unit - header_file_path = "{}/header-ml-policy-ratio-timeline-{}-{}-{}".format( - result_dir, self.time_unit, cache_type, cache_size + header_file_path = "{}/header-ml-policy-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name ) if not path.exists(header_file_path): with open(header_file_path, "w+") as header_file: @@ -363,8 +471,8 @@ class PolicyStats: for trace_time in range(start, end): header += ",{}".format(trace_time) header_file.write(header + "\n") - file_path = "{}/data-ml-policy-ratio-timeline-{}-{}-{}".format( - result_dir, self.time_unit, cache_type, cache_size + file_path = "{}/data-ml-policy-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name ) with open(file_path, "w+") as file: for policy in self.policy_names: @@ -400,7 +508,7 @@ class Policy(object): def delete(self, key): self.evicted_keys.pop(key, None) - def prioritize_samples(self, samples): + def prioritize_samples(self, samples, auxilliary_info): raise NotImplementedError def policy_name(self): @@ -413,7 +521,7 @@ class Policy(object): class LRUPolicy(Policy): - def prioritize_samples(self, samples): + def prioritize_samples(self, samples, auxilliary_info): return sorted( samples, cmp=lambda e1, e2: e1.value.last_access_number @@ -425,7 +533,7 @@ class LRUPolicy(Policy): class MRUPolicy(Policy): - def prioritize_samples(self, samples): + def prioritize_samples(self, samples, auxilliary_info): return sorted( samples, cmp=lambda e1, e2: e2.value.last_access_number @@ -437,175 +545,478 @@ class MRUPolicy(Policy): class LFUPolicy(Policy): - def prioritize_samples(self, samples): + def prioritize_samples(self, samples, auxilliary_info): return sorted(samples, cmp=lambda e1, e2: e1.value.num_hits - e2.value.num_hits) def policy_name(self): return "lfu" -class MLCache(object): - def __init__(self, cache_size, enable_cache_row_key, policies): +class HyperbolicPolicy(Policy): + """ + An implementation of Hyperbolic caching. + + Aaron Blankstein, Siddhartha Sen, and Michael J. Freedman. 2017. + Hyperbolic caching: flexible caching for web applications. In Proceedings + of the 2017 USENIX Conference on Usenix Annual Technical Conference + (USENIX ATC '17). USENIX Association, Berkeley, CA, USA, 499-511. + """ + + def compare(self, e1, e2, now): + e1_duration = max(0, (now - e1.value.insertion_time) / kMicrosInSecond) * float( + e1.value.value_size + ) + e2_duration = max(0, (now - e2.value.insertion_time) / kMicrosInSecond) * float( + e2.value.value_size + ) + if e1_duration == e2_duration: + return e1.value.num_hits - e2.value.num_hits + if e1_duration == 0: + return 1 + if e2_duration == 0: + return 1 + diff = (float(e1.value.num_hits) / (float(e1_duration))) - ( + float(e2.value.num_hits) / float(e2_duration) + ) + if diff == 0: + return 0 + elif diff > 0: + return 1 + else: + return -1 + + def prioritize_samples(self, samples, auxilliary_info): + assert len(auxilliary_info) == 3 + now = auxilliary_info[0] + return sorted(samples, cmp=lambda e1, e2: self.compare(e1, e2, now)) + + def policy_name(self): + return "hb" + + +class CostClassPolicy(Policy): + """ + We calculate the hit density of a cost class as + number of hits / total size in cache * average duration in the cache. + + An entry has a higher priority if its class's hit density is higher. + """ + + def compare(self, e1, e2, now, cost_classes, cost_class_label): + e1_class = e1.value.cost_class(cost_class_label) + e2_class = e2.value.cost_class(cost_class_label) + + assert e1_class in cost_classes + assert e2_class in cost_classes + + e1_entry = cost_classes[e1_class] + e2_entry = cost_classes[e2_class] + e1_density = e1_entry.density(now) + e2_density = e2_entry.density(now) + e1_hits = cost_classes[e1_class].hits + e2_hits = cost_classes[e2_class].hits + + if e1_density == e2_density: + return e1_hits - e2_hits + + if e1_entry.num_entries_in_cache == 0: + return -1 + if e2_entry.num_entries_in_cache == 0: + return 1 + + if e1_density == 0: + return 1 + if e2_density == 0: + return -1 + diff = (float(e1_hits) / float(e1_density)) - ( + float(e2_hits) / float(e2_density) + ) + if diff == 0: + return 0 + elif diff > 0: + return 1 + else: + return -1 + + def prioritize_samples(self, samples, auxilliary_info): + assert len(auxilliary_info) == 3 + now = auxilliary_info[0] + cost_classes = auxilliary_info[1] + cost_class_label = auxilliary_info[2] + return sorted( + samples, + cmp=lambda e1, e2: self.compare( + e1, e2, now, cost_classes, cost_class_label + ), + ) + + def policy_name(self): + return "cc" + + +class Cache(object): + """ + This is the base class for the implementations of alternative cache + replacement policies. + """ + + def __init__(self, cache_size, enable_cache_row_key): self.cache_size = cache_size self.used_size = 0 + self.per_second_miss_ratio_stats = MissRatioStats(1) self.miss_ratio_stats = MissRatioStats(kSecondsInMinute) - self.policy_stats = PolicyStats(kSecondsInMinute, policies) self.per_hour_miss_ratio_stats = MissRatioStats(kSecondsInHour) - self.per_hour_policy_stats = PolicyStats(kSecondsInHour, policies) - self.table = HashTable() + # 0: disabled. 1: enabled. Insert both row and the refereneced data block. + # 2: enabled. Insert only the row but NOT the referenced data block. self.enable_cache_row_key = enable_cache_row_key self.get_id_row_key_map = {} - self.policies = policies + self.max_seen_get_id = 0 + self.retain_get_id_range = 100000 - def _lookup(self, key, hash): - value = self.table.lookup(key, hash) - if value is not None: - value.last_access_number = self.miss_ratio_stats.num_accesses - value.num_hits += 1 - return True - return False + def block_key(self, trace_record): + return "b{}".format(trace_record.block_id) - def _select_policy(self, trace_record, key): - raise NotImplementedError + def row_key(self, trace_record): + return "g{}-{}".format(trace_record.fd, trace_record.key_id) - def cache_name(self): + def _lookup(self, trace_record, key, hash): + """ + Look up the key in the cache. + Returns true upon a cache hit, false otherwise. + """ raise NotImplementedError - def _evict(self, policy_index, value_size): - # Randomly sample n entries. - samples = self.table.random_sample(kSampleSize) - samples = self.policies[policy_index].prioritize_samples(samples) - for hash_entry in samples: - self.used_size -= hash_entry.value.value_size - self.table.delete(hash_entry.key, hash_entry.hash) - self.policies[policy_index].evict( - key=hash_entry.key, max_size=self.table.elements - ) - if self.used_size + value_size <= self.cache_size: - break + def _evict(self, trace_record, key, hash, value_size): + """ + Evict entries in the cache until there is enough room to insert the new + entry with 'value_size'. + """ + raise NotImplementedError def _insert(self, trace_record, key, hash, value_size): - if value_size > self.cache_size: - return - policy_index = self._select_policy(trace_record, key) - self.policies[policy_index].delete(key) - self.policy_stats.update_metrics(trace_record.access_time, policy_index) - self.per_hour_policy_stats.update_metrics( - trace_record.access_time, policy_index - ) - while self.used_size + value_size > self.cache_size: - self._evict(policy_index, value_size) - self.table.insert( - key, - hash, - CacheEntry( - value_size, - trace_record.cf_id, - trace_record.level, - trace_record.block_type, - self.miss_ratio_stats.num_accesses, - ), - ) - self.used_size += value_size + """ + Insert the new entry into the cache. + """ + raise NotImplementedError - def _access_kv(self, trace_record, key, hash, value_size, no_insert): - if self._lookup(key, hash): - return True - if not no_insert and value_size > 0: - self._insert(trace_record, key, hash, value_size) + def _should_admit(self, trace_record, key, hash, value_size): + """ + A custom admission policy to decide whether we should admit the new + entry upon a cache miss. + Returns true if the new entry should be admitted, false otherwise. + """ + raise NotImplementedError + + def cache_name(self): + """ + The name of the replacement policy. + """ + raise NotImplementedError + + def is_ml_cache(self): return False - def _update_stats(self, access_time, is_hit): - self.miss_ratio_stats.update_metrics(access_time, is_hit) - self.per_hour_miss_ratio_stats.update_metrics(access_time, is_hit) + def _update_stats(self, access_time, is_hit, miss_bytes): + self.per_second_miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes) + self.miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes) + self.per_hour_miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes) def access(self, trace_record): + """ + Access a trace record. The simulator calls this function to access a + trace record. + """ assert self.used_size <= self.cache_size if ( - self.enable_cache_row_key + self.enable_cache_row_key > 0 and trace_record.caller == 1 and trace_record.key_id != 0 and trace_record.get_id != 0 ): # This is a get request. - if trace_record.get_id not in self.get_id_row_key_map: - self.get_id_row_key_map[trace_record.get_id] = {} - self.get_id_row_key_map[trace_record.get_id]["h"] = False - if self.get_id_row_key_map[trace_record.get_id]["h"]: - # We treat future accesses as hits since this get request - # completes. - self._update_stats(trace_record.access_time, is_hit=True) - return - if trace_record.key_id not in self.get_id_row_key_map[trace_record.get_id]: - # First time seen this key. - is_hit = self._access_kv( - trace_record, - key="g{}".format(trace_record.key_id), - hash=trace_record.key_id, - value_size=trace_record.kv_size, - no_insert=False, - ) - inserted = False - if trace_record.kv_size > 0: - inserted = True - self.get_id_row_key_map[trace_record.get_id][ - trace_record.key_id - ] = inserted - self.get_id_row_key_map[trace_record.get_id]["h"] = is_hit - if self.get_id_row_key_map[trace_record.get_id]["h"]: - # We treat future accesses as hits since this get request - # completes. - self._update_stats(trace_record.access_time, is_hit=True) - return - # Access its blocks. + self._access_row(trace_record) + return + is_hit = self._access_kv( + trace_record, + self.block_key(trace_record), + trace_record.block_id, + trace_record.block_size, + trace_record.no_insert, + ) + self._update_stats( + trace_record.access_time, is_hit=is_hit, miss_bytes=trace_record.block_size + ) + + def _access_row(self, trace_record): + row_key = self.row_key(trace_record) + self.max_seen_get_id = max(self.max_seen_get_id, trace_record.get_id) + self.get_id_row_key_map.pop( + self.max_seen_get_id - self.retain_get_id_range, None + ) + if trace_record.get_id not in self.get_id_row_key_map: + self.get_id_row_key_map[trace_record.get_id] = {} + self.get_id_row_key_map[trace_record.get_id]["h"] = False + if self.get_id_row_key_map[trace_record.get_id]["h"]: + # We treat future accesses as hits since this get request + # completes. + # print("row hit 1") + self._update_stats(trace_record.access_time, is_hit=True, miss_bytes=0) + return + if row_key not in self.get_id_row_key_map[trace_record.get_id]: + # First time seen this key. is_hit = self._access_kv( trace_record, - key="b{}".format(trace_record.block_id), - hash=trace_record.block_id, - value_size=trace_record.block_size, - no_insert=trace_record.no_insert, + key=row_key, + hash=trace_record.key_id, + value_size=trace_record.kv_size, + no_insert=False, ) - self._update_stats(trace_record.access_time, is_hit) - if ( - trace_record.kv_size > 0 - and not self.get_id_row_key_map[trace_record.get_id][ - trace_record.key_id - ] - ): - # Insert the row key-value pair. - self._access_kv( - trace_record, - key="g{}".format(trace_record.key_id), - hash=trace_record.key_id, - value_size=trace_record.kv_size, - no_insert=False, - ) - # Mark as inserted. - self.get_id_row_key_map[trace_record.get_id][trace_record.key_id] = True + inserted = False + if trace_record.kv_size > 0: + inserted = True + self.get_id_row_key_map[trace_record.get_id][row_key] = inserted + self.get_id_row_key_map[trace_record.get_id]["h"] = is_hit + if self.get_id_row_key_map[trace_record.get_id]["h"]: + # We treat future accesses as hits since this get request + # completes. + # print("row hit 2") + self._update_stats(trace_record.access_time, is_hit=True, miss_bytes=0) return - # Access the block. + # Access its blocks. + no_insert = trace_record.no_insert + if ( + self.enable_cache_row_key == 2 + and trace_record.kv_size > 0 + and trace_record.block_type == 9 + ): + no_insert = True is_hit = self._access_kv( trace_record, - key="b{}".format(trace_record.block_id), + key=self.block_key(trace_record), hash=trace_record.block_id, value_size=trace_record.block_size, - no_insert=trace_record.no_insert, + no_insert=no_insert, + ) + self._update_stats( + trace_record.access_time, is_hit, miss_bytes=trace_record.block_size + ) + if ( + trace_record.kv_size > 0 + and not self.get_id_row_key_map[trace_record.get_id][row_key] + ): + # Insert the row key-value pair. + self._access_kv( + trace_record, + key=row_key, + hash=trace_record.key_id, + value_size=trace_record.kv_size, + no_insert=False, + ) + # Mark as inserted. + self.get_id_row_key_map[trace_record.get_id][row_key] = True + + def _access_kv(self, trace_record, key, hash, value_size, no_insert): + # Sanity checks. + assert self.used_size <= self.cache_size + if self._lookup(trace_record, key, hash): + # A cache hit. + return True + if no_insert or value_size <= 0: + return False + # A cache miss. + if value_size > self.cache_size: + # The block is too large to fit into the cache. + return False + self._evict(trace_record, key, hash, value_size) + if self._should_admit(trace_record, key, hash, value_size): + self._insert(trace_record, key, hash, value_size) + self.used_size += value_size + return False + + +class CostClassEntry: + """ + A cost class maintains aggregated statistics of cached entries in a class. + For example, we may define block type as a class. Then, cached blocks of the + same type will share one cost class entry. + """ + + def __init__(self): + self.hits = 0 + self.num_entries_in_cache = 0 + self.size_in_cache = 0 + self.sum_insertion_times = 0 + self.sum_last_access_time = 0 + + def insert(self, trace_record, key, value_size): + self.size_in_cache += value_size + self.num_entries_in_cache += 1 + self.sum_insertion_times += trace_record.access_time / kMicrosInSecond + self.sum_last_access_time += trace_record.access_time / kMicrosInSecond + + def remove(self, insertion_time, last_access_time, key, value_size, num_hits): + self.hits -= num_hits + self.num_entries_in_cache -= 1 + self.sum_insertion_times -= insertion_time / kMicrosInSecond + self.size_in_cache -= value_size + self.sum_last_access_time -= last_access_time / kMicrosInSecond + + def update_on_hit(self, trace_record, last_access_time): + self.hits += 1 + self.sum_last_access_time -= last_access_time / kMicrosInSecond + self.sum_last_access_time += trace_record.access_time / kMicrosInSecond + + def avg_lifetime_in_cache(self, now): + avg_insertion_time = self.sum_insertion_times / self.num_entries_in_cache + return now / kMicrosInSecond - avg_insertion_time + + def avg_last_access_time(self): + if self.num_entries_in_cache == 0: + return 0 + return float(self.sum_last_access_time) / float(self.num_entries_in_cache) + + def avg_size(self): + if self.num_entries_in_cache == 0: + return 0 + return float(self.sum_last_access_time) / float(self.num_entries_in_cache) + + def density(self, now): + avg_insertion_time = self.sum_insertion_times / self.num_entries_in_cache + in_cache_duration = now / kMicrosInSecond - avg_insertion_time + return self.size_in_cache * in_cache_duration + + +class MLCache(Cache): + """ + MLCache is the base class for implementations of alternative replacement + policies using reinforcement learning. + """ + + def __init__(self, cache_size, enable_cache_row_key, policies, cost_class_label): + super(MLCache, self).__init__(cache_size, enable_cache_row_key) + self.table = HashTable() + self.policy_stats = PolicyStats(kSecondsInMinute, policies) + self.per_hour_policy_stats = PolicyStats(kSecondsInHour, policies) + self.policies = policies + self.cost_classes = {} + self.cost_class_label = cost_class_label + + def is_ml_cache(self): + return True + + def _lookup(self, trace_record, key, hash): + value = self.table.lookup(key, hash) + if value is not None: + # Update the entry's cost class statistics. + if self.cost_class_label is not None: + cost_class = value.cost_class(self.cost_class_label) + assert cost_class in self.cost_classes + self.cost_classes[cost_class].update_on_hit( + trace_record, value.last_access_time + ) + # Update the entry's last access time. + self.table.insert( + key, + hash, + CacheEntry( + value_size=value.value_size, + cf_id=value.cf_id, + level=value.level, + block_type=value.block_type, + table_id=value.table_id, + access_number=self.miss_ratio_stats.num_accesses, + time_s=trace_record.access_time, + num_hits=value.num_hits + 1, + ), + ) + return True + return False + + def _evict(self, trace_record, key, hash, value_size): + # Select a policy, random sample kSampleSize keys from the cache, then + # evict keys in the sample set until we have enough room for the new + # entry. + policy_index = self._select_policy(trace_record, key) + assert policy_index < len(self.policies) and policy_index >= 0 + self.policies[policy_index].delete(key) + self.policy_stats.update_metrics(trace_record.access_time, policy_index) + self.per_hour_policy_stats.update_metrics( + trace_record.access_time, policy_index + ) + while self.used_size + value_size > self.cache_size: + # Randomly sample n entries. + samples = self.table.random_sample(kSampleSize) + samples = self.policies[policy_index].prioritize_samples( + samples, + [trace_record.access_time, self.cost_classes, self.cost_class_label], + ) + for hash_entry in samples: + assert self.table.delete(hash_entry.key, hash_entry.hash) is not None + self.used_size -= hash_entry.value.value_size + self.policies[policy_index].evict( + key=hash_entry.key, max_size=self.table.elements + ) + # Update the entry's cost class statistics. + if self.cost_class_label is not None: + cost_class = hash_entry.value.cost_class(self.cost_class_label) + assert cost_class in self.cost_classes + self.cost_classes[cost_class].remove( + hash_entry.value.insertion_time, + hash_entry.value.last_access_time, + key, + hash_entry.value.value_size, + hash_entry.value.num_hits, + ) + if self.used_size + value_size <= self.cache_size: + break + + def _insert(self, trace_record, key, hash, value_size): + assert self.used_size + value_size <= self.cache_size + entry = CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + trace_record.table_id, + self.miss_ratio_stats.num_accesses, + trace_record.access_time, ) - self._update_stats(trace_record.access_time, is_hit) + # Update the entry's cost class statistics. + if self.cost_class_label is not None: + cost_class = entry.cost_class(self.cost_class_label) + if cost_class not in self.cost_classes: + self.cost_classes[cost_class] = CostClassEntry() + self.cost_classes[cost_class].insert(trace_record, key, value_size) + self.table.insert(key, hash, entry) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + def _select_policy(self, trace_record, key): + raise NotImplementedError class ThompsonSamplingCache(MLCache): """ - An implementation of Thompson Sampling for the Bernoulli Bandit [1]. - [1] Daniel J. Russo, Benjamin Van Roy, Abbas Kazerouni, Ian Osband, + An implementation of Thompson Sampling for the Bernoulli Bandit. + + Daniel J. Russo, Benjamin Van Roy, Abbas Kazerouni, Ian Osband, and Zheng Wen. 2018. A Tutorial on Thompson Sampling. Found. Trends Mach. Learn. 11, 1 (July 2018), 1-96. DOI: https://doi.org/10.1561/2200000070 """ - def __init__(self, cache_size, enable_cache_row_key, policies, init_a=1, init_b=1): + def __init__( + self, + cache_size, + enable_cache_row_key, + policies, + cost_class_label, + init_a=1, + init_b=1, + ): super(ThompsonSamplingCache, self).__init__( - cache_size, enable_cache_row_key, policies + cache_size, enable_cache_row_key, policies, cost_class_label ) self._as = {} self._bs = {} @@ -614,6 +1025,8 @@ class ThompsonSamplingCache(MLCache): self._bs = [init_b] * len(self.policies) def _select_policy(self, trace_record, key): + if len(self.policies) == 1: + return 0 samples = [ np.random.beta(self._as[x], self._bs[x]) for x in range(len(self.policies)) ] @@ -626,23 +1039,28 @@ class ThompsonSamplingCache(MLCache): def cache_name(self): if self.enable_cache_row_key: - return "Hybrid ThompsonSampling (ts_hybrid)" - return "ThompsonSampling (ts)" + return "Hybrid ThompsonSampling with cost class {} (ts_hybrid)".format( + self.cost_class_label + ) + return "ThompsonSampling with cost class {} (ts)".format(self.cost_class_label) class LinUCBCache(MLCache): """ - An implementation of LinUCB with disjoint linear models [2]. - [2] Lihong Li, Wei Chu, John Langford, and Robert E. Schapire. 2010. + An implementation of LinUCB with disjoint linear models. + + Lihong Li, Wei Chu, John Langford, and Robert E. Schapire. 2010. A contextual-bandit approach to personalized news article recommendation. In Proceedings of the 19th international conference on World wide web (WWW '10). ACM, New York, NY, USA, 661-670. DOI=http://dx.doi.org/10.1145/1772690.1772758 """ - def __init__(self, cache_size, enable_cache_row_key, policies): - super(LinUCBCache, self).__init__(cache_size, enable_cache_row_key, policies) - self.nfeatures = 4 # Block type, caller, level, cf. + def __init__(self, cache_size, enable_cache_row_key, policies, cost_class_label): + super(LinUCBCache, self).__init__( + cache_size, enable_cache_row_key, policies, cost_class_label + ) + self.nfeatures = 4 # Block type, level, cf. self.th = np.zeros((len(self.policies), self.nfeatures)) self.eps = 0.2 self.b = np.zeros_like(self.th) @@ -655,11 +1073,12 @@ class LinUCBCache(MLCache): self.alph = 0.2 def _select_policy(self, trace_record, key): + if len(self.policies) == 1: + return 0 x_i = np.zeros(self.nfeatures) # The current context vector x_i[0] = trace_record.block_type - x_i[1] = trace_record.caller - x_i[2] = trace_record.level - x_i[3] = trace_record.cf_id + x_i[1] = trace_record.level + x_i[2] = trace_record.cf_id p = np.zeros(len(self.policies)) for a in range(len(self.policies)): self.th_hat[a] = self.A_inv[a].dot(self.b[a]) @@ -679,8 +1098,429 @@ class LinUCBCache(MLCache): def cache_name(self): if self.enable_cache_row_key: - return "Hybrid LinUCB (linucb_hybrid)" - return "LinUCB (linucb)" + return "Hybrid LinUCB with cost class {} (linucb_hybrid)".format( + self.cost_class_label + ) + return "LinUCB with cost class {} (linucb)".format(self.cost_class_label) + + +class OPTCacheEntry: + """ + A cache entry for the OPT algorithm. The entries are sorted based on its + next access sequence number in reverse order, i.e., the entry which next + access is the furthest in the future is ordered before other entries. + """ + + def __init__(self, key, next_access_seq_no, value_size): + self.key = key + self.next_access_seq_no = next_access_seq_no + self.value_size = value_size + self.is_removed = False + + def __cmp__(self, other): + if other.next_access_seq_no != self.next_access_seq_no: + return other.next_access_seq_no - self.next_access_seq_no + return self.value_size - other.value_size + + def __repr__(self): + return "({} {} {} {})".format( + self.key, self.next_access_seq_no, self.value_size, self.is_removed + ) + + +class PQTable: + """ + A hash table with a priority queue. + """ + + def __init__(self): + # A list of entries arranged in a heap sorted based on the entry custom + # implementation of __cmp__ + self.pq = [] + self.table = {} + + def pqinsert(self, entry): + "Add a new key or update the priority of an existing key" + # Remove the entry from the table first. + removed_entry = self.table.pop(entry.key, None) + if removed_entry: + # Mark as removed since there is no 'remove' API in heappq. + # Instead, an entry in pq is removed lazily when calling pop. + removed_entry.is_removed = True + self.table[entry.key] = entry + heapq.heappush(self.pq, entry) + return removed_entry + + def pqpop(self): + while self.pq: + entry = heapq.heappop(self.pq) + if not entry.is_removed: + del self.table[entry.key] + return entry + return None + + def pqpeek(self): + while self.pq: + entry = self.pq[0] + if not entry.is_removed: + return entry + heapq.heappop(self.pq) + return + + def __contains__(self, k): + return k in self.table + + def __getitem__(self, k): + return self.table[k] + + def __len__(self): + return len(self.table) + + def values(self): + return self.table.values() + + +class OPTCache(Cache): + """ + An implementation of the Belady MIN algorithm. OPTCache evicts an entry + in the cache whose next access occurs furthest in the future. + + Note that Belady MIN algorithm is optimal assuming all blocks having the + same size and a missing entry will be inserted in the cache. + These are NOT true for the block cache trace since blocks have different + sizes and we may not insert a block into the cache upon a cache miss. + However, it is still useful to serve as a "theoretical upper bound" on the + lowest miss ratio we can achieve given a cache size. + + L. A. Belady. 1966. A Study of Replacement Algorithms for a + Virtual-storage Computer. IBM Syst. J. 5, 2 (June 1966), 78-101. + DOI=http://dx.doi.org/10.1147/sj.52.0078 + """ + + def __init__(self, cache_size): + super(OPTCache, self).__init__(cache_size, enable_cache_row_key=0) + self.table = PQTable() + + def _lookup(self, trace_record, key, hash): + if key not in self.table: + return False + # A cache hit. Update its next access time. + assert ( + self.table.pqinsert( + OPTCacheEntry( + key, trace_record.next_access_seq_no, self.table[key].value_size + ) + ) + is not None + ) + return True + + def _evict(self, trace_record, key, hash, value_size): + while self.used_size + value_size > self.cache_size: + evict_entry = self.table.pqpop() + assert evict_entry is not None + self.used_size -= evict_entry.value_size + + def _insert(self, trace_record, key, hash, value_size): + assert ( + self.table.pqinsert( + OPTCacheEntry(key, trace_record.next_access_seq_no, value_size) + ) + is None + ) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + def cache_name(self): + return "Belady MIN (opt)" + + +class GDSizeEntry: + """ + A cache entry for the greedy dual size replacement policy. + """ + + def __init__(self, key, value_size, priority): + self.key = key + self.value_size = value_size + self.priority = priority + self.is_removed = False + + def __cmp__(self, other): + if other.priority != self.priority: + return self.priority - other.priority + return self.value_size - other.value_size + + def __repr__(self): + return "({} {} {} {})".format( + self.key, self.next_access_seq_no, self.value_size, self.is_removed + ) + + +class GDSizeCache(Cache): + """ + An implementation of the greedy dual size algorithm. + We define cost as an entry's size. + + See https://www.usenix.org/legacy/publications/library/proceedings/usits97/full_papers/cao/cao_html/node8.html + and N. Young. The k-server dual and loose competitiveness for paging. + Algorithmica,June 1994, vol. 11,(no.6):525-41. + Rewritten version of ''On-line caching as cache size varies'', + in The 2nd Annual ACM-SIAM Symposium on Discrete Algorithms, 241-250, 1991. + """ + + def __init__(self, cache_size, enable_cache_row_key): + super(GDSizeCache, self).__init__(cache_size, enable_cache_row_key) + self.table = PQTable() + self.L = 0.0 + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid GreedyDualSize (gdsize_hybrid)" + return "GreedyDualSize (gdsize)" + + def _lookup(self, trace_record, key, hash): + if key not in self.table: + return False + # A cache hit. Update its priority. + entry = self.table[key] + assert ( + self.table.pqinsert( + GDSizeEntry(key, entry.value_size, self.L + entry.value_size) + ) + is not None + ) + return True + + def _evict(self, trace_record, key, hash, value_size): + while self.used_size + value_size > self.cache_size: + evict_entry = self.table.pqpop() + assert evict_entry is not None + self.L = evict_entry.priority + self.used_size -= evict_entry.value_size + + def _insert(self, trace_record, key, hash, value_size): + assert ( + self.table.pqinsert(GDSizeEntry(key, value_size, self.L + value_size)) + is None + ) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + +class Deque(object): + """A Deque class facilitates the implementation of LRU and ARC.""" + + def __init__(self): + self.od = OrderedDict() + + def appendleft(self, k): + if k in self.od: + del self.od[k] + self.od[k] = None + + def pop(self): + item = self.od.popitem(last=False) if self.od else None + if item is not None: + return item[0] + return None + + def remove(self, k): + del self.od[k] + + def __len__(self): + return len(self.od) + + def __contains__(self, k): + return k in self.od + + def __iter__(self): + return reversed(self.od) + + def __repr__(self): + return "Deque(%r)" % (list(self),) + + +class ARCCache(Cache): + """ + An implementation of ARC. ARC assumes that all blocks are having the + same size. The size of index and filter blocks are variable. To accommodate + this, we modified ARC as follows: + 1) We use 16 KB as the average block size and calculate the number of blocks + (c) in the cache. + 2) When we insert an entry, the cache evicts entries in both t1 and t2 + queues until it has enough space for the new entry. This also requires + modification of the algorithm to maintain a maximum of 2*c blocks. + + Nimrod Megiddo and Dharmendra S. Modha. 2003. ARC: A Self-Tuning, Low + Overhead Replacement Cache. In Proceedings of the 2nd USENIX Conference on + File and Storage Technologies (FAST '03). USENIX Association, Berkeley, CA, + USA, 115-130. + """ + + def __init__(self, cache_size, enable_cache_row_key): + super(ARCCache, self).__init__(cache_size, enable_cache_row_key) + self.table = {} + self.c = cache_size / 16 * 1024 # Number of elements in the cache. + self.p = 0 # Target size for the list T1 + # L1: only once recently + self.t1 = Deque() # T1: recent cache entries + self.b1 = Deque() # B1: ghost entries recently evicted from the T1 cache + # L2: at least twice recently + self.t2 = Deque() # T2: frequent entries + self.b2 = Deque() # B2: ghost entries recently evicted from the T2 cache + + def _replace(self, key, value_size): + while self.used_size + value_size > self.cache_size: + if self.t1 and ((key in self.b2) or (len(self.t1) > self.p)): + old = self.t1.pop() + self.b1.appendleft(old) + else: + if self.t2: + old = self.t2.pop() + self.b2.appendleft(old) + else: + old = self.t1.pop() + self.b1.appendleft(old) + self.used_size -= self.table[old].value_size + del self.table[old] + + def _lookup(self, trace_record, key, hash): + # Case I: key is in T1 or T2. + # Move key to MRU position in T2. + if key in self.t1: + self.t1.remove(key) + self.t2.appendleft(key) + return True + + if key in self.t2: + self.t2.remove(key) + self.t2.appendleft(key) + return True + return False + + def _evict(self, trace_record, key, hash, value_size): + # Case II: key is in B1 + # Move x from B1 to the MRU position in T2 (also fetch x to the cache). + if key in self.b1: + self.p = min(self.c, self.p + max(len(self.b2) / len(self.b1), 1)) + self._replace(key, value_size) + self.b1.remove(key) + self.t2.appendleft(key) + return + + # Case III: key is in B2 + # Move x from B2 to the MRU position in T2 (also fetch x to the cache). + if key in self.b2: + self.p = max(0, self.p - max(len(self.b1) / len(self.b2), 1)) + self._replace(key, value_size) + self.b2.remove(key) + self.t2.appendleft(key) + return + + # Case IV: key is not in (T1 u B1 u T2 u B2) + self._replace(key, value_size) + while len(self.t1) + len(self.b1) >= self.c and self.b1: + self.b1.pop() + + total = len(self.t1) + len(self.b1) + len(self.t2) + len(self.b2) + while total >= (2 * self.c) and self.b2: + self.b2.pop() + total -= 1 + # Finally, move it to MRU position in T1. + self.t1.appendleft(key) + return + + def _insert(self, trace_record, key, hash, value_size): + self.table[key] = CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + trace_record.table_id, + 0, + trace_record.access_time, + ) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid Adaptive Replacement Cache (arc_hybrid)" + return "Adaptive Replacement Cache (arc)" + + +class LRUCache(Cache): + """ + A strict LRU queue. + """ + + def __init__(self, cache_size, enable_cache_row_key): + super(LRUCache, self).__init__(cache_size, enable_cache_row_key) + self.table = {} + self.lru = Deque() + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid LRU (lru_hybrid)" + return "LRU (lru)" + + def _lookup(self, trace_record, key, hash): + if key not in self.table: + return False + # A cache hit. Update LRU queue. + self.lru.remove(key) + self.lru.appendleft(key) + return True + + def _evict(self, trace_record, key, hash, value_size): + while self.used_size + value_size > self.cache_size: + evict_key = self.lru.pop() + self.used_size -= self.table[evict_key].value_size + del self.table[evict_key] + + def _insert(self, trace_record, key, hash, value_size): + self.table[key] = CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + trace_record.table_id, + 0, + trace_record.access_time, + ) + self.lru.appendleft(key) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + +class TraceCache(Cache): + """ + A trace cache. Lookup returns true if the trace observes a cache hit. + It is used to maintain cache hits observed in the trace. + """ + + def __init__(self, cache_size): + super(TraceCache, self).__init__(cache_size, enable_cache_row_key=0) + + def _lookup(self, trace_record, key, hash): + return trace_record.is_hit + + def _evict(self, trace_record, key, hash, value_size): + pass + + def _insert(self, trace_record, key, hash, value_size): + pass + + def _should_admit(self, trace_record, key, hash, value_size): + return False + + def cache_name(self): + return "Trace" def parse_cache_size(cs): @@ -695,47 +1535,255 @@ def parse_cache_size(cs): def create_cache(cache_type, cache_size, downsample_size): - policies = [] - policies.append(LRUPolicy()) - policies.append(MRUPolicy()) - policies.append(LFUPolicy()) cache_size = cache_size / downsample_size - enable_cache_row_key = False + enable_cache_row_key = 0 + if "hybridn" in cache_type: + enable_cache_row_key = 2 + cache_type = cache_type[:-8] if "hybrid" in cache_type: - enable_cache_row_key = True + enable_cache_row_key = 1 cache_type = cache_type[:-7] if cache_type == "ts": - return ThompsonSamplingCache(cache_size, enable_cache_row_key, policies) + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [LRUPolicy(), LFUPolicy(), HyperbolicPolicy()], + cost_class_label=None, + ) elif cache_type == "linucb": - return LinUCBCache(cache_size, enable_cache_row_key, policies) + return LinUCBCache( + cache_size, + enable_cache_row_key, + [LRUPolicy(), LFUPolicy(), HyperbolicPolicy()], + cost_class_label=None, + ) + elif cache_type == "pylru": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [LRUPolicy()], cost_class_label=None + ) + elif cache_type == "pymru": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [MRUPolicy()], cost_class_label=None + ) + elif cache_type == "pylfu": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [LFUPolicy()], cost_class_label=None + ) + elif cache_type == "pyhb": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [HyperbolicPolicy()], + cost_class_label=None, + ) + elif cache_type == "pycctbbt": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="table_bt", + ) + elif cache_type == "pycccf": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [CostClassPolicy()], cost_class_label="cf" + ) + elif cache_type == "pycctblevelbt": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="table_level_bt", + ) + elif cache_type == "pycccfbt": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="cf_bt", + ) + elif cache_type == "pycctb": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="table", + ) + elif cache_type == "pyccbt": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [CostClassPolicy()], cost_class_label="bt" + ) + elif cache_type == "opt": + if enable_cache_row_key: + print("opt does not support hybrid mode.") + assert False + return OPTCache(cache_size) + elif cache_type == "trace": + if enable_cache_row_key: + print("trace does not support hybrid mode.") + assert False + return TraceCache(cache_size) + elif cache_type == "lru": + return LRUCache(cache_size, enable_cache_row_key) + elif cache_type == "arc": + return ARCCache(cache_size, enable_cache_row_key) + elif cache_type == "gdsize": + return GDSizeCache(cache_size, enable_cache_row_key) else: print("Unknown cache type {}".format(cache_type)) assert False return None -def run(trace_file_path, cache_type, cache, warmup_seconds): +class BlockAccessTimeline: + """ + BlockAccessTimeline stores all accesses of a block. + """ + + def __init__(self): + self.accesses = [] + self.current_access_index = 1 + + def get_next_access(self): + if self.current_access_index == len(self.accesses): + return sys.maxsize + next_access_seq_no = self.accesses[self.current_access_index] + self.current_access_index += 1 + return next_access_seq_no + + +def percent(e1, e2): + if e2 == 0: + return -1 + return float(e1) * 100.0 / float(e2) + + +def is_target_cf(access_cf, target_cf_name): + if target_cf_name == "all": + return True + return access_cf == target_cf_name + + +def run( + trace_file_path, + cache_type, + cache, + warmup_seconds, + max_accesses_to_process, + target_cf_name, +): warmup_complete = False - num = 0 + trace_miss_ratio_stats = MissRatioStats(kSecondsInMinute) + access_seq_no = 0 + time_interval = 1 + start_time = time.time() trace_start_time = 0 trace_duration = 0 - start_time = time.time() + is_opt_cache = False + if cache.cache_name() == "Belady MIN (opt)": + is_opt_cache = True + + block_access_timelines = {} + num_no_inserts = 0 + num_blocks_with_no_size = 0 + num_inserts_block_with_no_size = 0 + + if is_opt_cache: + # Read all blocks in memory and stores their access times so that OPT + # can use this information to evict the cached key which next access is + # the furthest in the future. + print("Preprocessing block traces.") + with open(trace_file_path, "r") as trace_file: + for line in trace_file: + if ( + max_accesses_to_process != -1 + and access_seq_no > max_accesses_to_process + ): + break + ts = line.split(",") + timestamp = int(ts[0]) + cf_name = ts[5] + if not is_target_cf(cf_name, target_cf_name): + continue + if trace_start_time == 0: + trace_start_time = timestamp + trace_duration = timestamp - trace_start_time + block_id = int(ts[1]) + block_size = int(ts[3]) + no_insert = int(ts[9]) + if block_id not in block_access_timelines: + block_access_timelines[block_id] = BlockAccessTimeline() + if block_size == 0: + num_blocks_with_no_size += 1 + block_access_timelines[block_id].accesses.append(access_seq_no) + access_seq_no += 1 + if no_insert == 1: + num_no_inserts += 1 + if no_insert == 0 and block_size == 0: + num_inserts_block_with_no_size += 1 + if access_seq_no % 100 != 0: + continue + now = time.time() + if now - start_time > time_interval * 10: + print( + "Take {} seconds to process {} trace records with trace " + "duration of {} seconds. Throughput: {} records/second.".format( + now - start_time, + access_seq_no, + trace_duration / 1000000, + access_seq_no / (now - start_time), + ) + ) + time_interval += 1 + print( + "Trace contains {0} blocks, {1}({2:.2f}%) blocks with no size." + "{3} accesses, {4}({5:.2f}%) accesses with no_insert," + "{6}({7:.2f}%) accesses that want to insert but block size is 0.".format( + len(block_access_timelines), + num_blocks_with_no_size, + percent(num_blocks_with_no_size, len(block_access_timelines)), + access_seq_no, + num_no_inserts, + percent(num_no_inserts, access_seq_no), + num_inserts_block_with_no_size, + percent(num_inserts_block_with_no_size, access_seq_no), + ) + ) + + access_seq_no = 0 time_interval = 1 - trace_miss_ratio_stats = MissRatioStats(kSecondsInMinute) + start_time = time.time() + trace_start_time = 0 + trace_duration = 0 + print("Running simulated {} cache on block traces.".format(cache.cache_name())) with open(trace_file_path, "r") as trace_file: for line in trace_file: - num += 1 - if num % 1000000 == 0: + if ( + max_accesses_to_process != -1 + and access_seq_no > max_accesses_to_process + ): + break + if access_seq_no % 1000000 == 0: # Force a python gc periodically to reduce memory usage. gc.collect() ts = line.split(",") timestamp = int(ts[0]) + cf_name = ts[5] + if not is_target_cf(cf_name, target_cf_name): + continue if trace_start_time == 0: trace_start_time = timestamp trace_duration = timestamp - trace_start_time - if not warmup_complete and trace_duration > warmup_seconds * 1000000: + if ( + not warmup_complete + and warmup_seconds > 0 + and trace_duration > warmup_seconds * 1000000 + ): cache.miss_ratio_stats.reset_counter() warmup_complete = True + next_access_seq_no = 0 + block_id = int(ts[1]) + if is_opt_cache: + next_access_seq_no = block_access_timelines[block_id].get_next_access() record = TraceRecord( access_time=int(ts[0]), block_id=int(ts[1]), @@ -751,13 +1799,23 @@ def run(trace_file_path, cache_type, cache, warmup_seconds): key_id=int(ts[11]), kv_size=int(ts[12]), is_hit=int(ts[13]), + referenced_key_exist_in_block=int(ts[14]), + num_keys_in_block=int(ts[15]), + table_id=int(ts[16]), + seq_number=int(ts[17]), + block_key_size=int(ts[18]), + key_size=int(ts[19]), + block_offset_in_file=int(ts[20]), + next_access_seq_no=next_access_seq_no, ) trace_miss_ratio_stats.update_metrics( - record.access_time, is_hit=record.is_hit + record.access_time, is_hit=record.is_hit, miss_bytes=record.block_size ) cache.access(record) + access_seq_no += 1 del record - if num % 100 != 0: + del ts + if access_seq_no % 100 != 0: continue # Report progress every 10 seconds. now = time.time() @@ -767,9 +1825,9 @@ def run(trace_file_path, cache_type, cache, warmup_seconds): "duration of {} seconds. Throughput: {} records/second. " "Trace miss ratio {}".format( now - start_time, - num, + access_seq_no, trace_duration / 1000000, - num / (now - start_time), + access_seq_no / (now - start_time), trace_miss_ratio_stats.miss_ratio(), ) ) @@ -787,19 +1845,33 @@ def run(trace_file_path, cache_type, cache, warmup_seconds): "Take {} seconds to process {} trace records with trace duration of {} " "seconds. Throughput: {} records/second. Trace miss ratio {}".format( now - start_time, - num, + access_seq_no, trace_duration / 1000000, - num / (now - start_time), + access_seq_no / (now - start_time), trace_miss_ratio_stats.miss_ratio(), ) ) + print( + "{},0,0,{},{},{}".format( + cache_type, + cache.cache_size, + cache.miss_ratio_stats.miss_ratio(), + cache.miss_ratio_stats.num_accesses, + ) + ) return trace_start_time, trace_duration def report_stats( - cache, cache_type, cache_size, result_dir, trace_start_time, trace_end_time + cache, + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, ): - cache_label = "{}-{}".format(cache_type, cache_size) + cache_label = "{}-{}-{}".format(cache_type, cache_size, target_cf_name) with open("{}/data-ml-mrc-{}".format(result_dir, cache_label), "w+") as mrc_file: mrc_file.write( "{},0,0,{},{},{}\n".format( @@ -809,56 +1881,120 @@ def report_stats( cache.miss_ratio_stats.num_accesses, ) ) - cache.policy_stats.write_policy_timeline( - cache_type, cache_size, result_dir, trace_start_time, trace_end_time - ) - cache.policy_stats.write_policy_ratio_timeline( - cache_type, cache_size, result_dir, trace_start_time, trace_end_time - ) - cache.miss_ratio_stats.write_miss_timeline( - cache_type, cache_size, result_dir, trace_start_time, trace_end_time - ) - cache.miss_ratio_stats.write_miss_ratio_timeline( - cache_type, cache_size, result_dir, trace_start_time, trace_end_time - ) - cache.per_hour_policy_stats.write_policy_timeline( - cache_type, cache_size, result_dir, trace_start_time, trace_end_time - ) - cache.per_hour_policy_stats.write_policy_ratio_timeline( - cache_type, cache_size, result_dir, trace_start_time, trace_end_time - ) - cache.per_hour_miss_ratio_stats.write_miss_timeline( - cache_type, cache_size, result_dir, trace_start_time, trace_end_time - ) - cache.per_hour_miss_ratio_stats.write_miss_ratio_timeline( - cache_type, cache_size, result_dir, trace_start_time, trace_end_time - ) + + cache_stats = [ + cache.per_second_miss_ratio_stats, + cache.miss_ratio_stats, + cache.per_hour_miss_ratio_stats, + ] + for i in range(len(cache_stats)): + avg_miss_bytes, p95_miss_bytes = cache_stats[i].compute_miss_bytes() + + with open( + "{}/data-ml-avgmb-{}-{}".format( + result_dir, cache_stats[i].time_unit, cache_label + ), + "w+", + ) as mb_file: + mb_file.write( + "{},0,0,{},{}\n".format(cache_type, cache_size, avg_miss_bytes) + ) + + with open( + "{}/data-ml-p95mb-{}-{}".format( + result_dir, cache_stats[i].time_unit, cache_label + ), + "w+", + ) as mb_file: + mb_file.write( + "{},0,0,{},{}\n".format(cache_type, cache_size, p95_miss_bytes) + ) + + cache_stats[i].write_miss_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + cache_stats[i].write_miss_ratio_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + + if not cache.is_ml_cache(): + return + + policy_stats = [cache.policy_stats, cache.per_hour_policy_stats] + for i in range(len(policy_stats)): + policy_stats[i].write_policy_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + policy_stats[i].write_policy_ratio_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) if __name__ == "__main__": - if len(sys.argv) <= 6: + if len(sys.argv) <= 8: print( - "Must provide 6 arguments. " - "1) cache_type (ts, ts_hybrid, linucb, linucb_hybrid). " - "2) cache size (xM, xG, xT). " + "Must provide 8 arguments.\n" + "1) Cache type (ts, linucb, arc, lru, opt, pylru, pymru, pylfu, " + "pyhb, gdsize, trace). One may evaluate the hybrid row_block cache " + "by appending '_hybrid' to a cache_type, e.g., ts_hybrid. " + "Note that hybrid is not supported with opt and trace. \n" + "2) Cache size (xM, xG, xT).\n" "3) The sampling frequency used to collect the trace. (The " - "simulation scales down the cache size by the sampling frequency). " - "4) Warmup seconds (The number of seconds used for warmup). " - "5) Trace file path. " - "6) Result directory (A directory that saves generated results)" + "simulation scales down the cache size by the sampling frequency).\n" + "4) Warmup seconds (The number of seconds used for warmup).\n" + "5) Trace file path.\n" + "6) Result directory (A directory that saves generated results)\n" + "7) Max number of accesses to process\n" + "8) The target column family. (The simulation will only run " + "accesses on the target column family. If it is set to all, " + "it will run against all accesses.)" ) exit(1) + print("Arguments: {}".format(sys.argv)) cache_type = sys.argv[1] cache_size = parse_cache_size(sys.argv[2]) downsample_size = int(sys.argv[3]) warmup_seconds = int(sys.argv[4]) trace_file_path = sys.argv[5] result_dir = sys.argv[6] + max_accesses_to_process = int(sys.argv[7]) + target_cf_name = sys.argv[8] cache = create_cache(cache_type, cache_size, downsample_size) trace_start_time, trace_duration = run( - trace_file_path, cache_type, cache, warmup_seconds + trace_file_path, + cache_type, + cache, + warmup_seconds, + max_accesses_to_process, + target_cf_name, ) trace_end_time = trace_start_time + trace_duration report_stats( - cache, cache_type, cache_size, result_dir, trace_start_time, trace_end_time + cache, + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, ) diff --git a/tools/block_cache_analyzer/block_cache_pysim.sh b/tools/block_cache_analyzer/block_cache_pysim.sh index 58193a063..295f734aa 100644 --- a/tools/block_cache_analyzer/block_cache_pysim.sh +++ b/tools/block_cache_analyzer/block_cache_pysim.sh @@ -10,6 +10,10 @@ # warmup_seconds: The number of seconds used for warmup. # max_jobs: The max number of concurrent pysims to run. +# Install required packages to run simulations. +# sudo dnf install -y numpy scipy python-matplotlib ipython python-pandas sympy python-nose atlas-devel +ulimit -c 0 + if [ $# -ne 5 ]; then echo "Usage: ./block_cache_pysim.sh trace_file_path result_dir downsample_size warmup_seconds max_jobs" exit 0 @@ -20,17 +24,26 @@ result_dir="$2" downsample_size="$3" warmup_seconds="$4" max_jobs="$5" -current_jobs=0 +max_num_accesses=100000000 +current_jobs=1 ml_tmp_result_dir="$result_dir/ml" rm -rf "$ml_tmp_result_dir" mkdir -p "$result_dir" mkdir -p "$ml_tmp_result_dir" -for cache_type in "ts" "linucb" "ts_hybrid" "linucb_hybrid" +# Report miss ratio in the trace. +current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep) +for cf_name in "all" +do +for cache_size in "1G" "2G" "4G" "8G" "16G" #"12G" "16G" "1T" do -for cache_size in "16M" "256M" "1G" "2G" "4G" "8G" "12G" "16G" +for cache_type in "opt" "lru" "pylru" "pycctbbt" "pyhb" "ts" "trace" "lru_hybrid" #"pycctblevelbt" #"lru_hybridn" "opt" #"pylru" "pylru_hybrid" "pycctbbt" "pycccfbt" "trace" do + if [[ $cache_type == "trace" && $cache_size != "16G" ]]; then + # We only need to collect miss ratios observed in the trace once. + continue + fi while [ "$current_jobs" -ge "$max_jobs" ] do sleep 10 @@ -38,12 +51,13 @@ do current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep) echo "Waiting jobs to complete. Number of running jobs: $current_jobs" done - output="log-ml-$cache_type-$cache_size" - echo "Running simulation for $cache_type and cache size $cache_size. Number of running jobs: $current_jobs. " - nohup python block_cache_pysim.py "$cache_type" "$cache_size" "$downsample_size" "$warmup_seconds" "$trace_file" "$ml_tmp_result_dir" >& $ml_tmp_result_dir/$output & + output="log-ml-$cache_type-$cache_size-$cf_name" + echo "Running simulation for $cache_type, cache size $cache_size, and cf_name $cf_name. Number of running jobs: $current_jobs. " + nohup python block_cache_pysim.py "$cache_type" "$cache_size" "$downsample_size" "$warmup_seconds" "$trace_file" "$ml_tmp_result_dir" "$max_num_accesses" "$cf_name" >& "$ml_tmp_result_dir/$output" & current_jobs=$((current_jobs+1)) done done +done # Wait for all jobs to complete. while [ $current_jobs -gt 0 ] @@ -57,14 +71,14 @@ done echo "Combine individual pysim output files" rm -rf "$result_dir/ml_*" -mrc_file="$result_dir/ml_mrc" for header in "header-" "data-" do -for fn in $ml_tmp_result_dir/* +for fn in "$ml_tmp_result_dir"/* do sum_file="" time_unit="" capacity="" + target_cf_name="" if [[ $fn == *"timeline"* ]]; then tmpfn="$fn" IFS='-' read -ra elements <<< "$tmpfn" @@ -79,24 +93,43 @@ do done time_unit_index=$((time_unit_index+1)) capacity_index=$((time_unit_index+2)) + target_cf_name_index=$((time_unit_index+3)) time_unit="${elements[$time_unit_index]}_" capacity="${elements[$capacity_index]}_" + target_cf_name="${elements[$target_cf_name_index]}_" fi - if [[ $fn == "${header}ml-policy-timeline"* ]]; then - sum_file="$result_dir/ml_${capacity}${time_unit}policy_timeline" + if [[ $fn == *"${header}ml-policy-timeline"* ]]; then + sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}policy_timeline" + fi + if [[ $fn == *"${header}ml-policy-ratio-timeline"* ]]; then + sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}policy_ratio_timeline" fi - if [[ $fn == "${header}ml-policy-ratio-timeline"* ]]; then - sum_file="$result_dir/ml_${capacity}${time_unit}policy_ratio_timeline" + if [[ $fn == *"${header}ml-miss-timeline"* ]]; then + sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}miss_timeline" fi - if [[ $fn == "${header}ml-miss-timeline"* ]]; then - sum_file="$result_dir/ml_${capacity}${time_unit}miss_timeline" + if [[ $fn == *"${header}ml-miss-ratio-timeline"* ]]; then + sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}miss_ratio_timeline" + fi + if [[ $fn == *"${header}ml-mrc"* ]]; then + tmpfn="$fn" + IFS='-' read -ra elements <<< "$tmpfn" + target_cf_name=${elements[-1]} + sum_file="${result_dir}/ml_${target_cf_name}_mrc" fi - if [[ $fn == "${header}ml-miss-ratio-timeline"* ]]; then - sum_file="$result_dir/ml_${capacity}${time_unit}miss_ratio_timeline" + if [[ $fn == *"${header}ml-avgmb"* ]]; then + tmpfn="$fn" + IFS='-' read -ra elements <<< "$tmpfn" + time_unit=${elements[3]} + target_cf_name=${elements[-1]} + sum_file="${result_dir}/ml_${time_unit}_${target_cf_name}_avgmb" fi - if [[ $fn == "${header}ml-mrc"* ]]; then - sum_file="$mrc_file" + if [[ $fn == *"${header}ml-p95mb"* ]]; then + tmpfn="$fn" + IFS='-' read -ra elements <<< "$tmpfn" + time_unit=${elements[3]} + target_cf_name=${elements[-1]} + sum_file="${result_dir}/ml_${time_unit}_${target_cf_name}_p95mb" fi if [[ $sum_file == "" ]]; then continue @@ -106,13 +139,18 @@ do continue fi fi - cat "$ml_tmp_result_dir/$fn" >> "$sum_file" + cat "$fn" >> "$sum_file" done done echo "Done" -# Sort MRC file by cache_type and cache_size. -tmp_file="$result_dir/tmp_mrc" -cat "$mrc_file" | sort -t ',' -k1,1 -k4,4n > "$tmp_file" -cat "$tmp_file" > "$mrc_file" -rm -rf "$tmp_file" +for fn in $result_dir/* +do + if [[ $fn == *"_mrc" || $fn == *"_avgmb" || $fn == *"_p95mb" ]]; then + # Sort MRC file by cache_type and cache_size. + tmp_file="$result_dir/tmp_mrc" + cat "$fn" | sort -t ',' -k1,1 -k4,4n > "$tmp_file" + cat "$tmp_file" > "$fn" + rm -rf "$tmp_file" + fi +done diff --git a/tools/block_cache_analyzer/block_cache_pysim_test.py b/tools/block_cache_analyzer/block_cache_pysim_test.py index e298d7bbd..4b2bdeba6 100644 --- a/tools/block_cache_analyzer/block_cache_pysim_test.py +++ b/tools/block_cache_analyzer/block_cache_pysim_test.py @@ -1,17 +1,30 @@ #!/usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +import os import random +import sys from block_cache_pysim import ( + ARCCache, + CacheEntry, + GDSizeCache, HashTable, + HyperbolicPolicy, LFUPolicy, LinUCBCache, + LRUCache, LRUPolicy, MRUPolicy, + OPTCache, + OPTCacheEntry, ThompsonSamplingCache, + TraceCache, TraceRecord, + create_cache, + kMicrosInSecond, kSampleSize, + run, ) @@ -33,30 +46,44 @@ def test_hash_table(): records = 100 for i in range(n): key_id = random.randint(0, records) + v = random.randint(0, records) key = "k{}".format(key_id) - value = "v{}".format(key_id) - action = random.randint(0, 2) - # print "{}:{}:{}".format(action, key, value) + value = CacheEntry(v, v, v, v, v, v, v) + action = random.randint(0, 10) assert len(truth_map) == table.elements, "{} {} {}".format( len(truth_map), table.elements, i ) - if action == 0: - table.insert(key, key_id, value) - truth_map[key] = value - elif action == 1: + if action <= 8: if key in truth_map: assert table.lookup(key, key_id) is not None - assert truth_map[key] == table.lookup(key, key_id) + assert truth_map[key].value_size == table.lookup(key, key_id).value_size else: assert table.lookup(key, key_id) is None + table.insert(key, key_id, value) + truth_map[key] = value else: - table.delete(key, key_id) + deleted = table.delete(key, key_id) + if deleted: + assert key in truth_map if key in truth_map: del truth_map[key] + + # Check all keys are unique in the sample set. + for _i in range(10): + samples = table.random_sample(kSampleSize) + unique_keys = {} + for sample in samples: + unique_keys[sample.key] = True + assert len(samples) == len(unique_keys) + + assert len(table) == len(truth_map) + for key in truth_map: + assert table.lookup(key, int(key[1:])) is not None + assert truth_map[key].value_size == table.lookup(key, int(key[1:])).value_size print("Test hash table: Success") -def assert_metrics(cache, expected_value): +def assert_metrics(cache, expected_value, expected_value_size=1, custom_hashtable=True): assert cache.used_size == expected_value[0], "Expected {}, Actual {}".format( expected_value[0], cache.used_size ) @@ -70,24 +97,35 @@ def assert_metrics(cache, expected_value): ), "Expected {}, Actual {}".format( expected_value[2], cache.miss_ratio_stats.num_misses ) - assert cache.table.elements == len(expected_value[3]) + len( + assert len(cache.table) == len(expected_value[3]) + len( expected_value[4] ), "Expected {}, Actual {}".format( len(expected_value[3]) + len(expected_value[4]), cache.table.elements ) for expeceted_k in expected_value[3]: - val = cache.table.lookup("b{}".format(expeceted_k), expeceted_k) - assert val is not None - assert val.value_size == 1 + if custom_hashtable: + val = cache.table.lookup("b{}".format(expeceted_k), expeceted_k) + else: + val = cache.table["b{}".format(expeceted_k)] + assert val is not None, "Expected {} Actual: Not Exist {}, Table: {}".format( + expeceted_k, expected_value, cache.table + ) + assert val.value_size == expected_value_size for expeceted_k in expected_value[4]: - val = cache.table.lookup("g{}".format(expeceted_k), expeceted_k) + if custom_hashtable: + val = cache.table.lookup("g0-{}".format(expeceted_k), expeceted_k) + else: + val = cache.table["g0-{}".format(expeceted_k)] assert val is not None - assert val.value_size == 1 + assert val.value_size == expected_value_size # Access k1, k1, k2, k3, k3, k3, k4 -def test_cache(policies, expected_value): - cache = ThompsonSamplingCache(3, False, policies) +# When k4 is inserted, +# LRU should evict k1. +# LFU should evict k2. +# MRU should evict k3. +def test_cache(cache, expected_value, custom_hashtable=True): k1 = TraceRecord( access_time=0, block_id=1, @@ -103,6 +141,14 @@ def test_cache(policies, expected_value): key_id=1, kv_size=5, is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=0, ) k2 = TraceRecord( access_time=1, @@ -119,6 +165,14 @@ def test_cache(policies, expected_value): key_id=1, kv_size=5, is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=0, ) k3 = TraceRecord( access_time=2, @@ -135,6 +189,14 @@ def test_cache(policies, expected_value): key_id=1, kv_size=5, is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=0, ) k4 = TraceRecord( access_time=3, @@ -151,6 +213,14 @@ def test_cache(policies, expected_value): key_id=1, kv_size=5, is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=0, ) sequence = [k1, k1, k2, k3, k3, k3] index = 0 @@ -167,20 +237,29 @@ def test_cache(policies, expected_value): expected_values.append([3, 5, 3, [1, 2, 3], []]) # Access k3, hit. expected_values.append([3, 6, 3, [1, 2, 3], []]) + access_time = 0 for access in sequence: + access.access_time = access_time cache.access(access) - assert_metrics(cache, expected_values[index]) + assert_metrics( + cache, + expected_values[index], + expected_value_size=1, + custom_hashtable=custom_hashtable, + ) + access_time += 1 index += 1 + k4.access_time = access_time cache.access(k4) - assert_metrics(cache, expected_value) + assert_metrics( + cache, expected_value, expected_value_size=1, custom_hashtable=custom_hashtable + ) -def test_lru_cache(): +def test_lru_cache(cache, custom_hashtable): print("Test LRU cache") - policies = [] - policies.append(LRUPolicy()) # Access k4, miss. evict k1 - test_cache(policies, [3, 7, 4, [2, 3, 4], []]) + test_cache(cache, [3, 7, 4, [2, 3, 4], []], custom_hashtable) print("Test LRU cache: Success") @@ -189,7 +268,10 @@ def test_mru_cache(): policies = [] policies.append(MRUPolicy()) # Access k4, miss. evict k3 - test_cache(policies, [3, 7, 4, [1, 2, 4], []]) + test_cache( + ThompsonSamplingCache(3, False, policies, cost_class_label=None), + [3, 7, 4, [1, 2, 4], []], + ) print("Test MRU cache: Success") @@ -198,22 +280,36 @@ def test_lfu_cache(): policies = [] policies.append(LFUPolicy()) # Access k4, miss. evict k2 - test_cache(policies, [3, 7, 4, [1, 3, 4], []]) + test_cache( + ThompsonSamplingCache(3, False, policies, cost_class_label=None), + [3, 7, 4, [1, 3, 4], []], + ) print("Test LFU cache: Success") def test_mix(cache): print("Test Mix {} cache".format(cache.cache_name())) n = 100000 - records = 199 + records = 100 + block_size_table = {} + trace_num_misses = 0 for i in range(n): key_id = random.randint(0, records) vs = random.randint(0, 10) + now = i * kMicrosInSecond + block_size = vs + if key_id in block_size_table: + block_size = block_size_table[key_id] + else: + block_size_table[key_id] = block_size + is_hit = key_id % 2 + if is_hit == 0: + trace_num_misses += 1 k = TraceRecord( - access_time=i, + access_time=now, block_id=key_id, block_type=1, - block_size=vs, + block_size=block_size, cf_id=0, cf_name="", level=0, @@ -223,13 +319,117 @@ def test_mix(cache): get_id=key_id, key_id=key_id, kv_size=5, - is_hit=1, + is_hit=is_hit, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=vs, ) cache.access(k) assert cache.miss_ratio_stats.miss_ratio() > 0 + if cache.cache_name() == "Trace": + assert cache.miss_ratio_stats.num_accesses == n + assert cache.miss_ratio_stats.num_misses == trace_num_misses + else: + assert cache.used_size <= cache.cache_size + all_values = cache.table.values() + cached_size = 0 + for value in all_values: + cached_size += value.value_size + assert cached_size == cache.used_size, "Expeced {} Actual {}".format( + cache.used_size, cached_size + ) print("Test Mix {} cache: Success".format(cache.cache_name())) +def test_end_to_end(): + print("Test All caches") + n = 100000 + nblocks = 1000 + block_size = 16 * 1024 + ncfs = 7 + nlevels = 6 + nfds = 100000 + trace_file_path = "test_trace" + # All blocks are of the same size so that OPT must achieve the lowest miss + # ratio. + with open(trace_file_path, "w+") as trace_file: + access_records = "" + for i in range(n): + key_id = random.randint(0, nblocks) + cf_id = random.randint(0, ncfs) + level = random.randint(0, nlevels) + fd = random.randint(0, nfds) + now = i * kMicrosInSecond + access_record = "" + access_record += "{},".format(now) + access_record += "{},".format(key_id) + access_record += "{},".format(9) # block type + access_record += "{},".format(block_size) # block size + access_record += "{},".format(cf_id) + access_record += "cf_{},".format(cf_id) + access_record += "{},".format(level) + access_record += "{},".format(fd) + access_record += "{},".format(key_id % 3) # caller + access_record += "{},".format(0) # no insert + access_record += "{},".format(i) # get_id + access_record += "{},".format(i) # key_id + access_record += "{},".format(100) # kv_size + access_record += "{},".format(1) # is_hit + access_record += "{},".format(1) # referenced_key_exist_in_block + access_record += "{},".format(10) # num_keys_in_block + access_record += "{},".format(1) # table_id + access_record += "{},".format(0) # seq_number + access_record += "{},".format(10) # block key size + access_record += "{},".format(20) # key size + access_record += "{},".format(0) # block offset + access_record = access_record[:-1] + access_records += access_record + "\n" + trace_file.write(access_records) + + print("Test All caches: Start testing caches") + cache_size = block_size * nblocks / 10 + downsample_size = 1 + cache_ms = {} + for cache_type in [ + "ts", + "opt", + "lru", + "pylru", + "linucb", + "gdsize", + "pyccbt", + "pycctbbt", + ]: + cache = create_cache(cache_type, cache_size, downsample_size) + run(trace_file_path, cache_type, cache, 0, -1, "all") + cache_ms[cache_type] = cache + assert cache.miss_ratio_stats.num_accesses == n + + for cache_type in cache_ms: + cache = cache_ms[cache_type] + ms = cache.miss_ratio_stats.miss_ratio() + assert ms <= 100.0 and ms >= 0.0 + # OPT should perform the best. + assert cache_ms["opt"].miss_ratio_stats.miss_ratio() <= ms + assert cache.used_size <= cache.cache_size + all_values = cache.table.values() + cached_size = 0 + for value in all_values: + cached_size += value.value_size + assert cached_size == cache.used_size, "Expeced {} Actual {}".format( + cache.used_size, cached_size + ) + print("Test All {}: Success".format(cache.cache_name())) + + os.remove(trace_file_path) + print("Test All: Success") + + def test_hybrid(cache): print("Test {} cache".format(cache.cache_name())) k = TraceRecord( @@ -247,6 +447,14 @@ def test_hybrid(cache): key_id=1, kv_size=0, # no size. is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=0, ) cache.access(k) # Expect a miss. # used size, num accesses, num misses, hash table size, blocks, get keys. @@ -319,22 +527,208 @@ def test_hybrid(cache): k.key_id = 4 # Same row key and should not be inserted again. k.kv_size = 1 cache.access(k) - assert_metrics(cache, [16, 103, 99, [i for i in range(101 - kSampleSize, 101)], []]) + assert_metrics( + cache, [kSampleSize, 103, 99, [i for i in range(101 - kSampleSize, 101)], []] + ) print("Test {} cache: Success".format(cache.cache_name())) +def test_opt_cache(): + print("Test OPT cache") + cache = OPTCache(3) + # seq: 0, 1, 2, 3, 4, 5, 6, 7, 8 + # key: k1, k2, k3, k4, k5, k6, k7, k1, k8 + # next_access: 7, 19, 18, M, M, 17, 16, 25, M + k = TraceRecord( + access_time=0, + block_id=1, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, # the first get request. + key_id=1, + kv_size=0, # no size. + is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=7, + ) + cache.access(k) + assert_metrics( + cache, [1, 1, 1, [1], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 2 + k.next_access_seq_no = 19 + cache.access(k) + assert_metrics( + cache, [2, 2, 2, [1, 2], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 3 + k.next_access_seq_no = 18 + cache.access(k) + assert_metrics( + cache, [3, 3, 3, [1, 2, 3], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 4 + k.next_access_seq_no = sys.maxsize # Never accessed again. + cache.access(k) + # Evict 2 since its next access 19 is the furthest in the future. + assert_metrics( + cache, [3, 4, 4, [1, 3, 4], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 5 + k.next_access_seq_no = sys.maxsize # Never accessed again. + cache.access(k) + # Evict 4 since its next access MAXINT is the furthest in the future. + assert_metrics( + cache, [3, 5, 5, [1, 3, 5], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 6 + k.next_access_seq_no = 17 + cache.access(k) + # Evict 5 since its next access MAXINT is the furthest in the future. + assert_metrics( + cache, [3, 6, 6, [1, 3, 6], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 7 + k.next_access_seq_no = 16 + cache.access(k) + # Evict 3 since its next access 18 is the furthest in the future. + assert_metrics( + cache, [3, 7, 7, [1, 6, 7], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 1 + k.next_access_seq_no = 25 + cache.access(k) + assert_metrics( + cache, [3, 8, 7, [1, 6, 7], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 8 + k.next_access_seq_no = sys.maxsize + cache.access(k) + # Evict 1 since its next access 25 is the furthest in the future. + assert_metrics( + cache, [3, 9, 8, [6, 7, 8], []], expected_value_size=1, custom_hashtable=False + ) + + # Insert a large kv pair to evict all keys. + k.access_time += 1 + k.block_id = 10 + k.block_size = 3 + k.next_access_seq_no = sys.maxsize + cache.access(k) + assert_metrics( + cache, [3, 10, 9, [10], []], expected_value_size=3, custom_hashtable=False + ) + print("Test OPT cache: Success") + + +def test_trace_cache(): + print("Test trace cache") + cache = TraceCache(0) + k = TraceRecord( + access_time=0, + block_id=1, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=0, + is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=7, + ) + cache.access(k) + assert cache.miss_ratio_stats.num_accesses == 1 + assert cache.miss_ratio_stats.num_misses == 0 + k.is_hit = 0 + cache.access(k) + assert cache.miss_ratio_stats.num_accesses == 2 + assert cache.miss_ratio_stats.num_misses == 1 + print("Test trace cache: Success") + + if __name__ == "__main__": - policies = [] - policies.append(MRUPolicy()) - policies.append(LRUPolicy()) - policies.append(LFUPolicy()) test_hash_table() - test_lru_cache() + test_trace_cache() + test_opt_cache() + test_lru_cache( + ThompsonSamplingCache( + 3, enable_cache_row_key=0, policies=[LRUPolicy()], cost_class_label=None + ), + custom_hashtable=True, + ) + test_lru_cache(LRUCache(3, enable_cache_row_key=0), custom_hashtable=False) test_mru_cache() test_lfu_cache() - test_mix(ThompsonSamplingCache(100, False, policies)) - test_mix(ThompsonSamplingCache(100, True, policies)) - test_mix(LinUCBCache(100, False, policies)) - test_mix(LinUCBCache(100, True, policies)) - test_hybrid(ThompsonSamplingCache(kSampleSize, True, [LRUPolicy()])) - test_hybrid(LinUCBCache(kSampleSize, True, [LRUPolicy()])) + test_hybrid( + ThompsonSamplingCache( + kSampleSize, + enable_cache_row_key=1, + policies=[LRUPolicy()], + cost_class_label=None, + ) + ) + test_hybrid( + LinUCBCache( + kSampleSize, + enable_cache_row_key=1, + policies=[LRUPolicy()], + cost_class_label=None, + ) + ) + for cache_type in [ + "ts", + "opt", + "arc", + "pylfu", + "pymru", + "trace", + "pyhb", + "lru", + "pylru", + "linucb", + "gdsize", + "pycctbbt", + "pycctb", + "pyccbt", + ]: + for enable_row_cache in [0, 1, 2]: + cache_type_str = cache_type + if cache_type != "opt" and cache_type != "trace": + if enable_row_cache == 1: + cache_type_str += "_hybrid" + elif enable_row_cache == 2: + cache_type_str += "_hybridn" + test_mix(create_cache(cache_type_str, cache_size=100, downsample_size=1)) + test_end_to_end() diff --git a/tools/block_cache_analyzer/block_cache_trace_analyzer.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc index 032ed2be2..e1021b466 100644 --- a/tools/block_cache_analyzer/block_cache_trace_analyzer.cc +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc @@ -127,6 +127,9 @@ DEFINE_string(analyze_get_spatial_locality_labels, "", "Group data blocks using these labels."); DEFINE_string(analyze_get_spatial_locality_buckets, "", "Group data blocks by their statistics using these buckets."); +DEFINE_string(skew_labels, "", + "Group the access count of a block using these labels."); +DEFINE_string(skew_buckets, "", "Group the skew labels using these buckets."); DEFINE_bool(mrc_only, false, "Evaluate alternative cache policies only. When this flag is true, " "the analyzer does NOT maintain states of each block in memory for " @@ -147,6 +150,7 @@ namespace { const std::string kMissRatioCurveFileName = "mrc"; const std::string kGroupbyBlock = "block"; +const std::string kGroupbyTable = "table"; const std::string kGroupbyColumnFamily = "cf"; const std::string kGroupbySSTFile = "sst"; const std::string kGroupbyBlockType = "bt"; @@ -164,6 +168,7 @@ const std::string kSupportedCacheNames = // The suffix for the generated csv files. const std::string kFileNameSuffixMissRatioTimeline = "miss_ratio_timeline"; const std::string kFileNameSuffixMissTimeline = "miss_timeline"; +const std::string kFileNameSuffixSkew = "skewness"; const std::string kFileNameSuffixAccessTimeline = "access_timeline"; const std::string kFileNameSuffixCorrelation = "correlation_input"; const std::string kFileNameSuffixAvgReuseIntervalNaccesses = @@ -540,6 +545,62 @@ void BlockCacheTraceAnalyzer::WriteMissTimeline(uint64_t time_unit) const { } } +void BlockCacheTraceAnalyzer::WriteSkewness( + const std::string& label_str, const std::vector& percent_buckets, + TraceType target_block_type) const { + std::set labels = ParseLabelStr(label_str); + std::map label_naccesses; + uint64_t total_naccesses = 0; + auto block_callback = [&](const std::string& cf_name, uint64_t fd, + uint32_t level, TraceType type, + const std::string& /*block_key*/, uint64_t block_id, + const BlockAccessInfo& block) { + if (target_block_type != TraceType::kTraceMax && + target_block_type != type) { + return; + } + const std::string label = BuildLabel( + labels, cf_name, fd, level, type, + TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block); + label_naccesses[label] += block.num_accesses; + total_naccesses += block.num_accesses; + }; + TraverseBlocks(block_callback, &labels); + std::map> label_bucket_naccesses; + std::vector> pairs; + for (auto const& itr : label_naccesses) { + pairs.push_back(itr); + } + // Sort in descending order. + sort( + pairs.begin(), pairs.end(), + [=](std::pair& a, + std::pair& b) { return b.second < a.second; }); + + size_t prev_start_index = 0; + for (auto const& percent : percent_buckets) { + label_bucket_naccesses[label_str][percent] = 0; + size_t end_index = 0; + if (percent == port::kMaxUint64) { + end_index = label_naccesses.size(); + } else { + end_index = percent * label_naccesses.size() / 100; + } + for (size_t i = prev_start_index; i < end_index; i++) { + label_bucket_naccesses[label_str][percent] += pairs[i].second; + } + prev_start_index = end_index; + } + std::string filename_suffix; + if (target_block_type != TraceType::kTraceMax) { + filename_suffix = block_type_to_string(target_block_type); + filename_suffix += "_"; + } + filename_suffix += kFileNameSuffixSkew; + WriteStatsToFile(label_str, percent_buckets, filename_suffix, + label_bucket_naccesses, total_naccesses); +} + void BlockCacheTraceAnalyzer::WriteCorrelationFeatures( const std::string& label_str, uint32_t max_number_of_values) const { std::set labels = ParseLabelStr(label_str); @@ -549,12 +610,16 @@ void BlockCacheTraceAnalyzer::WriteCorrelationFeatures( [&](const std::string& cf_name, uint64_t fd, uint32_t level, TraceType block_type, const std::string& /*block_key*/, uint64_t /*block_key_id*/, const BlockAccessInfo& block) { + if (block.table_id == 0 && labels.find(kGroupbyTable) != labels.end()) { + // We only know table id information for get requests. + return; + } if (labels.find(kGroupbyCaller) != labels.end()) { // Group by caller. for (auto const& caller_map : block.caller_access_timeline) { const std::string label = BuildLabel(labels, cf_name, fd, level, block_type, - caller_map.first, /*block_id=*/0); + caller_map.first, /*block_id=*/0, block); auto it = block.caller_access_sequence__number_timeline.find( caller_map.first); assert(it != block.caller_access_sequence__number_timeline.end()); @@ -563,14 +628,15 @@ void BlockCacheTraceAnalyzer::WriteCorrelationFeatures( } return; } - const std::string label = BuildLabel( - labels, cf_name, fd, level, block_type, - TableReaderCaller::kMaxBlockCacheLookupCaller, /*block_id=*/0); + const std::string label = + BuildLabel(labels, cf_name, fd, level, block_type, + TableReaderCaller::kMaxBlockCacheLookupCaller, + /*block_id=*/0, block); UpdateFeatureVectors(block.access_sequence_number_timeline, block.access_timeline, label, &label_features, &label_predictions); }; - TraverseBlocks(block_callback); + TraverseBlocks(block_callback, &labels); WriteCorrelationFeaturesToFile(label_str, label_features, label_predictions, max_number_of_values); } @@ -656,7 +722,7 @@ std::set BlockCacheTraceAnalyzer::ParseLabelStr( std::string BlockCacheTraceAnalyzer::BuildLabel( const std::set& labels, const std::string& cf_name, uint64_t fd, uint32_t level, TraceType type, TableReaderCaller caller, - uint64_t block_key) const { + uint64_t block_key, const BlockAccessInfo& block) const { std::map label_value_map; label_value_map[kGroupbyAll] = kGroupbyAll; label_value_map[kGroupbyLevel] = std::to_string(level); @@ -665,6 +731,7 @@ std::string BlockCacheTraceAnalyzer::BuildLabel( label_value_map[kGroupbyBlockType] = block_type_to_string(type); label_value_map[kGroupbyColumnFamily] = cf_name; label_value_map[kGroupbyBlock] = std::to_string(block_key); + label_value_map[kGroupbyTable] = std::to_string(block.table_id); // Concatenate the label values. std::string label; for (auto const& l : labels) { @@ -683,7 +750,8 @@ void BlockCacheTraceAnalyzer::TraverseBlocks( const std::string& /*block_key*/, uint64_t /*block_key_id*/, const BlockAccessInfo& /*block_access_info*/)> - block_callback) const { + block_callback, + std::set* labels) const { for (auto const& cf_aggregates : cf_aggregates_map_) { // Stats per column family. const std::string& cf_name = cf_aggregates.first; @@ -698,6 +766,11 @@ void BlockCacheTraceAnalyzer::TraverseBlocks( for (auto const& block_access_info : block_type_aggregates.second.block_access_info_map) { // Stats per block. + if (labels && block_access_info.second.table_id == 0 && + labels->find(kGroupbyTable) != labels->end()) { + // We only know table id information for get requests. + return; + } block_callback(cf_name, fd, level, type, block_access_info.first, block_access_info.second.block_id, block_access_info.second); @@ -733,7 +806,7 @@ void BlockCacheTraceAnalyzer::WriteGetSpatialLocality( } const std::string label = BuildLabel(labels, cf_name, fd, level, TraceType::kBlockTraceDataBlock, - TableReaderCaller::kUserGet, /*block_id=*/0); + TableReaderCaller::kUserGet, /*block_id=*/0, block); const uint64_t percent_referenced_for_existing_keys = static_cast(std::max( @@ -761,7 +834,7 @@ void BlockCacheTraceAnalyzer::WriteGetSpatialLocality( ->second += 1; nblocks += 1; }; - TraverseBlocks(block_callback); + TraverseBlocks(block_callback, &labels); WriteStatsToFile(label_str, percent_buckets, kFileNameSuffixPercentRefKeys, label_pnrefkeys_nblocks, nblocks); WriteStatsToFile(label_str, percent_buckets, @@ -792,7 +865,7 @@ void BlockCacheTraceAnalyzer::WriteAccessTimeline(const std::string& label_str, continue; } const std::string label = - BuildLabel(labels, cf_name, fd, level, type, caller, block_id); + BuildLabel(labels, cf_name, fd, level, type, caller, block_id, block); for (auto const& naccess : timeline.second) { const uint64_t timestamp = naccess.first / time_unit; const uint64_t num = naccess.second; @@ -806,7 +879,7 @@ void BlockCacheTraceAnalyzer::WriteAccessTimeline(const std::string& label_str, access_count_block_id_map[naccesses].push_back(std::to_string(block_id)); } }; - TraverseBlocks(block_callback); + TraverseBlocks(block_callback, &labels); // We have label_access_timeline now. Write them into a file. const std::string user_access_prefix = @@ -877,9 +950,9 @@ void BlockCacheTraceAnalyzer::WriteReuseDistance( uint32_t level, TraceType type, const std::string& /*block_key*/, uint64_t block_id, const BlockAccessInfo& block) { - const std::string label = - BuildLabel(labels, cf_name, fd, level, type, - TableReaderCaller::kMaxBlockCacheLookupCaller, block_id); + const std::string label = BuildLabel( + labels, cf_name, fd, level, type, + TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block); if (label_distance_num_reuses.find(label) == label_distance_num_reuses.end()) { // The first time we encounter this label. @@ -894,7 +967,7 @@ void BlockCacheTraceAnalyzer::WriteReuseDistance( total_num_reuses += reuse_distance.second; } }; - TraverseBlocks(block_callback); + TraverseBlocks(block_callback, &labels); // We have label_naccesses and label_distance_num_reuses now. Write them into // a file. const std::string output_path = @@ -1016,17 +1089,17 @@ void BlockCacheTraceAnalyzer::WriteReuseInterval( if (labels.find(kGroupbyCaller) != labels.end()) { for (auto const& timeline : block.caller_num_accesses_timeline) { const TableReaderCaller caller = timeline.first; - const std::string label = - BuildLabel(labels, cf_name, fd, level, type, caller, block_id); + const std::string label = BuildLabel(labels, cf_name, fd, level, type, + caller, block_id, block); UpdateReuseIntervalStats(label, time_buckets, timeline.second, &label_time_num_reuses, &total_num_reuses); } return; } // Does not group by caller so we need to flatten the access timeline. - const std::string label = - BuildLabel(labels, cf_name, fd, level, type, - TableReaderCaller::kMaxBlockCacheLookupCaller, block_id); + const std::string label = BuildLabel( + labels, cf_name, fd, level, type, + TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block); std::map timeline; for (auto const& caller_timeline : block.caller_num_accesses_timeline) { for (auto const& time_naccess : caller_timeline.second) { @@ -1045,7 +1118,7 @@ void BlockCacheTraceAnalyzer::WriteReuseInterval( label_avg_reuse_naccesses[label].upper_bound(avg_reuse_interval)->second += block.num_accesses; }; - TraverseBlocks(block_callback); + TraverseBlocks(block_callback, &labels); // Write the stats into files. WriteStatsToFile(label_str, time_buckets, kFileNameSuffixReuseInterval, @@ -1074,9 +1147,9 @@ void BlockCacheTraceAnalyzer::WriteReuseLifetime( } else { lifetime = port::kMaxUint64 - 1; } - const std::string label = - BuildLabel(labels, cf_name, fd, level, type, - TableReaderCaller::kMaxBlockCacheLookupCaller, block_id); + const std::string label = BuildLabel( + labels, cf_name, fd, level, type, + TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block); if (label_lifetime_nblocks.find(label) == label_lifetime_nblocks.end()) { // The first time we encounter this label. @@ -1087,7 +1160,7 @@ void BlockCacheTraceAnalyzer::WriteReuseLifetime( label_lifetime_nblocks[label].upper_bound(lifetime)->second += 1; total_nblocks += 1; }; - TraverseBlocks(block_callback); + TraverseBlocks(block_callback, &labels); WriteStatsToFile(label_str, time_buckets, kFileNameSuffixReuseLifetime, label_lifetime_nblocks, total_nblocks); } @@ -1396,11 +1469,17 @@ Status BlockCacheTraceAnalyzer::WriteHumanReadableTraceRecord( int ret = snprintf( trace_record_buffer_, sizeof(trace_record_buffer_), "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32 - ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u\n", + ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64 + ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n", access.access_timestamp, block_id, access.block_type, access.block_size, access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number, access.caller, access.no_insert, access.get_id, get_key_id, - access.referenced_data_size, access.is_cache_hit); + access.referenced_data_size, access.is_cache_hit, + access.referenced_key_exist_in_block, access.num_keys_in_block, + BlockCacheTraceHelper::GetTableId(access), + BlockCacheTraceHelper::GetSequenceNumber(access), access.block_key.size(), + access.referenced_key.size(), + BlockCacheTraceHelper::GetBlockOffsetInFile(access)); if (ret < 0) { return Status::IOError("failed to format the output"); } @@ -1432,13 +1511,13 @@ Status BlockCacheTraceAnalyzer::RecordAccess( uint64_t get_key_id = 0; if (access.caller == TableReaderCaller::kUserGet && access.get_id != BlockCacheTraceHelper::kReservedGetId) { - std::string row_key = BlockCacheTraceHelper::ComputeRowKey(access); - if (get_key_info_map_.find(row_key) == get_key_info_map_.end()) { - get_key_info_map_[row_key].key_id = unique_get_key_id_; - get_key_id = unique_get_key_id_; + std::string user_key = ExtractUserKey(access.referenced_key).ToString(); + if (get_key_info_map_.find(user_key) == get_key_info_map_.end()) { + get_key_info_map_[user_key].key_id = unique_get_key_id_; unique_get_key_id_++; } - get_key_info_map_[row_key].AddAccess(access, access_sequence_number_); + get_key_id = get_key_info_map_[user_key].key_id; + get_key_info_map_[user_key].AddAccess(access, access_sequence_number_); } if (compute_reuse_distance_) { @@ -2224,6 +2303,25 @@ int block_cache_trace_analyzer_tool(int argc, char** argv) { analyzer.WriteCorrelationFeaturesForGet( FLAGS_analyze_correlation_coefficients_max_number_of_values); } + + if (!FLAGS_skew_labels.empty() && !FLAGS_skew_buckets.empty()) { + std::vector buckets = parse_buckets(FLAGS_skew_buckets); + std::stringstream ss(FLAGS_skew_labels); + while (ss.good()) { + std::string label; + getline(ss, label, ','); + if (label.find("block") != std::string::npos) { + analyzer.WriteSkewness(label, buckets, + TraceType::kBlockTraceIndexBlock); + analyzer.WriteSkewness(label, buckets, + TraceType::kBlockTraceFilterBlock); + analyzer.WriteSkewness(label, buckets, TraceType::kBlockTraceDataBlock); + analyzer.WriteSkewness(label, buckets, TraceType::kTraceMax); + } else { + analyzer.WriteSkewness(label, buckets, TraceType::kTraceMax); + } + } + } return 0; } diff --git a/tools/block_cache_analyzer/block_cache_trace_analyzer.h b/tools/block_cache_analyzer/block_cache_trace_analyzer.h index bc41ff468..f22a9da68 100644 --- a/tools/block_cache_analyzer/block_cache_trace_analyzer.h +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer.h @@ -33,6 +33,8 @@ struct GetKeyInfo { // Statistics of a block. struct BlockAccessInfo { uint64_t block_id = 0; + uint64_t table_id = 0; + uint64_t block_offset = 0; uint64_t num_accesses = 0; uint64_t block_size = 0; uint64_t first_access_time = 0; @@ -73,6 +75,8 @@ struct BlockAccessInfo { if (first_access_time == 0) { first_access_time = access.access_timestamp; } + table_id = BlockCacheTraceHelper::GetTableId(access); + block_offset = BlockCacheTraceHelper::GetBlockOffsetInFile(access); last_access_time = access.access_timestamp; block_size = access.block_size; caller_num_access_map[access.caller]++; @@ -301,6 +305,10 @@ class BlockCacheTraceAnalyzer { void WriteCorrelationFeaturesForGet(uint32_t max_number_of_values) const; + void WriteSkewness(const std::string& label_str, + const std::vector& percent_buckets, + TraceType target_block_type) const; + const std::map& TEST_cf_aggregates_map() const { return cf_aggregates_map_; @@ -312,7 +320,8 @@ class BlockCacheTraceAnalyzer { std::string BuildLabel(const std::set& labels, const std::string& cf_name, uint64_t fd, uint32_t level, TraceType type, - TableReaderCaller caller, uint64_t block_key) const; + TableReaderCaller caller, uint64_t block_key, + const BlockAccessInfo& block) const; void ComputeReuseDistance(BlockAccessInfo* info) const; @@ -341,7 +350,8 @@ class BlockCacheTraceAnalyzer { const std::string& /*block_key*/, uint64_t /*block_key_id*/, const BlockAccessInfo& /*block_access_info*/)> - block_callback) const; + block_callback, + std::set* labels = nullptr) const; void UpdateFeatureVectors( const std::vector& access_sequence_number_timeline, diff --git a/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc index 9917d5b9e..eecd6e80d 100644 --- a/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc @@ -181,7 +181,9 @@ class BlockCacheTracerTest : public testing::Test { analyze_get_spatial_locality_labels_, "-analyze_get_spatial_locality_buckets=" + analyze_get_spatial_locality_buckets_, - "-analyze_correlation_coefficients_labels=all"}; + "-analyze_correlation_coefficients_labels=all", + "-skew_labels=all", + "-skew_buckets=10,50,100"}; char arg_buffer[kArgBufferSize]; char* argv[kMaxArgCount]; int argc = 0; @@ -331,6 +333,33 @@ TEST_F(BlockCacheTracerTest, BlockCacheAnalyzer) { } } } + { + // Validate the skewness csv file. + const std::string skewness_file_path = test_path_ + "/all_skewness"; + std::ifstream skew_file(skewness_file_path); + // Read header. + std::string line; + ASSERT_TRUE(getline(skew_file, line)); + std::stringstream ss(line); + double sum_percent = 0; + while (getline(skew_file, line)) { + std::stringstream ss_naccess(line); + std::string substr; + bool read_label = false; + while (ss_naccess.good()) { + ASSERT_TRUE(getline(ss_naccess, substr, ',')); + if (!read_label) { + read_label = true; + continue; + } + sum_percent += ParseDouble(substr); + } + } + ASSERT_EQ(100.0, sum_percent); + ASSERT_FALSE(getline(skew_file, line)); + skew_file.close(); + ASSERT_OK(env_->DeleteFile(skewness_file_path)); + } { // Validate the timeline csv files. const std::vector time_units{"_60", "_3600"}; diff --git a/trace_replay/block_cache_tracer.cc b/trace_replay/block_cache_tracer.cc index 1eeb64ac8..4f39be609 100644 --- a/trace_replay/block_cache_tracer.cc +++ b/trace_replay/block_cache_tracer.cc @@ -61,11 +61,40 @@ std::string BlockCacheTraceHelper::ComputeRowKey( return ""; } Slice key = ExtractUserKey(access.referenced_key); - uint64_t seq_no = access.get_from_user_specified_snapshot == Boolean::kFalse - ? 0 - : 1 + GetInternalKeySeqno(access.referenced_key); - return std::to_string(access.sst_fd_number) + "_" + key.ToString() + "_" + - std::to_string(seq_no); + return std::to_string(access.sst_fd_number) + "_" + key.ToString(); +} + +uint64_t BlockCacheTraceHelper::GetTableId( + const BlockCacheTraceRecord& access) { + if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) { + return 0; + } + return static_cast(DecodeFixed32(access.referenced_key.data())) + 1; +} + +uint64_t BlockCacheTraceHelper::GetSequenceNumber( + const BlockCacheTraceRecord& access) { + if (!IsGetOrMultiGet(access.caller)) { + return 0; + } + return access.get_from_user_specified_snapshot == Boolean::kFalse + ? 0 + : 1 + GetInternalKeySeqno(access.referenced_key); +} + +uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile( + const BlockCacheTraceRecord& access) { + Slice input(access.block_key); + uint64_t offset = 0; + while (true) { + uint64_t tmp = 0; + if (GetVarint64(&input, &tmp)) { + offset = tmp; + } else { + break; + } + } + return offset; } BlockCacheTraceWriter::BlockCacheTraceWriter( diff --git a/trace_replay/block_cache_tracer.h b/trace_replay/block_cache_tracer.h index 3863ca430..b109b1db0 100644 --- a/trace_replay/block_cache_tracer.h +++ b/trace_replay/block_cache_tracer.h @@ -31,6 +31,15 @@ class BlockCacheTraceHelper { // Row key is a concatenation of the access's fd_number and the referenced // user key. static std::string ComputeRowKey(const BlockCacheTraceRecord& access); + // The first four bytes of the referenced key in a Get request is the table + // id. + static uint64_t GetTableId(const BlockCacheTraceRecord& access); + // The sequence number of a get request is the last part of the referenced + // key. + static uint64_t GetSequenceNumber(const BlockCacheTraceRecord& access); + // Block offset in a file is the last varint64 in the block key. + static uint64_t GetBlockOffsetInFile(const BlockCacheTraceRecord& access); + static const std::string kUnknownColumnFamilyName; static const uint64_t kReservedGetId; }; diff --git a/utilities/simulator_cache/cache_simulator_test.cc b/utilities/simulator_cache/cache_simulator_test.cc index babdd431f..3d3432e20 100644 --- a/utilities/simulator_cache/cache_simulator_test.cc +++ b/utilities/simulator_cache/cache_simulator_test.cc @@ -84,7 +84,7 @@ class CacheSimulatorTest : public testing::Test { for (auto const& key : keys) { std::string row_key = kRefKeyPrefix + key + kRefKeySequenceNumber; auto handle = - sim_cache->Lookup("0_" + ExtractUserKey(row_key).ToString() + "_0"); + sim_cache->Lookup("0_" + ExtractUserKey(row_key).ToString()); EXPECT_NE(nullptr, handle); sim_cache->Release(handle); } @@ -229,10 +229,9 @@ TEST_F(CacheSimulatorTest, HybridRowBlockCacheSimulator) { ASSERT_EQ(100, cache_simulator->miss_ratio_stats().miss_ratio()); ASSERT_EQ(10, cache_simulator->miss_ratio_stats().user_accesses()); ASSERT_EQ(100, cache_simulator->miss_ratio_stats().user_miss_ratio()); - auto handle = sim_cache->Lookup( - std::to_string(first_get.sst_fd_number) + "_" + - ExtractUserKey(first_get.referenced_key).ToString() + "_" + - std::to_string(1 + GetInternalKeySeqno(first_get.referenced_key))); + auto handle = + sim_cache->Lookup(std::to_string(first_get.sst_fd_number) + "_" + + ExtractUserKey(first_get.referenced_key).ToString()); ASSERT_NE(nullptr, handle); sim_cache->Release(handle); for (uint32_t i = 100; i < block_id; i++) { @@ -256,10 +255,9 @@ TEST_F(CacheSimulatorTest, HybridRowBlockCacheSimulator) { ASSERT_EQ(15, cache_simulator->miss_ratio_stats().user_accesses()); ASSERT_EQ(66, static_cast( cache_simulator->miss_ratio_stats().user_miss_ratio())); - handle = sim_cache->Lookup( - std::to_string(second_get.sst_fd_number) + "_" + - ExtractUserKey(second_get.referenced_key).ToString() + "_" + - std::to_string(1 + GetInternalKeySeqno(second_get.referenced_key))); + handle = + sim_cache->Lookup(std::to_string(second_get.sst_fd_number) + "_" + + ExtractUserKey(second_get.referenced_key).ToString()); ASSERT_NE(nullptr, handle); sim_cache->Release(handle); for (uint32_t i = 100; i < block_id; i++) { @@ -394,7 +392,7 @@ TEST_F(CacheSimulatorTest, HybridRowBlockCacheSimulatorGetTest) { AssertCache(sim_cache, cache_simulator->miss_ratio_stats(), 7, 8, 4, {"1", "2", "3", "5"}, {"1", "2", "4"}); for (auto const& key : {"1", "2", "4"}) { - auto handle = sim_cache->Lookup("0_" + kRefKeyPrefix + key + "_0"); + auto handle = sim_cache->Lookup("0_" + kRefKeyPrefix + key); ASSERT_NE(nullptr, handle); sim_cache->Release(handle); } @@ -417,7 +415,7 @@ TEST_F(CacheSimulatorTest, HybridRowBlockCacheSimulatorGetTest) { AssertCache(sim_cache, cache_simulator->miss_ratio_stats(), 16, 103, 99, {}, {}); for (auto const& key : {"1", "2", "4"}) { - auto handle = sim_cache->Lookup("0_" + kRefKeyPrefix + key + "_0"); + auto handle = sim_cache->Lookup("0_" + kRefKeyPrefix + key); ASSERT_EQ(nullptr, handle); } } @@ -437,9 +435,9 @@ TEST_F(CacheSimulatorTest, HybridRowBlockNoInsertCacheSimulator) { cache_simulator->Access(first_get); block_id++; } - auto handle = sim_cache->Lookup( - std::to_string(first_get.sst_fd_number) + "_" + - ExtractUserKey(first_get.referenced_key).ToString() + "_0"); + auto handle = + sim_cache->Lookup(std::to_string(first_get.sst_fd_number) + "_" + + ExtractUserKey(first_get.referenced_key).ToString()); ASSERT_NE(nullptr, handle); sim_cache->Release(handle); // All blocks are missing from the cache since insert_blocks_row_kvpair_misses