Enable blobs to be fragmented

Summary:
I have implemented a FreeList version that supports fragmented blob chunks. Each block gets allocated and freed in FIFO order. Since the idea for the blocks to be big, we will not take a big hit of non-sequential IO. Free list is also faster, taking only O(k) size in both free and allocate instead of O(N) as before.

See more info on the task: https://our.intern.facebook.com/intern/tasks/?t=2990558

Also, I'm taking Slice instead of const char * and size in Put function.

Test Plan: unittests

Reviewers: haobo, kailiu, dhruba, emayanke

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13569
main
Igor Canadi 11 years ago
parent 70e87f7866
commit 30f1b97a06
  1. 149
      util/blob_store.cc
  2. 47
      util/blob_store.h
  3. 51
      util/blob_store_test.cc

@ -47,59 +47,15 @@ Blob::Blob(const std::string& blob) {
}
// FreeList
FreeList::FreeList() {
// We add (0, 0, 0) blob because it makes our life easier and
// code cleaner. (0, 0, 0) is always in the list so we can
// guarantee that free_chunks_list_ != nullptr, which avoids
// lots of unnecessary ifs
free_chunks_list_ = (FreeChunk *)malloc(sizeof(FreeChunk));
free_chunks_list_->chunk = BlobChunk(0, 0, 0);
free_chunks_list_->next = nullptr;
}
FreeList::~FreeList() {
while (free_chunks_list_ != nullptr) {
FreeChunk* t = free_chunks_list_;
free_chunks_list_ = free_chunks_list_->next;
free(t);
}
}
Status FreeList::Free(const Blob& blob) {
MutexLock l(&mutex_);
// add it back to the free list
for (auto chunk : blob.chunks) {
FreeChunk* itr = free_chunks_list_;
// find a node AFTER which we'll add the block
for ( ; itr->next != nullptr && itr->next->chunk <= chunk;
itr = itr->next) {
}
// try to merge with previous block
if (itr->chunk.ImmediatelyBefore(chunk)) {
// merge
itr->chunk.size += chunk.size;
free_blocks_ += chunk.size;
if (fifo_free_chunks_.size() &&
fifo_free_chunks_.back().ImmediatelyBefore(chunk)) {
fifo_free_chunks_.back().size += chunk.size;
} else {
// Insert the block after itr
FreeChunk* t = (FreeChunk*)malloc(sizeof(FreeChunk));
if (t == nullptr) {
throw runtime_error("Malloc failed");
}
t->chunk = chunk;
t->next = itr->next;
itr->next = t;
itr = t;
}
// try to merge with the next block
if (itr->next != nullptr &&
itr->chunk.ImmediatelyBefore(itr->next->chunk)) {
FreeChunk *tobedeleted = itr->next;
itr->chunk.size += itr->next->chunk.size;
itr->next = itr->next->next;
free(tobedeleted);
fifo_free_chunks_.push_back(chunk);
}
}
@ -107,48 +63,38 @@ Status FreeList::Free(const Blob& blob) {
}
Status FreeList::Allocate(uint32_t blocks, Blob* blob) {
MutexLock l(&mutex_);
FreeChunk** best_fit_node = nullptr;
// Find the smallest free chunk whose size is greater or equal to blocks
for (FreeChunk** itr = &free_chunks_list_; (*itr) != nullptr;
itr = &((*itr)->next)) {
if ((*itr)->chunk.size >= blocks &&
(best_fit_node == nullptr ||
(*best_fit_node)->chunk.size > (*itr)->chunk.size)) {
best_fit_node = itr;
}
}
if (best_fit_node == nullptr || *best_fit_node == nullptr) {
// Not enough memory
if (free_blocks_ < blocks) {
return Status::Incomplete("");
}
blob->SetOneChunk((*best_fit_node)->chunk.bucket_id,
(*best_fit_node)->chunk.offset,
blocks);
if ((*best_fit_node)->chunk.size > blocks) {
// just shorten best_fit_node
(*best_fit_node)->chunk.offset += blocks;
(*best_fit_node)->chunk.size -= blocks;
} else {
assert(blocks == (*best_fit_node)->chunk.size);
// delete best_fit_node
FreeChunk* t = *best_fit_node;
(*best_fit_node) = (*best_fit_node)->next;
free(t);
blob->chunks.clear();
free_blocks_ -= blocks;
while (blocks > 0) {
assert(fifo_free_chunks_.size() > 0);
auto& front = fifo_free_chunks_.front();
if (front.size > blocks) {
blob->chunks.push_back(BlobChunk(front.bucket_id, front.offset, blocks));
front.offset += blocks;
front.size -= blocks;
blocks = 0;
} else {
blob->chunks.push_back(front);
blocks -= front.size;
fifo_free_chunks_.pop_front();
}
}
assert(blocks == 0);
return Status::OK();
}
bool FreeList::Overlap(const Blob &blob) const {
MutexLock l(&mutex_);
for (auto chunk : blob.chunks) {
for (auto itr = free_chunks_list_; itr != nullptr; itr = itr->next) {
if (itr->chunk.Overlap(chunk)) {
for (auto itr = fifo_free_chunks_.begin();
itr != fifo_free_chunks_.end();
++itr) {
if (itr->Overlap(chunk)) {
return true;
}
}
@ -177,26 +123,28 @@ BlobStore::~BlobStore() {
// TODO we don't care about recovery for now
}
Status BlobStore::Put(const char* value, uint64_t size, Blob* blob) {
Status BlobStore::Put(const Slice& value, Blob* blob) {
// convert size to number of blocks
Status s = Allocate((size + block_size_ - 1) / block_size_, blob);
Status s = Allocate((value.size() + block_size_ - 1) / block_size_, blob);
if (!s.ok()) {
return s;
}
ReadLock l(&buckets_mutex_);
size_t size_left = value.size();
uint64_t offset = 0; // in bytes, not blocks
for (auto chunk : blob->chunks) {
uint64_t write_size = min(chunk.size * block_size_, size);
uint64_t write_size = min(chunk.size * block_size_, size_left);
assert(chunk.bucket_id < buckets_.size());
s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_,
Slice(value + offset,
Slice(value.data() + offset,
write_size));
if (!s.ok()) {
Delete(*blob);
return s;
}
offset += write_size;
size -= write_size;
size_left -= write_size;
if (write_size < chunk.size * block_size_) {
// if we have any space left in the block, fill it up with zeros
string zero_string(chunk.size * block_size_ - write_size, 0);
@ -206,7 +154,7 @@ Status BlobStore::Put(const char* value, uint64_t size, Blob* blob) {
}
}
if (size > 0) {
if (size_left > 0) {
Delete(*blob);
return Status::IOError("Tried to write more data than fits in the blob");
}
@ -216,16 +164,13 @@ Status BlobStore::Put(const char* value, uint64_t size, Blob* blob) {
Status BlobStore::Get(const Blob& blob,
string* value) const {
ReadLock l(&buckets_mutex_);
// assert that it doesn't overlap with free list
// it will get compiled out for release
assert(!free_list_.Overlap(blob));
uint32_t total_size = 0; // in blocks
for (auto chunk : blob.chunks) {
total_size += chunk.size;
}
assert(total_size > 0);
value->resize(total_size * block_size_);
value->resize(blob.Size() * block_size_);
uint64_t offset = 0; // in bytes, not blocks
for (auto chunk : blob.chunks) {
@ -250,13 +195,23 @@ Status BlobStore::Get(const Blob& blob,
}
Status BlobStore::Delete(const Blob& blob) {
MutexLock l(&free_list_mutex_);
return free_list_.Free(blob);
}
Status BlobStore::Sync() {
ReadLock l(&buckets_mutex_);
for (size_t i = 0; i < buckets_.size(); ++i) {
Status s = buckets_[i].get()->Sync();
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status BlobStore::Allocate(uint32_t blocks, Blob* blob) {
// TODO we don't currently support fragmented blobs
MutexLock l(&allocate_mutex_);
assert(blocks <= blocks_per_bucket_);
MutexLock l(&free_list_mutex_);
Status s;
s = free_list_.Allocate(blocks, blob);
@ -271,7 +226,9 @@ Status BlobStore::Allocate(uint32_t blocks, Blob* blob) {
return s;
}
// called with free_list_mutex_ held
Status BlobStore::CreateNewBucket() {
WriteLock l(&buckets_mutex_);
int new_bucket_id;
new_bucket_id = buckets_.size();
buckets_.push_back(unique_ptr<RandomRWFile>());

@ -10,6 +10,7 @@
#include "util/coding.h"
#include <list>
#include <deque>
#include <cstdint>
#include <iostream>
#include <stdexcept>
@ -26,16 +27,6 @@ struct BlobChunk {
BlobChunk(uint32_t bucket_id, uint32_t offset, uint32_t size) :
bucket_id(bucket_id), offset(offset), size(size) {}
bool operator <= (const BlobChunk& chunk) const {
if (bucket_id != chunk.bucket_id) {
return bucket_id < chunk.bucket_id;
}
if (offset != chunk.offset) {
return offset < chunk.offset;
}
return true;
}
// returns true if it's immediately before chunk
bool ImmediatelyBefore(const BlobChunk& chunk) const;
// returns true if chunks overlap
@ -63,15 +54,26 @@ struct Blob {
chunks.push_back(BlobChunk(bucket_id, offset, size));
}
uint32_t Size() const { // in blocks
uint32_t ret = 0;
for (auto chunk : chunks) {
ret += chunk.size;
}
assert(ret > 0);
return ret;
}
// bucket_id, offset, size
std::vector<BlobChunk> chunks;
};
// Keeps a list of free chunks
// NOT thread-safe. Externally synchronized
class FreeList {
public:
FreeList ();
~FreeList();
FreeList() :
free_blocks_(0) {}
~FreeList() {}
// Allocates a a blob. Stores the allocated blob in
// 'blob'. Returns non-OK status if it failed to allocate.
@ -85,12 +87,8 @@ class FreeList {
bool Overlap(const Blob &blob) const;
private:
struct FreeChunk {
BlobChunk chunk;
FreeChunk* next;
};
FreeChunk* free_chunks_list_;
std::deque<BlobChunk> fifo_free_chunks_;
uint32_t free_blocks_;
mutable port::Mutex mutex_;
};
@ -111,17 +109,19 @@ class BlobStore {
Env* env);
~BlobStore();
// Allocates space for size bytes (rounded up to be multiple of
// block size) and writes size bytes from value to a backing store.
// Allocates space for value.size bytes (rounded up to be multiple of
// block size) and writes value.size bytes from value.data to a backing store.
// Sets Blob blob that can than be used for addressing the
// stored value. Returns non-OK status on error.
Status Put(const char* value, uint64_t size, Blob* blob);
Status Put(const Slice& value, Blob* blob);
// Value needs to have enough space to store all the loaded stuff.
// This function is thread safe!
Status Get(const Blob& blob, std::string* value) const;
// Frees the blob for reuse, but does not delete the data
// on the backing store.
Status Delete(const Blob& blob);
// Sync all opened files that are modified
Status Sync();
private:
const std::string directory_;
@ -132,10 +132,12 @@ class BlobStore {
const uint32_t blocks_per_bucket_;
Env* env_;
EnvOptions storage_options_;
// protected by free_list_mutex_
FreeList free_list_;
mutable port::Mutex free_list_mutex_;
// protected by buckets mutex
std::vector<unique_ptr<RandomRWFile>> buckets_;
mutable port::Mutex allocate_mutex_;
mutable port::RWMutex buckets_mutex_;
// Calls FreeList allocate. If free list can't allocate
// new blob, creates new bucket and tries again
@ -144,7 +146,6 @@ class BlobStore {
// Creates a new backing store and adds all the blocks
// from the new backing store to the free list
// NOT thread-safe, call with lock held
Status CreateNewBucket();
};

@ -44,7 +44,7 @@ TEST(BlobStoreTest, SanityTest) {
// put string of size 170
test::RandomString(&random, 170, &buf);
Blob r1;
ASSERT_OK(blob_store.Put(buf.data(), 170, &r1));
ASSERT_OK(blob_store.Put(Slice(buf), &r1));
// use the first file
for (size_t i = 0; i < r1.chunks.size(); ++i) {
ASSERT_EQ(r1.chunks[0].bucket_id, 0u);
@ -53,7 +53,7 @@ TEST(BlobStoreTest, SanityTest) {
// put string of size 30
test::RandomString(&random, 30, &buf);
Blob r2;
ASSERT_OK(blob_store.Put(buf.data(), 30, &r2));
ASSERT_OK(blob_store.Put(Slice(buf), &r2));
// use the first file
for (size_t i = 0; i < r2.chunks.size(); ++i) {
ASSERT_EQ(r2.chunks[0].bucket_id, 0u);
@ -65,7 +65,7 @@ TEST(BlobStoreTest, SanityTest) {
// put a string of size 100
test::RandomString(&random, 100, &buf);
Blob r3;
ASSERT_OK(blob_store.Put(buf.data(), 100, &r3));
ASSERT_OK(blob_store.Put(Slice(buf), &r3));
// use the first file
for (size_t i = 0; i < r3.chunks.size(); ++i) {
ASSERT_EQ(r3.chunks[0].bucket_id, 0u);
@ -74,7 +74,7 @@ TEST(BlobStoreTest, SanityTest) {
// put a string of size 70
test::RandomString(&random, 70, &buf);
Blob r4;
ASSERT_OK(blob_store.Put(buf.data(), 70, &r4));
ASSERT_OK(blob_store.Put(Slice(buf), &r4));
// use the first file
for (size_t i = 0; i < r4.chunks.size(); ++i) {
ASSERT_EQ(r4.chunks[0].bucket_id, 0u);
@ -83,12 +83,50 @@ TEST(BlobStoreTest, SanityTest) {
// put a string of size 5
test::RandomString(&random, 5, &buf);
Blob r5;
ASSERT_OK(blob_store.Put(buf.data(), 70, &r5));
ASSERT_OK(blob_store.Put(Slice(buf), &r5));
// now you get to use the second file
for (size_t i = 0; i < r5.chunks.size(); ++i) {
ASSERT_EQ(r5.chunks[0].bucket_id, 1u);
}
}
TEST(BlobStoreTest, FragmentedChunksTest) {
const uint64_t block_size = 10;
const uint32_t blocks_per_file = 20;
Random random(5);
BlobStore blob_store(test::TmpDir() + "/blob_store_test",
block_size,
blocks_per_file,
Env::Default());
string buf;
vector <Blob> r(4);
// put 4 strings of size 50
for (int k = 0; k < 4; ++k) {
test::RandomString(&random, 50, &buf);
ASSERT_OK(blob_store.Put(Slice(buf), &r[k]));
// use the first file
for (size_t i = 0; i < r[k].chunks.size(); ++i) {
ASSERT_EQ(r[k].chunks[0].bucket_id, 0u);
}
}
// delete the first and third
ASSERT_OK(blob_store.Delete(r[0]));
ASSERT_OK(blob_store.Delete(r[2]));
// put string of size 100. it should reuse space that we deleting
// by deleting first and third strings of size 50
test::RandomString(&random, 100, &buf);
Blob r2;
ASSERT_OK(blob_store.Put(Slice(buf), &r2));
// use the first file
for (size_t i = 0; i < r2.chunks.size(); ++i) {
ASSERT_EQ(r2.chunks[0].bucket_id, 0u);
}
}
TEST(BlobStoreTest, CreateAndStoreTest) {
@ -111,7 +149,7 @@ TEST(BlobStoreTest, CreateAndStoreTest) {
int string_size = size_blocks * block_size - (rand() % block_size);
test::RandomString(&random, string_size, &buf);
Blob r;
ASSERT_OK(blob_store.Put(buf.data(), string_size, &r));
ASSERT_OK(blob_store.Put(Slice(buf), &r));
ranges.push_back(make_pair(r, buf));
} else if (decision == 3) {
int ti = rand() % ranges.size();
@ -124,6 +162,7 @@ TEST(BlobStoreTest, CreateAndStoreTest) {
ranges.erase(ranges.begin() + ti);
}
}
ASSERT_OK(blob_store.Sync());
}
} // namespace rocksdb

Loading…
Cancel
Save