diff --git a/util/blob_store.cc b/util/blob_store.cc
index d082c4189..a30d48e66 100644
--- a/util/blob_store.cc
+++ b/util/blob_store.cc
@@ -47,59 +47,15 @@ Blob::Blob(const std::string& blob) {
 }
 
 // FreeList
-FreeList::FreeList() {
-  // We add (0, 0, 0) blob because it makes our life easier and
-  // code cleaner. (0, 0, 0) is always in the list so we can
-  // guarantee that free_chunks_list_ != nullptr, which avoids
-  // lots of unnecessary ifs
-  free_chunks_list_ = (FreeChunk *)malloc(sizeof(FreeChunk));
-  free_chunks_list_->chunk = BlobChunk(0, 0, 0);
-  free_chunks_list_->next = nullptr;
-}
-
-FreeList::~FreeList() {
-  while (free_chunks_list_ != nullptr) {
-    FreeChunk* t = free_chunks_list_;
-    free_chunks_list_ = free_chunks_list_->next;
-    free(t);
-  }
-}
-
 Status FreeList::Free(const Blob& blob) {
-  MutexLock l(&mutex_);
-  // add it back to the free list
   for (auto chunk : blob.chunks) {
-    FreeChunk* itr = free_chunks_list_;
-
-    // find a node AFTER which we'll add the block
-    for ( ; itr->next != nullptr && itr->next->chunk <= chunk;
-         itr = itr->next) {
-    }
-
-    // try to merge with previous block
-    if (itr->chunk.ImmediatelyBefore(chunk)) {
-      // merge
-      itr->chunk.size += chunk.size;
+    free_blocks_ += chunk.size;
+    if (fifo_free_chunks_.size() &&
+        fifo_free_chunks_.back().ImmediatelyBefore(chunk)) {
+      fifo_free_chunks_.back().size += chunk.size;
     } else {
-      // Insert the block after itr
-      FreeChunk* t = (FreeChunk*)malloc(sizeof(FreeChunk));
-      if (t == nullptr) {
-        throw runtime_error("Malloc failed");
-      }
-      t->chunk = chunk;
-      t->next = itr->next;
-      itr->next = t;
-      itr = t;
-    }
-
-    // try to merge with the next block
-    if (itr->next != nullptr &&
-        itr->chunk.ImmediatelyBefore(itr->next->chunk)) {
-      FreeChunk *tobedeleted = itr->next;
-      itr->chunk.size += itr->next->chunk.size;
-      itr->next = itr->next->next;
-      free(tobedeleted);
+      fifo_free_chunks_.push_back(chunk);
     }
   }
 
@@ -107,48 +63,38 @@ Status FreeList::Free(const Blob& blob) {
 }
 
 Status FreeList::Allocate(uint32_t blocks, Blob* blob) {
-  MutexLock l(&mutex_);
-  FreeChunk** best_fit_node = nullptr;
-
-  // Find the smallest free chunk whose size is greater or equal to blocks
-  for (FreeChunk** itr = &free_chunks_list_; (*itr) != nullptr;
-       itr = &((*itr)->next)) {
-    if ((*itr)->chunk.size >= blocks &&
-        (best_fit_node == nullptr ||
-         (*best_fit_node)->chunk.size > (*itr)->chunk.size)) {
-      best_fit_node = itr;
-    }
-  }
-
-  if (best_fit_node == nullptr || *best_fit_node == nullptr) {
-    // Not enough memory
+  if (free_blocks_ < blocks) {
     return Status::Incomplete("");
   }
 
-  blob->SetOneChunk((*best_fit_node)->chunk.bucket_id,
-                    (*best_fit_node)->chunk.offset,
-                    blocks);
-
-  if ((*best_fit_node)->chunk.size > blocks) {
-    // just shorten best_fit_node
-    (*best_fit_node)->chunk.offset += blocks;
-    (*best_fit_node)->chunk.size -= blocks;
-  } else {
-    assert(blocks == (*best_fit_node)->chunk.size);
-    // delete best_fit_node
-    FreeChunk* t = *best_fit_node;
-    (*best_fit_node) = (*best_fit_node)->next;
-    free(t);
+  blob->chunks.clear();
+  free_blocks_ -= blocks;
+
+  while (blocks > 0) {
+    assert(fifo_free_chunks_.size() > 0);
+    auto& front = fifo_free_chunks_.front();
+    if (front.size > blocks) {
+      blob->chunks.push_back(BlobChunk(front.bucket_id, front.offset, blocks));
+      front.offset += blocks;
+      front.size -= blocks;
+      blocks = 0;
+    } else {
+      blob->chunks.push_back(front);
+      blocks -= front.size;
+      fifo_free_chunks_.pop_front();
+    }
   }
+  assert(blocks == 0);
 
   return Status::OK();
 }
 
 bool FreeList::Overlap(const Blob &blob) const {
-  MutexLock l(&mutex_);
   for (auto chunk : blob.chunks) {
-    for (auto itr = free_chunks_list_; itr != nullptr; itr = itr->next) {
-      if (itr->chunk.Overlap(chunk)) {
+    for (auto itr = fifo_free_chunks_.begin();
+         itr != fifo_free_chunks_.end();
+         ++itr) {
+      if (itr->Overlap(chunk)) {
         return true;
       }
     }
@@ -177,26 +123,28 @@ BlobStore::~BlobStore() {
   // TODO we don't care about recovery for now
 }
 
-Status BlobStore::Put(const char* value, uint64_t size, Blob* blob) {
+Status BlobStore::Put(const Slice& value, Blob* blob) {
   // convert size to number of blocks
-  Status s = Allocate((size + block_size_ - 1) / block_size_, blob);
+  Status s = Allocate((value.size() + block_size_ - 1) / block_size_, blob);
   if (!s.ok()) {
     return s;
   }
+  ReadLock l(&buckets_mutex_);
+  size_t size_left = value.size();
 
   uint64_t offset = 0; // in bytes, not blocks
   for (auto chunk : blob->chunks) {
-    uint64_t write_size = min(chunk.size * block_size_, size);
+    uint64_t write_size = min(chunk.size * block_size_, size_left);
     assert(chunk.bucket_id < buckets_.size());
     s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_,
-                                               Slice(value + offset,
+                                               Slice(value.data() + offset,
                                                      write_size));
     if (!s.ok()) {
       Delete(*blob);
       return s;
     }
     offset += write_size;
-    size -= write_size;
+    size_left -= write_size;
     if (write_size < chunk.size * block_size_) {
       // if we have any space left in the block, fill it up with zeros
       string zero_string(chunk.size * block_size_ - write_size, 0);
@@ -206,7 +154,7 @@ Status BlobStore::Put(const char* value, uint64_t size, Blob* blob) {
     }
   }
 
-  if (size > 0) {
+  if (size_left > 0) {
     Delete(*blob);
     return Status::IOError("Tried to write more data than fits in the blob");
   }
@@ -216,16 +164,13 @@ Status BlobStore::Get(const Blob& blob,
                       string* value) const {
+  ReadLock l(&buckets_mutex_);
+
   // assert that it doesn't overlap with free list
   // it will get compiled out for release
   assert(!free_list_.Overlap(blob));
-  uint32_t total_size = 0; // in blocks
-  for (auto chunk : blob.chunks) {
-    total_size += chunk.size;
-  }
-  assert(total_size > 0);
-  value->resize(total_size * block_size_);
+  value->resize(blob.Size() * block_size_);
 
   uint64_t offset = 0; // in bytes, not blocks
   for (auto chunk : blob.chunks) {
@@ -250,13 +195,23 @@
 }
 
 Status BlobStore::Delete(const Blob& blob) {
+  MutexLock l(&free_list_mutex_);
   return free_list_.Free(blob);
 }
 
+Status BlobStore::Sync() {
+  ReadLock l(&buckets_mutex_);
+  for (size_t i = 0; i < buckets_.size(); ++i) {
+    Status s = buckets_[i].get()->Sync();
+    if (!s.ok()) {
+      return s;
+    }
+  }
+  return Status::OK();
+}
+
 Status BlobStore::Allocate(uint32_t blocks, Blob* blob) {
-  // TODO we don't currently support fragmented blobs
-  MutexLock l(&allocate_mutex_);
-  assert(blocks <= blocks_per_bucket_);
+  MutexLock l(&free_list_mutex_);
   Status s;
 
   s = free_list_.Allocate(blocks, blob);
@@ -271,7 +226,9 @@
   return s;
 }
 
+// called with free_list_mutex_ held
 Status BlobStore::CreateNewBucket() {
+  WriteLock l(&buckets_mutex_);
   int new_bucket_id;
   new_bucket_id = buckets_.size();
   buckets_.push_back(unique_ptr<RandomRWFile>());
diff --git a/util/blob_store.h b/util/blob_store.h
index 5d505f66d..be9947216 100644
--- a/util/blob_store.h
+++ b/util/blob_store.h
@@ -10,6 +10,7 @@
 #include "util/coding.h"
 
 #include <cstdint>
+#include <deque>
 #include <memory>
 #include <string>
 #include <vector>
@@ -26,16 +27,6 @@ struct BlobChunk {
   BlobChunk(uint32_t bucket_id, uint32_t offset, uint32_t size) :
     bucket_id(bucket_id), offset(offset), size(size) {}
 
-  bool operator <= (const BlobChunk& chunk) const {
-    if (bucket_id != chunk.bucket_id) {
-      return bucket_id < chunk.bucket_id;
-    }
-    if (offset != chunk.offset) {
-      return offset < chunk.offset;
-    }
-    return true;
-  }
-
   // returns true if it's immediately before chunk
   bool ImmediatelyBefore(const BlobChunk& chunk) const;
   // returns true if chunks overlap
@@ -63,15 +54,26 @@ struct Blob {
     chunks.push_back(BlobChunk(bucket_id, offset, size));
   }
 
+  uint32_t Size() const { // in blocks
+    uint32_t ret = 0;
+    for (auto chunk : chunks) {
+      ret += chunk.size;
+    }
+    assert(ret > 0);
+    return ret;
+  }
+
   // bucket_id, offset, size
   std::vector<BlobChunk> chunks;
 };
 
 // Keeps a list of free chunks
+// NOT thread-safe. Externally synchronized
 class FreeList {
  public:
-  FreeList ();
-  ~FreeList();
+  FreeList() :
+    free_blocks_(0) {}
+  ~FreeList() {}
 
   // Allocates a blob. Stores the allocated blob in
   // 'blob'. Returns non-OK status if it failed to allocate.
@@ -85,12 +87,8 @@ class FreeList {
   bool Overlap(const Blob &blob) const;
 
  private:
-  struct FreeChunk {
-    BlobChunk chunk;
-    FreeChunk* next;
-  };
-
-  FreeChunk* free_chunks_list_;
+  std::deque<BlobChunk> fifo_free_chunks_;
+  uint32_t free_blocks_;
 
   mutable port::Mutex mutex_;
 };
@@ -111,17 +109,19 @@ class BlobStore {
             Env* env);
   ~BlobStore();
 
-  // Allocates space for size bytes (rounded up to be a multiple of
-  // block size) and writes size bytes from value to a backing store.
+  // Allocates space for value.size bytes (rounded up to be a multiple of
+  // block size) and writes value.size bytes from value.data to a backing store.
   // Sets Blob blob that can then be used for addressing the
   // stored value. Returns non-OK status on error.
-  Status Put(const char* value, uint64_t size, Blob* blob);
+  Status Put(const Slice& value, Blob* blob);
   // Value needs to have enough space to store all the loaded stuff.
   // This function is thread safe!
   Status Get(const Blob& blob, std::string* value) const;
   // Frees the blob for reuse, but does not delete the data
   // on the backing store.
   Status Delete(const Blob& blob);
+  // Sync all opened files that are modified
+  Status Sync();
 
  private:
   const std::string directory_;
@@ -132,10 +132,12 @@ class BlobStore {
   const uint32_t blocks_per_bucket_;
   Env* env_;
   EnvOptions storage_options_;
+  // protected by free_list_mutex_
   FreeList free_list_;
+  mutable port::Mutex free_list_mutex_;
   // protected by buckets mutex
   std::vector<unique_ptr<RandomRWFile>> buckets_;
-  mutable port::Mutex allocate_mutex_;
+  mutable port::RWMutex buckets_mutex_;
 
   // Calls FreeList allocate. If free list can't allocate
   // new blob, creates new bucket and tries again
@@ -144,7 +146,6 @@
   Status Allocate(uint32_t blocks, Blob* blob);
 
   // Creates a new backing store and adds all the blocks
   // from the new backing store to the free list
-  // NOT thread-safe, call with lock held
   Status CreateNewBucket();
 };
diff --git a/util/blob_store_test.cc b/util/blob_store_test.cc
index c8a8e8650..0983faece 100644
--- a/util/blob_store_test.cc
+++ b/util/blob_store_test.cc
@@ -44,7 +44,7 @@ TEST(BlobStoreTest, SanityTest) {
   // put string of size 170
   test::RandomString(&random, 170, &buf);
   Blob r1;
-  ASSERT_OK(blob_store.Put(buf.data(), 170, &r1));
+  ASSERT_OK(blob_store.Put(Slice(buf), &r1));
   // use the first file
   for (size_t i = 0; i < r1.chunks.size(); ++i) {
     ASSERT_EQ(r1.chunks[0].bucket_id, 0u);
@@ -53,7 +53,7 @@ TEST(BlobStoreTest, SanityTest) {
   // put string of size 30
   test::RandomString(&random, 30, &buf);
   Blob r2;
-  ASSERT_OK(blob_store.Put(buf.data(), 30, &r2));
+  ASSERT_OK(blob_store.Put(Slice(buf), &r2));
   // use the first file
   for (size_t i = 0; i < r2.chunks.size(); ++i) {
     ASSERT_EQ(r2.chunks[0].bucket_id, 0u);
@@ -65,7 +65,7 @@ TEST(BlobStoreTest, SanityTest) {
   // put a string of size 100
   test::RandomString(&random, 100, &buf);
   Blob r3;
-  ASSERT_OK(blob_store.Put(buf.data(), 100, &r3));
+  ASSERT_OK(blob_store.Put(Slice(buf), &r3));
   // use the first file
   for (size_t i = 0; i < r3.chunks.size(); ++i) {
     ASSERT_EQ(r3.chunks[0].bucket_id, 0u);
@@ -74,7 +74,7 @@ TEST(BlobStoreTest, SanityTest) {
   // put a string of size 70
   test::RandomString(&random, 70, &buf);
   Blob r4;
-  ASSERT_OK(blob_store.Put(buf.data(), 70, &r4));
+  ASSERT_OK(blob_store.Put(Slice(buf), &r4));
   // use the first file
   for (size_t i = 0; i < r4.chunks.size(); ++i) {
     ASSERT_EQ(r4.chunks[0].bucket_id, 0u);
@@ -83,12 +83,50 @@ TEST(BlobStoreTest, SanityTest) {
   // put a string of size 5
   test::RandomString(&random, 5, &buf);
   Blob r5;
-  ASSERT_OK(blob_store.Put(buf.data(), 70, &r5));
+  ASSERT_OK(blob_store.Put(Slice(buf), &r5));
   // now you get to use the second file
   for (size_t i = 0; i < r5.chunks.size(); ++i) {
     ASSERT_EQ(r5.chunks[0].bucket_id, 1u);
   }
+}
+
+TEST(BlobStoreTest, FragmentedChunksTest) {
+  const uint64_t block_size = 10;
+  const uint32_t blocks_per_file = 20;
+  Random random(5);
+
+  BlobStore blob_store(test::TmpDir() + "/blob_store_test",
+                       block_size,
+                       blocks_per_file,
+                       Env::Default());
+
+  string buf;
+  vector<Blob> r(4);
+
+  // put 4 strings of size 50
+  for (int k = 0; k < 4; ++k) {
+    test::RandomString(&random, 50, &buf);
+    ASSERT_OK(blob_store.Put(Slice(buf), &r[k]));
+    // use the first file
+    for (size_t i = 0; i < r[k].chunks.size(); ++i) {
+      ASSERT_EQ(r[k].chunks[0].bucket_id, 0u);
+    }
+  }
+
+  // delete the first and third
+  ASSERT_OK(blob_store.Delete(r[0]));
+  ASSERT_OK(blob_store.Delete(r[2]));
+
+  // put a string of size 100. It should reuse the space that we freed up
+  // by deleting the first and third strings of size 50
+  test::RandomString(&random, 100, &buf);
+  Blob r2;
+  ASSERT_OK(blob_store.Put(Slice(buf), &r2));
+  // use the first file
+  for (size_t i = 0; i < r2.chunks.size(); ++i) {
+    ASSERT_EQ(r2.chunks[0].bucket_id, 0u);
+  }
 }
 
 TEST(BlobStoreTest, CreateAndStoreTest) {
@@ -111,7 +149,7 @@ TEST(BlobStoreTest, CreateAndStoreTest) {
       int string_size = size_blocks * block_size - (rand() % block_size);
       test::RandomString(&random, string_size, &buf);
       Blob r;
-      ASSERT_OK(blob_store.Put(buf.data(), string_size, &r));
+      ASSERT_OK(blob_store.Put(Slice(buf), &r));
       ranges.push_back(make_pair(r, buf));
     } else if (decision == 3) {
       int ti = rand() % ranges.size();
@@ -124,6 +162,7 @@ TEST(BlobStoreTest, CreateAndStoreTest) {
       ranges.erase(ranges.begin() + ti);
     }
   }
+  ASSERT_OK(blob_store.Sync());
 }
 
 }  // namespace rocksdb
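
For reference, below is a minimal usage sketch of the BlobStore API as it stands after this diff. It is illustrative only and not part of the change: the wrapper function name BlobStoreExample, the directory path, and the block/bucket sizes are made-up example values; only the calls themselves (the new Put(Slice, Blob*), Get, Delete, and Sync) come from the headers above.

// Usage sketch (illustrative, not part of the diff).
#include <string>

#include "util/blob_store.h"

namespace rocksdb {

Status BlobStoreExample() {
  // 32-byte blocks, 1000 blocks per bucket file, under an example directory.
  BlobStore blob_store("/tmp/blob_store_example", 32, 1000, Env::Default());

  // Put() now takes a Slice; the returned Blob records which blocks were used.
  Blob blob;
  Status s = blob_store.Put(Slice("hello blob store"), &blob);
  if (!s.ok()) {
    return s;
  }

  // Get() resizes the output to a whole number of blocks, so the value comes
  // back zero-padded up to the block size.
  std::string value;
  s = blob_store.Get(blob, &value);
  if (!s.ok()) {
    return s;
  }

  // Delete() only returns the blocks to the free list; Sync() flushes all
  // opened bucket files.
  s = blob_store.Delete(blob);
  if (!s.ok()) {
    return s;
  }
  return blob_store.Sync();
}

}  // namespace rocksdb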