Summary: We don't have plans to work on this in the short term. If we ever resurrect the project, we can find the code in the history. No need for it to linger around.

Test Plan: no test

Reviewers: yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D32349

Branch: main
parent d6c7300ccf
commit e8bf2310a0
@@ -1,292 +0,0 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef ROCKSDB_LITE
#include <cstdio>
#include <vector>
#include <atomic>

#include "rocksdb/env.h"
#include "util/blob_store.h"
#include "util/testutil.h"

#define KB 1024LL
#define MB 1024*1024LL
// BlobStore does costly asserts to make sure it's running correctly, which
// significantly impacts benchmark runtime.
// NDEBUG will compile out those asserts.
#ifndef NDEBUG
#define NDEBUG
#endif

using namespace rocksdb;
using namespace std;

// used by all threads
uint64_t timeout_sec;
Env *env;
BlobStore* bs;

namespace {
std::string RandomString(Random* rnd, uint64_t len) {
  std::string r;
  test::RandomString(rnd, static_cast<int>(len), &r);
  return r;
}
}  // namespace

struct Result {
  uint32_t writes;
  uint32_t reads;
  uint32_t deletes;
  uint64_t data_written;
  uint64_t data_read;

  void print() {
    printf("Total writes = %u\n", writes);
    printf("Total reads = %u\n", reads);
    printf("Total deletes = %u\n", deletes);
    printf("Write throughput = %lf MB/s\n",
           (double)data_written / (1024*1024.0) / timeout_sec);
    printf("Read throughput = %lf MB/s\n",
           (double)data_read / (1024*1024.0) / timeout_sec);
    printf("Total throughput = %lf MB/s\n",
           (double)(data_read + data_written) / (1024*1024.0) / timeout_sec);
  }

  Result() {
    writes = reads = deletes = data_read = data_written = 0;
  }

  Result(uint32_t _writes, uint32_t _reads, uint32_t _deletes,
         uint64_t _data_written, uint64_t _data_read)
      : writes(_writes),
        reads(_reads),
        deletes(_deletes),
        data_written(_data_written),
        data_read(_data_read) {}
};

namespace {
Result operator + (const Result &a, const Result &b) {
  return Result(a.writes + b.writes, a.reads + b.reads,
                a.deletes + b.deletes, a.data_written + b.data_written,
                a.data_read + b.data_read);
}
}  // namespace

struct WorkerThread {
  uint64_t data_size_from, data_size_to;
  double read_ratio;
  uint64_t working_set_size;  // start deleting once you reach this
  Result result;
  atomic<bool> stopped;

  WorkerThread(uint64_t _data_size_from, uint64_t _data_size_to,
               double _read_ratio, uint64_t _working_set_size)
      : data_size_from(_data_size_from),
        data_size_to(_data_size_to),
        read_ratio(_read_ratio),
        working_set_size(_working_set_size),
        stopped(false) {}

  WorkerThread(const WorkerThread& wt) :
      data_size_from(wt.data_size_from), data_size_to(wt.data_size_to),
      read_ratio(wt.read_ratio), working_set_size(wt.working_set_size),
      stopped(false) {}
};

static void WorkerThreadBody(void* arg) {
  WorkerThread* t = reinterpret_cast<WorkerThread*>(arg);
  Random rnd(5);
  string buf;
  vector<pair<Blob, uint64_t>> blobs;
  vector<string> random_strings;

  for (int i = 0; i < 10; ++i) {
    random_strings.push_back(RandomString(&rnd, t->data_size_to));
  }

  uint64_t total_size = 0;

  uint64_t start_micros = env->NowMicros();
  while (env->NowMicros() - start_micros < timeout_sec * 1000 * 1000) {
    if (blobs.size() && rand() < RAND_MAX * t->read_ratio) {
      // read
      int bi = rand() % blobs.size();
      Status s = bs->Get(blobs[bi].first, &buf);
      assert(s.ok());
      t->result.data_read += buf.size();
      t->result.reads++;
    } else {
      // write
      uint64_t size = rand() % (t->data_size_to - t->data_size_from) +
                      t->data_size_from;
      total_size += size;
      string put_str = random_strings[rand() % random_strings.size()];
      blobs.push_back(make_pair(Blob(), size));
      Status s = bs->Put(Slice(put_str.data(), size), &blobs.back().first);
      assert(s.ok());
      t->result.data_written += size;
      t->result.writes++;
    }

    while (total_size >= t->working_set_size) {
      // delete random
      int bi = rand() % blobs.size();
      total_size -= blobs[bi].second;
      bs->Delete(blobs[bi].first);
      blobs.erase(blobs.begin() + bi);
      t->result.deletes++;
    }
  }
  t->stopped.store(true);
}

namespace {
Result StartBenchmark(vector<WorkerThread*>& config) {
  for (auto w : config) {
    env->StartThread(WorkerThreadBody, w);
  }

  Result result;

  for (auto w : config) {
    while (!w->stopped.load());
    result = result + w->result;
  }

  for (auto w : config) {
    delete w;
  }

  delete bs;

  return result;
}

vector<WorkerThread*> SetupBenchmarkBalanced() {
  string test_path;
  env->GetTestDirectory(&test_path);
  test_path.append("/blob_store");

  // config start
  uint32_t block_size = 16*KB;
  uint32_t file_size = 1*MB;
  double read_write_ratio = 0.5;
  uint64_t data_read_from = 16*KB;
  uint64_t data_read_to = 32*KB;
  int number_of_threads = 10;
  uint64_t working_set_size = 5*MB;
  timeout_sec = 5;
  // config end

  bs = new BlobStore(test_path, block_size, file_size / block_size, 10000, env);

  vector <WorkerThread*> config;

  for (int i = 0; i < number_of_threads; ++i) {
    config.push_back(new WorkerThread(data_read_from,
                                      data_read_to,
                                      read_write_ratio,
                                      working_set_size));
  };

  return config;
}

vector<WorkerThread*> SetupBenchmarkWriteHeavy() {
  string test_path;
  env->GetTestDirectory(&test_path);
  test_path.append("/blob_store");

  // config start
  uint32_t block_size = 16*KB;
  uint32_t file_size = 1*MB;
  double read_write_ratio = 0.1;
  uint64_t data_read_from = 16*KB;
  uint64_t data_read_to = 32*KB;
  int number_of_threads = 10;
  uint64_t working_set_size = 5*MB;
  timeout_sec = 5;
  // config end

  bs = new BlobStore(test_path, block_size, file_size / block_size, 10000, env);

  vector <WorkerThread*> config;

  for (int i = 0; i < number_of_threads; ++i) {
    config.push_back(new WorkerThread(data_read_from,
                                      data_read_to,
                                      read_write_ratio,
                                      working_set_size));
  };

  return config;
}

vector<WorkerThread*> SetupBenchmarkReadHeavy() {
  string test_path;
  env->GetTestDirectory(&test_path);
  test_path.append("/blob_store");

  // config start
  uint32_t block_size = 16*KB;
  uint32_t file_size = 1*MB;
  double read_write_ratio = 0.9;
  uint64_t data_read_from = 16*KB;
  uint64_t data_read_to = 32*KB;
  int number_of_threads = 10;
  uint64_t working_set_size = 5*MB;
  timeout_sec = 5;
  // config end

  bs = new BlobStore(test_path, block_size, file_size / block_size, 10000, env);

  vector <WorkerThread*> config;

  for (int i = 0; i < number_of_threads; ++i) {
    config.push_back(new WorkerThread(data_read_from,
                                      data_read_to,
                                      read_write_ratio,
                                      working_set_size));
  };

  return config;
}
}  // namespace

int main(int argc, const char** argv) {
  srand(33);
  env = Env::Default();

  {
    printf("--- Balanced read/write benchmark ---\n");
    vector <WorkerThread*> config = SetupBenchmarkBalanced();
    Result r = StartBenchmark(config);
    r.print();
  }
  {
    printf("--- Write heavy benchmark ---\n");
    vector <WorkerThread*> config = SetupBenchmarkWriteHeavy();
    Result r = StartBenchmark(config);
    r.print();
  }
  {
    printf("--- Read heavy benchmark ---\n");
    vector <WorkerThread*> config = SetupBenchmarkReadHeavy();
    Result r = StartBenchmark(config);
    r.print();
  }

  return 0;
}
#else  // ROCKSDB_LITE
#include <stdio.h>
int main(int argc, char** argv) {
  fprintf(stderr, "Not supported in lite mode.\n");
  return 1;
}
#endif  // ROCKSDB_LITE
@@ -1,272 +0,0 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef ROCKSDB_LITE
#include "util/blob_store.h"

namespace rocksdb {

using namespace std;

// BlobChunk
bool BlobChunk::ImmediatelyBefore(const BlobChunk& chunk) const {
  // overlapping!?
  assert(!Overlap(chunk));
  // size == 0 is a marker, not a block
  return size != 0 &&
         bucket_id == chunk.bucket_id &&
         offset + size == chunk.offset;
}

bool BlobChunk::Overlap(const BlobChunk &chunk) const {
  return size != 0 && chunk.size != 0 && bucket_id == chunk.bucket_id &&
         ((offset >= chunk.offset && offset < chunk.offset + chunk.size) ||
          (chunk.offset >= offset && chunk.offset < offset + size));
}

// Blob
string Blob::ToString() const {
  string ret;
  for (auto chunk : chunks) {
    PutFixed32(&ret, chunk.bucket_id);
    PutFixed32(&ret, chunk.offset);
    PutFixed32(&ret, chunk.size);
  }
  return ret;
}

Blob::Blob(const std::string& blob) {
  for (uint32_t i = 0; i < blob.size(); ) {
    uint32_t t[3] = {0};
    for (int j = 0; j < 3 && i + sizeof(uint32_t) - 1 < blob.size();
         ++j, i += sizeof(uint32_t)) {
      t[j] = DecodeFixed32(blob.data() + i);
    }
    chunks.push_back(BlobChunk(t[0], t[1], t[2]));
  }
}

// FreeList
Status FreeList::Free(const Blob& blob) {
  // add it back to the free list
  for (auto chunk : blob.chunks) {
    free_blocks_ += chunk.size;
    if (fifo_free_chunks_.size() &&
        fifo_free_chunks_.back().ImmediatelyBefore(chunk)) {
      fifo_free_chunks_.back().size += chunk.size;
    } else {
      fifo_free_chunks_.push_back(chunk);
    }
  }

  return Status::OK();
}

Status FreeList::Allocate(uint32_t blocks, Blob* blob) {
  if (free_blocks_ < blocks) {
    return Status::Incomplete("");
  }

  blob->chunks.clear();
  free_blocks_ -= blocks;

  while (blocks > 0) {
    assert(fifo_free_chunks_.size() > 0);
    auto& front = fifo_free_chunks_.front();
    if (front.size > blocks) {
      blob->chunks.push_back(BlobChunk(front.bucket_id, front.offset, blocks));
      front.offset += blocks;
      front.size -= blocks;
      blocks = 0;
    } else {
      blob->chunks.push_back(front);
      blocks -= front.size;
      fifo_free_chunks_.pop_front();
    }
  }
  assert(blocks == 0);

  return Status::OK();
}

bool FreeList::Overlap(const Blob &blob) const {
  for (auto chunk : blob.chunks) {
    for (auto itr = fifo_free_chunks_.begin();
         itr != fifo_free_chunks_.end();
         ++itr) {
      if (itr->Overlap(chunk)) {
        return true;
      }
    }
  }
  return false;
}

// BlobStore
BlobStore::BlobStore(const string& directory,
                     uint64_t block_size,
                     uint32_t blocks_per_bucket,
                     uint32_t max_buckets,
                     Env* env) :
    directory_(directory),
    block_size_(block_size),
    blocks_per_bucket_(blocks_per_bucket),
    env_(env),
    max_buckets_(max_buckets) {
  env_->CreateDirIfMissing(directory_);

  storage_options_.use_mmap_writes = false;
  storage_options_.use_mmap_reads = false;

  buckets_size_ = 0;
  buckets_ = new unique_ptr<RandomRWFile>[max_buckets_];

  CreateNewBucket();
}

BlobStore::~BlobStore() {
  // TODO we don't care about recovery for now
  delete [] buckets_;
}

Status BlobStore::Put(const Slice& value, Blob* blob) {
  // convert size to number of blocks
  Status s = Allocate(
      static_cast<uint32_t>((value.size() + block_size_ - 1) / block_size_),
      blob);
  if (!s.ok()) {
    return s;
  }
  auto size_left = (uint64_t) value.size();

  uint64_t offset = 0;  // in bytes, not blocks
  for (auto chunk : blob->chunks) {
    uint64_t write_size = min(chunk.size * block_size_, size_left);
    assert(chunk.bucket_id < buckets_size_);
    s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_,
                                               Slice(value.data() + offset,
                                                     write_size));
    if (!s.ok()) {
      Delete(*blob);
      return s;
    }
    offset += write_size;
    size_left -= write_size;
    if (write_size < chunk.size * block_size_) {
      // if we have any space left in the block, fill it up with zeros
      string zero_string(chunk.size * block_size_ - write_size, 0);
      s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_ +
                                                   write_size,
                                                 Slice(zero_string));
    }
  }

  if (size_left > 0) {
    Delete(*blob);
    return Status::Corruption("Tried to write more data than fits in the blob");
  }

  return Status::OK();
}

Status BlobStore::Get(const Blob& blob,
                      string* value) const {
  {
    // assert that it doesn't overlap with free list
    // it will get compiled out for release
    MutexLock l(&free_list_mutex_);
    assert(!free_list_.Overlap(blob));
  }

  value->resize(blob.Size() * block_size_);

  uint64_t offset = 0;  // in bytes, not blocks
  for (auto chunk : blob.chunks) {
    Slice result;
    assert(chunk.bucket_id < buckets_size_);
    Status s;
    s = buckets_[chunk.bucket_id].get()->Read(chunk.offset * block_size_,
                                              chunk.size * block_size_,
                                              &result,
                                              &value->at(offset));
    if (!s.ok()) {
      value->clear();
      return s;
    }
    if (result.size() < chunk.size * block_size_) {
      value->clear();
      return Status::Corruption("Could not read in from file");
    }
    offset += chunk.size * block_size_;
  }

  // remove the '\0's at the end of the string
  value->erase(find(value->begin(), value->end(), '\0'), value->end());

  return Status::OK();
}

Status BlobStore::Delete(const Blob& blob) {
  MutexLock l(&free_list_mutex_);
  return free_list_.Free(blob);
}

Status BlobStore::Sync() {
  for (size_t i = 0; i < buckets_size_; ++i) {
    Status s = buckets_[i].get()->Sync();
    if (!s.ok()) {
      return s;
    }
  }
  return Status::OK();
}

Status BlobStore::Allocate(uint32_t blocks, Blob* blob) {
  MutexLock l(&free_list_mutex_);
  Status s;

  s = free_list_.Allocate(blocks, blob);
  if (!s.ok()) {
    s = CreateNewBucket();
    if (!s.ok()) {
      return s;
    }
    s = free_list_.Allocate(blocks, blob);
  }

  return s;
}

// called with free_list_mutex_ held
Status BlobStore::CreateNewBucket() {
  MutexLock l(&buckets_mutex_);

  if (buckets_size_ >= max_buckets_) {
    return Status::NotSupported("Max size exceeded\n");
  }

  int new_bucket_id = buckets_size_;

  char fname[200];
  sprintf(fname, "%s/%d.bs", directory_.c_str(), new_bucket_id);

  Status s = env_->NewRandomRWFile(string(fname),
                                   &buckets_[new_bucket_id],
                                   storage_options_);
  if (!s.ok()) {
    return s;
  }

  // whether Allocate succeeds or not, does not affect the overall correctness
  // of this function - calling Allocate is really optional
  // (also, tmpfs does not support allocate)
  buckets_[new_bucket_id].get()->Allocate(0, block_size_ * blocks_per_bucket_);

  buckets_size_ = new_bucket_id + 1;

  return free_list_.Free(Blob(new_bucket_id, 0, blocks_per_bucket_));
}

}  // namespace rocksdb
#endif  // ROCKSDB_LITE
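For reference, Blob::ToString() and the Blob(const std::string&) constructor in the file above serialize each chunk as three fixed-width 32-bit integers (bucket_id, offset, size), 12 bytes per chunk, using PutFixed32/DecodeFixed32 from util/coding.h. The following is a minimal standalone sketch of that round trip, not the removed code itself; it assumes a little-endian host in place of the endian handling util/coding.h provides, and the helper names are illustrative only.

    // Standalone sketch of the 12-byte-per-chunk Blob encoding (assumption:
    // little-endian host stands in for util/coding.h's fixed-width encoders).
    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <vector>

    static void PutFixed32LE(std::string* dst, uint32_t v) {
      char buf[4];
      std::memcpy(buf, &v, sizeof(v));  // little-endian host assumed
      dst->append(buf, sizeof(buf));
    }

    static uint32_t DecodeFixed32LE(const char* p) {
      uint32_t v;
      std::memcpy(&v, p, sizeof(v));
      return v;
    }

    struct ChunkTriple { uint32_t bucket_id, offset, size; };

    int main() {
      std::vector<ChunkTriple> chunks = {{0, 4, 2}, {1, 0, 7}};
      std::string encoded;
      for (const auto& c : chunks) {  // 3 * 4 bytes per chunk
        PutFixed32LE(&encoded, c.bucket_id);
        PutFixed32LE(&encoded, c.offset);
        PutFixed32LE(&encoded, c.size);
      }
      assert(encoded.size() == chunks.size() * 12);

      // Decode, mirroring the Blob(const std::string&) constructor.
      std::vector<ChunkTriple> decoded;
      for (size_t i = 0; i + 12 <= encoded.size(); i += 12) {
        decoded.push_back({DecodeFixed32LE(encoded.data() + i),
                           DecodeFixed32LE(encoded.data() + i + 4),
                           DecodeFixed32LE(encoded.data() + i + 8)});
      }
      assert(decoded.size() == chunks.size() && decoded[1].size == 7);
      return 0;
    }
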
@@ -1,163 +0,0 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef ROCKSDB_LITE
#pragma once
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "port/port.h"
#include "util/mutexlock.h"
#include "util/coding.h"

#include <list>
#include <deque>
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <algorithm>
#include <cstdio>

namespace rocksdb {

struct BlobChunk {
  uint32_t bucket_id;
  uint32_t offset;  // in blocks
  uint32_t size;    // in blocks
  BlobChunk() {}
  BlobChunk(uint32_t _bucket_id, uint32_t _offset, uint32_t _size)
      : bucket_id(_bucket_id), offset(_offset), size(_size) {}

  // returns true if it's immediately before chunk
  bool ImmediatelyBefore(const BlobChunk& chunk) const;
  // returns true if chunks overlap
  bool Overlap(const BlobChunk &chunk) const;
};

// We represent each Blob as a string in format:
// bucket_id offset size|bucket_id offset size...
// The string can be used to reference the Blob stored on external
// device/file
// Not thread-safe!
struct Blob {
  // Generates the string
  std::string ToString() const;
  // Parses the previously generated string
  explicit Blob(const std::string& blob);
  // Creates unfragmented Blob
  Blob(uint32_t bucket_id, uint32_t offset, uint32_t size) {
    SetOneChunk(bucket_id, offset, size);
  }
  Blob() {}

  void SetOneChunk(uint32_t bucket_id, uint32_t offset, uint32_t size) {
    chunks.clear();
    chunks.push_back(BlobChunk(bucket_id, offset, size));
  }

  uint32_t Size() const {  // in blocks
    uint32_t ret = 0;
    for (auto chunk : chunks) {
      ret += chunk.size;
    }
    assert(ret > 0);
    return ret;
  }

  // bucket_id, offset, size
  std::vector<BlobChunk> chunks;
};

// Keeps a list of free chunks
// NOT thread-safe. Externally synchronized
class FreeList {
 public:
  FreeList() :
      free_blocks_(0) {}
  ~FreeList() {}

  // Allocates a blob. Stores the allocated blob in
  // 'blob'. Returns non-OK status if it failed to allocate.
  // Thread-safe
  Status Allocate(uint32_t blocks, Blob* blob);
  // Frees the blob for reuse. Thread-safe
  Status Free(const Blob& blob);

  // returns true if blob is overlapping with any of the
  // chunks stored in free list
  bool Overlap(const Blob &blob) const;

 private:
  std::deque<BlobChunk> fifo_free_chunks_;
  uint32_t free_blocks_;
  mutable port::Mutex mutex_;
};

// thread-safe
class BlobStore {
 public:
  // directory - wherever the blobs should be stored. It will be created
  //   if missing
  // block_size - self explanatory
  // blocks_per_bucket - how many blocks we want to keep in one bucket.
  //   Bucket is a device or a file that we use to store the blobs.
  //   If we don't have enough blocks to allocate a new blob, we will
  //   try to create a new file or device.
  // max_buckets - maximum number of buckets BlobStore will create.
  //   BlobStore max size in bytes is
  //   max_buckets * blocks_per_bucket * block_size
  // env - env for creating new files
  BlobStore(const std::string& directory,
            uint64_t block_size,
            uint32_t blocks_per_bucket,
            uint32_t max_buckets,
            Env* env);
  ~BlobStore();

  // Allocates space for value.size bytes (rounded up to a multiple of
  // block size) and writes value.size bytes from value.data to a backing store.
  // Sets Blob blob that can then be used for addressing the
  // stored value. Returns non-OK status on error.
  Status Put(const Slice& value, Blob* blob);
  // Value needs to have enough space to store all the loaded data.
  // This function is thread safe!
  Status Get(const Blob& blob, std::string* value) const;
  // Frees the blob for reuse, but does not delete the data
  // on the backing store.
  Status Delete(const Blob& blob);
  // Sync all opened files that are modified
  Status Sync();

 private:
  const std::string directory_;
  // block_size_ is uint64_t because when we multiply with
  // blocks_size_ we want the result to be uint64_t or
  // we risk overflowing
  const uint64_t block_size_;
  const uint32_t blocks_per_bucket_;
  Env* env_;
  EnvOptions storage_options_;
  // protected by free_list_mutex_
  FreeList free_list_;
  // free_list_mutex_ is locked BEFORE buckets_mutex_
  mutable port::Mutex free_list_mutex_;
  // protected by buckets_mutex_
  // array of buckets
  unique_ptr<RandomRWFile>* buckets_;
  // number of buckets in the array
  uint32_t buckets_size_;
  uint32_t max_buckets_;
  mutable port::Mutex buckets_mutex_;

  // Calls FreeList allocate. If free list can't allocate
  // new blob, creates new bucket and tries again
  // Thread-safe
  Status Allocate(uint32_t blocks, Blob* blob);

  // Creates a new backing store and adds all the blocks
  // from the new backing store to the free list
  Status CreateNewBucket();
};

}  // namespace rocksdb
#endif  // ROCKSDB_LITE
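The header above documents the public interface this commit removes: a blob is written with Put, addressed by a Blob handle (serializable via ToString), read back with Get, and returned to the free list with Delete. Below is a minimal usage sketch of that interface as documented in the deleted header; the directory path, block size, and bucket counts are made-up illustration values, not anything from this commit.

    // Minimal sketch, assuming the removed util/blob_store.h interface above.
    #include <cassert>
    #include <string>
    #include "rocksdb/env.h"
    #include "util/blob_store.h"

    int main() {
      rocksdb::Env* env = rocksdb::Env::Default();
      // 4 KB blocks, 256 blocks per bucket file, at most 16 bucket files,
      // so max capacity = 16 * 256 * 4096 bytes (illustrative numbers).
      rocksdb::BlobStore store("/tmp/blob_store_example", 4096, 256, 16, env);

      std::string payload(10000, 'x');  // rounded up to 3 blocks on Put
      rocksdb::Blob handle;
      rocksdb::Status s = store.Put(rocksdb::Slice(payload), &handle);
      assert(s.ok());

      // The handle can be persisted as a string and parsed back later.
      std::string serialized = handle.ToString();
      rocksdb::Blob parsed(serialized);

      std::string read_back;
      s = store.Get(parsed, &read_back);
      assert(s.ok() && read_back == payload);

      // Delete only returns the blocks to the free list; data stays on disk.
      s = store.Delete(parsed);
      assert(s.ok());
      return 0;
    }
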
@@ -1,200 +0,0 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/blob_store.h"

#include "util/testharness.h"
#include "util/testutil.h"
#include "util/random.h"

#include <cstdlib>
#include <string>

namespace rocksdb {

using namespace std;

class BlobStoreTest { };

TEST(BlobStoreTest, RangeParseTest) {
  Blob e;
  for (int i = 0; i < 5; ++i) {
    e.chunks.push_back(BlobChunk(rand(), rand(), rand()));
  }
  string x = e.ToString();
  Blob nx(x);

  ASSERT_EQ(nx.ToString(), x);
}

// make sure we're reusing the freed space
TEST(BlobStoreTest, SanityTest) {
  const uint64_t block_size = 10;
  const uint32_t blocks_per_file = 20;
  Random random(5);

  BlobStore blob_store(test::TmpDir() + "/blob_store_test",
                       block_size,
                       blocks_per_file,
                       1000,
                       Env::Default());

  string buf;

  // put string of size 170
  test::RandomString(&random, 170, &buf);
  Blob r1;
  ASSERT_OK(blob_store.Put(Slice(buf), &r1));
  // use the first file
  for (size_t i = 0; i < r1.chunks.size(); ++i) {
    ASSERT_EQ(r1.chunks[0].bucket_id, 0u);
  }

  // put string of size 30
  test::RandomString(&random, 30, &buf);
  Blob r2;
  ASSERT_OK(blob_store.Put(Slice(buf), &r2));
  // use the first file
  for (size_t i = 0; i < r2.chunks.size(); ++i) {
    ASSERT_EQ(r2.chunks[0].bucket_id, 0u);
  }

  // delete blob of size 170
  ASSERT_OK(blob_store.Delete(r1));

  // put a string of size 100
  test::RandomString(&random, 100, &buf);
  Blob r3;
  ASSERT_OK(blob_store.Put(Slice(buf), &r3));
  // use the first file
  for (size_t i = 0; i < r3.chunks.size(); ++i) {
    ASSERT_EQ(r3.chunks[0].bucket_id, 0u);
  }

  // put a string of size 70
  test::RandomString(&random, 70, &buf);
  Blob r4;
  ASSERT_OK(blob_store.Put(Slice(buf), &r4));
  // use the first file
  for (size_t i = 0; i < r4.chunks.size(); ++i) {
    ASSERT_EQ(r4.chunks[0].bucket_id, 0u);
  }

  // put a string of size 5
  test::RandomString(&random, 5, &buf);
  Blob r5;
  ASSERT_OK(blob_store.Put(Slice(buf), &r5));
  // now you get to use the second file
  for (size_t i = 0; i < r5.chunks.size(); ++i) {
    ASSERT_EQ(r5.chunks[0].bucket_id, 1u);
  }
}

TEST(BlobStoreTest, FragmentedChunksTest) {
  const uint64_t block_size = 10;
  const uint32_t blocks_per_file = 20;
  Random random(5);

  BlobStore blob_store(test::TmpDir() + "/blob_store_test",
                       block_size,
                       blocks_per_file,
                       1000,
                       Env::Default());

  string buf;

  vector <Blob> r(4);

  // put 4 strings of size 50
  for (int k = 0; k < 4; ++k) {
    test::RandomString(&random, 50, &buf);
    ASSERT_OK(blob_store.Put(Slice(buf), &r[k]));
    // use the first file
    for (size_t i = 0; i < r[k].chunks.size(); ++i) {
      ASSERT_EQ(r[k].chunks[0].bucket_id, 0u);
    }
  }

  // delete the first and third
  ASSERT_OK(blob_store.Delete(r[0]));
  ASSERT_OK(blob_store.Delete(r[2]));

  // put a string of size 100. It should reuse the space freed
  // by deleting the first and third strings of size 50
  test::RandomString(&random, 100, &buf);
  Blob r2;
  ASSERT_OK(blob_store.Put(Slice(buf), &r2));
  // use the first file
  for (size_t i = 0; i < r2.chunks.size(); ++i) {
    ASSERT_EQ(r2.chunks[0].bucket_id, 0u);
  }
}

TEST(BlobStoreTest, CreateAndStoreTest) {
  const uint64_t block_size = 10;
  const uint32_t blocks_per_file = 1000;
  const int max_blurb_size = 300;
  Random random(5);

  BlobStore blob_store(test::TmpDir() + "/blob_store_test",
                       block_size,
                       blocks_per_file,
                       10000,
                       Env::Default());
  vector<pair<Blob, string>> ranges;

  for (int i = 0; i < 2000; ++i) {
    int decision = rand() % 5;
    if (decision <= 2 || ranges.size() == 0) {
      string buf;
      int size_blocks = (rand() % max_blurb_size + 1);
      int string_size = size_blocks * block_size - (rand() % block_size);
      test::RandomString(&random, string_size, &buf);
      Blob r;
      ASSERT_OK(blob_store.Put(Slice(buf), &r));
      ranges.push_back(make_pair(r, buf));
    } else if (decision == 3) {
      int ti = rand() % ranges.size();
      string out_buf;
      ASSERT_OK(blob_store.Get(ranges[ti].first, &out_buf));
      ASSERT_EQ(ranges[ti].second, out_buf);
    } else {
      int ti = rand() % ranges.size();
      ASSERT_OK(blob_store.Delete(ranges[ti].first));
      ranges.erase(ranges.begin() + ti);
    }
  }
  ASSERT_OK(blob_store.Sync());
}

TEST(BlobStoreTest, MaxSizeTest) {
  const uint64_t block_size = 10;
  const uint32_t blocks_per_file = 100;
  const int max_buckets = 10;
  Random random(5);

  BlobStore blob_store(test::TmpDir() + "/blob_store_test",
                       block_size,
                       blocks_per_file,
                       max_buckets,
                       Env::Default());
  string buf;
  for (int i = 0; i < max_buckets; ++i) {
    test::RandomString(&random, 1000, &buf);
    Blob r;
    ASSERT_OK(blob_store.Put(Slice(buf), &r));
  }

  test::RandomString(&random, 1000, &buf);
  Blob r;
  // should fail because the max store size is reached
  Status s = blob_store.Put(Slice(buf), &r);
  ASSERT_EQ(s.ok(), false);
}

}  // namespace rocksdb

int main(int argc, char** argv) {
  return rocksdb::test::RunAllTests();
}